"""HTTP API for querying and controlling a Hardrock-50 (HR50) over serial.

A background loop runs every three seconds.  On each pass it either sends
the command queued through ``/exec_serial`` or refreshes the cached status
in ``vlcd``, which ``/get_status`` returns as JSON.
"""

import getopt
import sys
import threading
import time
from subprocess import check_output
from typing import Optional

import serial
from flask import Flask, jsonify, request

app = Flask(__name__)

# Script executed by the /exec_shell endpoint.
shell_cmd = "./scripts/toggle_antenna.sh"

# Command queued by the /exec_serial endpoint, and the text shown in
# vlcd['RET'] once that command has been sent.
serial_cmd = "hrtu1"
serial_cmd_msg = "Tune"

# The serial/USB port the HR50 is connected to
serial_port = '/dev/ttyUSB0'

# doesn't really matter but must match settings of the HR50
baud = 19200

# The active command that has been received via the API.
# If populated: the next connection to the HR50 is used to send the command.
# If empty: the next connection to the HR50 is used to query status
# information.
cmd_rcvd = ""

# time in milliseconds when we started to display a status message
msg_time = 0

# time in milliseconds a status message will be displayed
msg_duration = 3000

# Reference only (not used by the code); presumably band -> HR50 band index.
# bands = {
#     '6': '0', '10': '1', '12': '2', '15': '3', '17': '4', '20': '5',
#     '30': '6', '40': '7', '60': '8', '80': '9', '160': '10'
# }

# Cached status values served by /get_status.  'RET' carries a transient
# status message (or the voltage when no message is being displayed).
vlcd = {
    'STA': '-',
    'PTT': '-',
    'BND': '-',
    'VLT': '-',
    'PEP': '-',
    'AVG': '-',
    'SWR': '-',
    'TMP': '-',
    'RET': 'ok'
}

# Reference only (not used by the code); presumably keying mode -> HR50 code.
# keying_methods = {
#     "off": "0", "ptt": "1", "cor": "2", "qrp": "3"
# }


def current_milli_time() -> int:
    """Return the current wall-clock time in whole milliseconds."""
    return round(time.time() * 1000)


def get_serial(port, baud):
    """Open and return a serial connection with a 2 second read timeout.

    If the port cannot be opened the error is printed and ``sys.exit(2)``
    is called.
    """
    try:
        return serial.Serial(port, baud, timeout=2)
    except serial.serialutil.SerialException as e:
        print("The following error has occured while opening the serial connection to {} with {} baud:".format(port, baud))
        print(e)
        sys.exit(2)


def send_cmd_via_serial(cmd) -> Optional[str]:
    """Send a command to the Hardrock-50 via the serial interface.

    ``cmd`` is given without the trailing ';', which is appended here.
    Returns the decoded, right-stripped reply line, or ``None`` if writing
    or reading failed (the error is printed).
    """
    ser = get_serial(serial_port, baud)
    res = None
    try:
        command = cmd + ';'
        ser.write(command.encode())
        res = ser.readline().decode("utf-8").rstrip()
    except Exception as e:
        # Report the command that failed; the handler must not itself raise.
        print("The following error has occured while sending the command {} to the HR50:".format(cmd))
        print(e)
    finally:
        # Release the port on every path so the next poll can reopen it.
        ser.close()
    return res


def get_info() -> None:
    """Query the HR50 and populate ``vlcd`` with the collected information.

    Two commands are executed via the serial interface: ``HRRX`` fills
    STA/PTT/BND/TMP/VLT and ``HRMX`` fills PEP/AVG/SWR.  A reply with too few
    fields sets ``vlcd['RET']`` to "(!)" and restarts the message timer.
    Serial errors are printed and leave the remaining values untouched.
    """
    global msg_time

    ser = get_serial(serial_port, baud)
    cmd = b'HRRX;'
    try:
        ser.write(cmd)
        time.sleep(0.5)
        status = ser.readline().decode("utf-8").rstrip().replace(';', '').split(',')

        # Five fields (indices 0..4) are read below.
        if len(status) < 5:
            vlcd['RET'] = "(!)"
            msg_time = current_milli_time()
            return None

        vlcd['STA'] = status[0]
        vlcd['PTT'] = status[1]
        vlcd['BND'] = status[2]
        vlcd['TMP'] = status[3]
        vlcd['VLT'] = status[4]

        cmd = b'HRMX;'
        ser.write(cmd)
        time.sleep(0.5)
        meter = ser.readline().decode("utf-8").rstrip().split()
    except Exception as e:
        print("The following error has occured while sending the command {} to the HR50:".format(cmd.decode("utf-8")))
        print(e)
        return None
    finally:
        ser.close()

    # A timed-out or truncated meter reply must not raise in the poll thread.
    if len(meter) < 4:
        vlcd['RET'] = "(!)"
        msg_time = current_milli_time()
        return None

    # The first character of each meter field is dropped before storing.
    vlcd['PEP'] = meter[1][1:]
    vlcd['AVG'] = meter[2][1:]
    if meter[3][1:] != "0":
        vlcd['SWR'] = meter[3][1:2] + "." + meter[3][2:3]
    return None


def background_loop() -> None:
    """Run one polling cycle and schedule the next one in three seconds.

    Sends the queued command if there is one, otherwise refreshes the status.
    Once a status message has been shown for ``msg_duration`` milliseconds
    (or while 'RET' still holds its initial "ok"), 'RET' shows the voltage.
    """
    global cmd_rcvd
    global msg_time

    # Re-arm first so a failing cycle does not stop the polling.
    threading.Timer(3.0, background_loop).start()

    if cmd_rcvd != "":
        try:
            send_cmd_via_serial(cmd_rcvd)
            vlcd['RET'] = serial_cmd_msg
            msg_time = current_milli_time()
        except Exception as e:
            print("The following error has occured while executing the command {}:".format(cmd_rcvd))
            print(e)
        # Always clear the queue so the command is sent at most once.
        cmd_rcvd = ""
    else:
        get_info()

    message_expired = (
        msg_time != 0 and msg_time + msg_duration < current_milli_time()
    )
    if message_expired or vlcd['RET'] == "ok":
        vlcd['RET'] = vlcd['VLT']


@app.route('/get_status')
def get_status():
    """Return the cached status values as JSON."""
    return jsonify(vlcd)


@app.route('/exec_shell')
def exec_shell_command():
    """Run ``shell_cmd`` and return its output as the response body.

    Non-empty output is also shown as the status message in ``vlcd['RET']``.
    """
    global msg_time

    out = check_output([shell_cmd, ""])
    if out:
        vlcd['RET'] = out.decode("utf-8")
        msg_time = current_milli_time()
    return out


@app.route('/exec_serial')
def exec_serial_command():
    """Queue ``serial_cmd`` for the next background cycle and report success."""
    global cmd_rcvd

    cmd_rcvd = serial_cmd
    return jsonify(
        isError=False,
        message="Success",
        statusCode=200,
        data="",
    ), 200


# Start polling as soon as the module is loaded.
background_loop()