mirror of
https://codeberg.org/mclemens/hr50-remote-display.git
synced 2024-09-28 23:46:14 -04:00
24fe5c873d
fixed a lot of bugs added pushable button with long and short press added antenna toggle script
180 lines
4.6 KiB
Python
180 lines
4.6 KiB
Python
from flask import Flask, jsonify, request
|
|
import sys
|
|
import getopt
|
|
import time
|
|
import serial
|
|
from subprocess import check_output
|
|
import threading
|
|
|
|
app = Flask(__name__)
|
|
|
|
shell_cmd = "./scripts/toggle_antenna.sh"
|
|
serial_cmd = "hrtu1"
|
|
|
|
# The serial/USB port the HR50 is connected to
|
|
serial_port = '/dev/ttyUSB0'
|
|
|
|
# doesn't really matter but must match settings of the HR50
|
|
baud = 19200
|
|
|
|
# the active command that has been sent via the API
|
|
# If populated: next connection to the HR50 will be used
|
|
# to send the command
|
|
# If empty: next connection to the HR50 will be used
|
|
# to query status information
|
|
cmd_rcvd = ""
|
|
|
|
# time in milliseconds when we started to display a status message
|
|
msg_time = 0
|
|
|
|
# time in milliseconds a status message will be displayed
|
|
msg_duration = 3000
|
|
|
|
'''
|
|
bands = {
|
|
'6': '0',
|
|
'10': '1',
|
|
'12': '2',
|
|
'15': '3',
|
|
'17': '4',
|
|
'20': '5',
|
|
'30': '6',
|
|
'40': '7',
|
|
'60': '8',
|
|
'80': '9',
|
|
'160': '10'
|
|
}
|
|
'''
|
|
|
|
vlcd = {
|
|
'STA': '-',
|
|
'PTT': '-',
|
|
'BND': '-',
|
|
'VLT': '-',
|
|
'PEP': '-',
|
|
'AVG': '-',
|
|
'SWR': '-',
|
|
'TMP': '-',
|
|
'RET': 'ok'
|
|
}
|
|
|
|
'''
|
|
keying_methods = {
|
|
"off" : "0",
|
|
"ptt" : "1",
|
|
"cor" : "2",
|
|
"qrp" : "3"
|
|
}
|
|
'''
|
|
|
|
def current_milli_time():
|
|
return round(time.time() * 1000)
|
|
|
|
|
|
def get_serial(port, baud):
|
|
try:
|
|
ser = serial.Serial(port, baud, timeout=2)
|
|
return ser
|
|
except serial.serialutil.SerialException as e:
|
|
print("The following error has occured while opening the serial connecti on to {} with {} baud:".format(port, baud))
|
|
print(e)
|
|
sys.exit(2)
|
|
|
|
|
|
# sends a command to the
|
|
# Hardrock-50 via the serial interface
|
|
def send_cmd_via_serial(cmd):
|
|
ser = get_serial(serial_port, baud)
|
|
res = None
|
|
try:
|
|
command = cmd + ';'
|
|
ser.write(str.encode(command))
|
|
res = ser.readline().decode("utf-8").rstrip()
|
|
except Exception as e:
|
|
print("The following error has occured while sending the command {} to t he HR50:".format(port, baud))
|
|
print(e)
|
|
ser.close()
|
|
return res
|
|
|
|
|
|
# Executed two commands vie the serial interface,
|
|
# parsed the result ad polulates a dict with the
|
|
# collected information
|
|
def get_info():
|
|
global msg_time
|
|
ser = get_serial(serial_port, baud)
|
|
try:
|
|
ser.write(b'HRRX;')
|
|
time.sleep(0.5)
|
|
res = ser.readline().decode("utf-8").rstrip().replace(';', '').split(',')
|
|
except Exception as e:
|
|
print("The following error has occured while sending the command {} to the HR50:".format(port, baud))
|
|
print(e)
|
|
ser.close()
|
|
return None
|
|
if res and len(res) > 3:
|
|
vlcd['STA'] = res[0]
|
|
vlcd['PTT'] = res[1]
|
|
vlcd['BND'] = res[2]
|
|
vlcd['TMP'] = res[3]
|
|
vlcd['VLT'] = res[4]
|
|
ser.write(b'HRMX;')
|
|
time.sleep(0.5)
|
|
res = ser.readline().decode("utf-8").rstrip().split()
|
|
ser.close()
|
|
vlcd['PEP'] = res[1][1:]
|
|
vlcd['AVG'] = res[2][1:]
|
|
if res[3][1:] != "0":
|
|
vlcd['SWR'] = res[3][1:2] + "." + res[3][2:3]
|
|
else:
|
|
vlcd['RET'] = "(!)"
|
|
msg_time = current_milli_time()
|
|
|
|
|
|
def background_loop():
|
|
global cmd_rcvd
|
|
global msg_time
|
|
threading.Timer(3.0, background_loop).start()
|
|
if cmd_rcvd != "":
|
|
ret = ""
|
|
try:
|
|
send_cmd_via_serial(cmd_rcvd)
|
|
vlcd['RET'] = 'Tune'
|
|
msg_time = current_milli_time()
|
|
except Exception as e:
|
|
ret = "ERROR"
|
|
cmd_rcvd = ""
|
|
else:
|
|
get_info()
|
|
if msg_time != 0 and msg_time + msg_duration < current_milli_time() or vlcd['RET'] == "ok":
|
|
vlcd['RET'] = vlcd['VLT']
|
|
|
|
|
|
@app.route('/get_status')
|
|
def get_status():
|
|
return jsonify(vlcd)
|
|
|
|
|
|
@app.route('/exec_shell')
|
|
def exec_shell_command():
|
|
global msg_time
|
|
out = check_output([shell_cmd, ""])
|
|
if out:
|
|
vlcd['RET'] = out.decode("utf-8")
|
|
msg_time = current_milli_time()
|
|
return out
|
|
|
|
|
|
@app.route('/exec_serial')
|
|
def exec_serial_command():
|
|
global cmd_rcvd
|
|
cmd_rcvd = serial_cmd
|
|
return jsonify(isError= False,
|
|
message= "Success",
|
|
statusCode= 200,
|
|
data= "" ), 200
|
|
|
|
background_loop()
|
|
|
|
|