182 lines
4.6 KiB
Python
182 lines
4.6 KiB
Python
from flask import Flask, jsonify, request
|
|
import sys
|
|
import getopt
|
|
import time
|
|
import serial
|
|
from subprocess import check_output
|
|
import threading
|
|
|
|
app = Flask(__name__)
|
|
|
|
shell_cmd = "./scripts/toggle_antenna.sh"
|
|
serial_cmd = "hrtu1"
|
|
serial_cmd_msg = "Tune"
|
|
|
|
# The serial/USB port the HR50 is connected to
|
|
serial_port = '/dev/ttyUSB0'
|
|
|
|
# doesn't really matter but must match settings of the HR50
|
|
baud = 19200
|
|
|
|
# the active command that has been sent via the API
|
|
# If populated: next connection to the HR50 will be used
|
|
# to send the command
|
|
# If empty: next connection to the HR50 will be used
|
|
# to query status information
|
|
cmd_rcvd = ""
|
|
|
|
# time in milliseconds when we started to display a status message
|
|
msg_time = 0
|
|
|
|
# time in milliseconds a status message will be displayed
|
|
msg_duration = 3000
|
|
|
|
'''
|
|
bands = {
|
|
'6': '0',
|
|
'10': '1',
|
|
'12': '2',
|
|
'15': '3',
|
|
'17': '4',
|
|
'20': '5',
|
|
'30': '6',
|
|
'40': '7',
|
|
'60': '8',
|
|
'80': '9',
|
|
'160': '10'
|
|
}
|
|
'''
|
|
|
|
vlcd = {
|
|
'STA': '-',
|
|
'PTT': '-',
|
|
'BND': '-',
|
|
'VLT': '-',
|
|
'PEP': '-',
|
|
'AVG': '-',
|
|
'SWR': '-',
|
|
'TMP': '-',
|
|
'RET': 'ok'
|
|
}
|
|
|
|
'''
|
|
keying_methods = {
|
|
"off" : "0",
|
|
"ptt" : "1",
|
|
"cor" : "2",
|
|
"qrp" : "3"
|
|
}
|
|
'''
|
|
|
|
def current_milli_time():
|
|
return round(time.time() * 1000)
|
|
|
|
|
|
def get_serial(port, baud):
|
|
try:
|
|
ser = serial.Serial(port, baud, timeout=2)
|
|
return ser
|
|
except serial.serialutil.SerialException as e:
|
|
print("The following error has occured while opening the serial connecti on to {} with {} baud:".format(port, baud))
|
|
print(e)
|
|
sys.exit(2)
|
|
|
|
|
|
# sends a command to the
|
|
# Hardrock-50 via the serial interface
|
|
def send_cmd_via_serial(cmd):
|
|
ser = get_serial(serial_port, baud)
|
|
res = None
|
|
try:
|
|
command = cmd + ';'
|
|
ser.write(str.encode(command))
|
|
res = ser.readline().decode("utf-8").rstrip()
|
|
except Exception as e:
|
|
print("The following error has occured while sending the command {} to t he HR50:".format(port, baud))
|
|
print(e)
|
|
ser.close()
|
|
return res
|
|
|
|
|
|
# Executed two commands vie the serial interface,
|
|
# parsed the result ad polulates a dict with the
|
|
# collected information
|
|
def get_info():
|
|
global msg_time
|
|
ser = get_serial(serial_port, baud)
|
|
try:
|
|
ser.write(b'HRRX;')
|
|
time.sleep(0.5)
|
|
res = ser.readline().decode("utf-8").rstrip().replace(';', '').split(',')
|
|
except Exception as e:
|
|
print("The following error has occured while sending the command {} to the HR50:".format(port, baud))
|
|
print(e)
|
|
ser.close()
|
|
return None
|
|
if res and len(res) > 3:
|
|
vlcd['STA'] = res[0]
|
|
vlcd['PTT'] = res[1]
|
|
vlcd['BND'] = res[2]
|
|
vlcd['TMP'] = res[3]
|
|
vlcd['VLT'] = res[4]
|
|
ser.write(b'HRMX;')
|
|
time.sleep(0.5)
|
|
res = ser.readline().decode("utf-8").rstrip().split()
|
|
ser.close()
|
|
vlcd['PEP'] = res[1][1:]
|
|
vlcd['AVG'] = res[2][1:]
|
|
if res[3][1:] != "0":
|
|
vlcd['SWR'] = res[3][1:2] + "." + res[3][2:3]
|
|
else:
|
|
vlcd['RET'] = "(!)"
|
|
msg_time = current_milli_time()
|
|
|
|
|
|
def background_loop():
|
|
global cmd_rcvd
|
|
global msg_time
|
|
global serial_cmd_msg
|
|
threading.Timer(3.0, background_loop).start()
|
|
if cmd_rcvd != "":
|
|
ret = ""
|
|
try:
|
|
send_cmd_via_serial(cmd_rcvd)
|
|
vlcd['RET'] = serial_cmd_msg
|
|
msg_time = current_milli_time()
|
|
except Exception as e:
|
|
ret = "ERROR"
|
|
cmd_rcvd = ""
|
|
else:
|
|
get_info()
|
|
if msg_time != 0 and msg_time + msg_duration < current_milli_time() or vlcd['RET'] == "ok":
|
|
vlcd['RET'] = vlcd['VLT']
|
|
|
|
|
|
@app.route('/get_status')
|
|
def get_status():
|
|
return jsonify(vlcd)
|
|
|
|
|
|
@app.route('/exec_shell')
|
|
def exec_shell_command():
|
|
global msg_time
|
|
out = check_output([shell_cmd, ""])
|
|
if out:
|
|
vlcd['RET'] = out.decode("utf-8")
|
|
msg_time = current_milli_time()
|
|
return out
|
|
|
|
|
|
@app.route('/exec_serial')
|
|
def exec_serial_command():
|
|
global cmd_rcvd
|
|
cmd_rcvd = serial_cmd
|
|
return jsonify(isError= False,
|
|
message= "Success",
|
|
statusCode= 200,
|
|
data= "" ), 200
|
|
|
|
background_loop()
|
|
|
|
|