# hr50-remote-display/server/hr50_rd_server.py

# 182 lines
# 4.6 KiB
# Python

from flask import Flask, jsonify, request
import sys
import getopt
import time
import serial
from subprocess import check_output
import threading
# Flask application object; the @app.route handlers in this file register on it
app = Flask(__name__)
# Script executed by the /exec_shell endpoint
shell_cmd = './scripts/toggle_antenna.sh'

# Command queued by the /exec_serial endpoint, and the text that is put on
# the display once that command has been sent
serial_cmd = 'hrtu1'
serial_cmd_msg = 'Tune'

# Serial/USB device the HR50 is attached to
serial_port = "/dev/ttyUSB0"

# Any rate will do as long as it matches the setting of the HR50
baud = 19200

# Command received via the API that is still waiting to be sent.
#   non-empty: the next connection to the HR50 sends this command
#   empty:     the next connection to the HR50 queries status information
cmd_rcvd = ''

# Millisecond timestamp at which the current status message was first
# shown (0 until a message has been shown)
msg_time = 0

# How long a status message stays on the display, in milliseconds
msg_duration = 3 * 1000
# Disabled code: the table below sits inside a bare triple-quoted string, so
# `bands` is never defined at runtime.
# NOTE(review): presumably maps band names to the band index used by the
# HR50 - confirm before re-enabling.
'''
bands = {
'6': '0',
'10': '1',
'12': '2',
'15': '3',
'17': '4',
'20': '5',
'30': '6',
'40': '7',
'60': '8',
'80': '9',
'160': '10'
}
'''
# "Virtual LCD": the values served by /get_status. Every reading starts out
# as the placeholder '-' until the first poll fills it in; RET carries the
# current status message and starts out as 'ok'.
vlcd = dict.fromkeys(
    ("STA", "PTT", "BND", "VLT", "PEP", "AVG", "SWR", "TMP"),
    "-",
)
vlcd["RET"] = "ok"
# Disabled code: the table below sits inside a bare triple-quoted string, so
# `keying_methods` is never defined at runtime.
# NOTE(review): presumably maps keying-mode names to the mode codes of the
# HR50 - confirm before re-enabling.
'''
keying_methods = {
"off" : "0",
"ptt" : "1",
"cor" : "2",
"qrp" : "3"
}
'''
def current_milli_time():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    seconds_since_epoch = time.time()
    return round(seconds_since_epoch * 1000)
def get_serial(port, baud):
    """Open the serial connection to the HR50.

    Args:
        port: Device path of the serial/USB port.
        baud: Baud rate to open the port with.

    Returns:
        The opened ``serial.Serial`` object (read timeout: 2 seconds).

    Raises:
        SystemExit: With status 2 when the port cannot be opened; the
            reason is printed first.
    """
    try:
        return serial.Serial(port, baud, timeout=2)
    except serial.serialutil.SerialException as e:
        print("The following error has occured while opening the serial connection to {} with {} baud:".format(port, baud))
        print(e)
        sys.exit(2)
# Sends a command to the
# Hardrock-50 via the serial interface
def send_cmd_via_serial(cmd):
    """Send one command to the HR50 and return its one-line reply.

    Args:
        cmd: Command text without the terminating ';' (appended here).

    Returns:
        The reply decoded as UTF-8 with trailing whitespace removed, or
        None when writing to or reading from the port failed.
    """
    ser = get_serial(serial_port, baud)
    res = None
    try:
        ser.write(str.encode(cmd + ';'))
        res = ser.readline().decode("utf-8").rstrip()
    except Exception as e:
        # Best effort: report the failure and hand None back to the caller
        print("The following error has occured while sending the command {} to the HR50:".format(cmd))
        print(e)
    finally:
        # Release the port even when the exchange failed
        ser.close()
    return res
def _query_hr50(ser, command):
    """Write *command* (bytes) to the open port *ser* and return the decoded reply line."""
    ser.write(command)
    # Wait before reading the reply
    time.sleep(0.5)
    return ser.readline().decode("utf-8").rstrip()


# Executes two commands via the serial interface,
# parses the results and populates the vlcd dict with the
# collected information
def get_info():
    """Poll the HR50 and store the readings in ``vlcd``.

    Sends ``HRRX;`` and stores the first five comma-separated fields of the
    reply as STA, PTT, BND, TMP and VLT. Then sends ``HRMX;`` and stores
    fields 1-3 of the whitespace-separated reply (minus their first
    character) as PEP, AVG and SWR; SWR is left untouched when the reading
    is "0".

    A reply with too few fields sets ``vlcd['RET']`` to "(!)" and restarts
    the message timer. An error while talking to the port is printed. The
    port is closed in every case.

    Returns:
        None
    """
    global msg_time
    ser = get_serial(serial_port, baud)
    command = b'HRRX;'
    try:
        status = _query_hr50(ser, command).replace(';', '').split(',')
        # Five fields are read below, so anything shorter is an invalid reply
        if len(status) < 5:
            vlcd['RET'] = "(!)"
            msg_time = current_milli_time()
            return None
        vlcd['STA'] = status[0]
        vlcd['PTT'] = status[1]
        vlcd['BND'] = status[2]
        vlcd['TMP'] = status[3]
        vlcd['VLT'] = status[4]

        command = b'HRMX;'
        meter = _query_hr50(ser, command).split()
        # Fields 1..3 are read below
        if len(meter) < 4:
            vlcd['RET'] = "(!)"
            msg_time = current_milli_time()
            return None
        # Each field carries a one-character prefix that is dropped
        vlcd['PEP'] = meter[1][1:]
        vlcd['AVG'] = meter[2][1:]
        swr = meter[3][1:]
        if swr != "0":
            # First two digits become "d.d"
            vlcd['SWR'] = swr[0:1] + "." + swr[1:2]
    except Exception as e:
        # Best effort: a failed poll is reported and the old readings stay
        print("The following error has occured while sending the command {} to the HR50:".format(command.decode("utf-8")))
        print(e)
    finally:
        # Release the port on every path, including the early returns
        ser.close()
    return None
def background_loop():
    """Run one polling pass and schedule the next one 3 seconds later.

    If a command is pending in ``cmd_rcvd`` it is sent to the HR50, the
    display message is set to ``serial_cmd_msg`` and the pending command is
    cleared; otherwise the status is refreshed via ``get_info()``.
    Afterwards an expired status message (or the initial "ok") is replaced
    by the current VLT reading.
    """
    global cmd_rcvd
    global msg_time
    # Re-arm first, so that polling goes on even if this pass raises
    threading.Timer(3.0, background_loop).start()
    if cmd_rcvd != "":
        try:
            send_cmd_via_serial(cmd_rcvd)
            vlcd['RET'] = serial_cmd_msg
            msg_time = current_milli_time()
        except Exception as e:
            # Report the failure instead of dropping it silently
            print("The following error has occured while sending the command {} to the HR50:".format(cmd_rcvd))
            print(e)
        # The command is cleared whether or not sending worked
        cmd_rcvd = ""
    else:
        get_info()
    message_expired = (
        msg_time != 0 and msg_time + msg_duration < current_milli_time()
    )
    if message_expired or vlcd['RET'] == "ok":
        vlcd['RET'] = vlcd['VLT']
@app.route("/get_status")
def get_status():
    """Serve the current contents of ``vlcd`` as a JSON response."""
    response = jsonify(vlcd)
    return response
@app.route("/exec_shell")
def exec_shell_command():
    """Run the configured shell script and return its raw output.

    Non-empty output is also decoded and stored as the status message,
    which restarts the message timer.
    """
    global msg_time
    script_output = check_output([shell_cmd, ""])
    if not script_output:
        # Nothing to display; hand the empty output straight back
        return script_output
    vlcd["RET"] = script_output.decode("utf-8")
    msg_time = current_milli_time()
    return script_output
@app.route("/exec_serial")
def exec_serial_command():
    """Queue the configured serial command and acknowledge with HTTP 200.

    The command is only stored in ``cmd_rcvd`` here; nothing is written to
    the serial port by this handler.
    """
    global cmd_rcvd
    cmd_rcvd = serial_cmd
    payload = {
        "isError": False,
        "message": "Success",
        "statusCode": 200,
        "data": "",
    }
    return jsonify(payload), 200
# Run the first polling pass when the module is loaded; every pass schedules
# the following one itself via threading.Timer
background_loop()