hr50-api/hr50api.py

135 lines
3.4 KiB
Python

from flask import Flask, jsonify, request
import sys
import getopt
import time
import serial
from subprocess import check_output
# Flask application object; the @app.route decorators in this file register
# their handlers on it.
app = Flask(__name__)
# Script executed by the /exec endpoint.
exec_cmd = "/home/pi/bin/toggle_antenna.sh"
# Serial device and baud rate passed to get_serial() for every connection.
serial_port = '/dev/ttyUSB0'
baud = 19200
# Reference table, not referenced by the code visible in this file:
# band name -> code.
# NOTE(review): presumably the amplifier's band-select codes -- confirm
# against the HR50 command reference before using it.
# bands = {
#     '6': '0',
#     '10': '1',
#     '12': '2',
#     '15': '3',
#     '17': '4',
#     '20': '5',
#     '30': '6',
#     '40': '7',
#     '60': '8',
#     '80': '9',
#     '160': '10'
# }

# Values most recently read from the amplifier; get_info() updates this dict
# in place and the /status endpoint returns it as JSON. Every field starts
# out as '-'.
vlcd = dict.fromkeys(
    ('STA', 'PTT', 'BND', 'VLT', 'PEP', 'AVG', 'SWR', 'TMP'),
    '-',
)

# Reference table, not referenced by the code visible in this file:
# keying method name -> code.
# keying_methods = {
#     "off": "0",
#     "ptt": "1",
#     "cor": "2",
#     "qrp": "3"
# }
def get_serial(port, baud):
    """Open a serial connection.

    Args:
        port: Device path of the serial port to open.
        baud: Baud rate to open the port with.

    Returns:
        The opened ``serial.Serial`` object, configured with a 2 second
        read timeout.

    Note:
        If the port cannot be opened, the error is printed and the process
        is terminated via ``sys.exit(2)``.
    """
    try:
        return serial.Serial(port, baud, timeout=2)
    except serial.serialutil.SerialException as e:
        # Message repaired: the literal was previously split mid-word
        # ("connecti on") and misspelled "occurred".
        print("The following error has occurred while opening the serial connection to {} with {} baud:".format(port, baud))
        print(e)
        # NOTE(review): sys.exit raises SystemExit, which the
        # `except Exception` handlers elsewhere in this file do not catch.
        sys.exit(2)
def send_cmd_via_serial(cmd):
    """Send one command to the Hardrock-50 via the serial interface.

    A fresh serial connection is opened for the call and always closed
    again before returning.

    Args:
        cmd: Command text without the trailing ``;`` (it is appended here).

    Returns:
        The first line read back, decoded as UTF-8 with trailing whitespace
        stripped, or ``None`` if writing or reading raised an exception.
    """
    ser = get_serial(serial_port, baud)
    res = None
    try:
        ser.write((cmd + ';').encode())
        res = ser.readline().decode("utf-8").rstrip()
    except Exception as e:
        # The message used to be formatted with an undefined name (`port`),
        # which raised NameError inside this handler instead of logging the
        # failure. Report the command that was being sent.
        print("The following error has occurred while sending the command {} to the HR50:".format(cmd))
        print(e)
    finally:
        # Release the port even if the handler above fails.
        ser.close()
    return res
def get_info():
    """Query the amplifier over serial and refresh the module-level ``vlcd``.

    Two commands are executed on one serial connection:

    1. ``HRRX;`` -- the reply is stripped of ``;`` and split on commas; its
       first five fields are stored as STA, PTT, BND, TMP and VLT.
    2. ``HRMX;`` -- the reply is split on whitespace; fields 1..3, each with
       its first character dropped, are stored as PEP, AVG and SWR. SWR is
       written as ``"<digit>.<digit>"`` and left unchanged when the field
       reads ``"0"``.

    If the first reply has fewer than five fields, ``vlcd['PTT']`` is set to
    ``"ERR"`` and nothing else is touched. If the first exchange raises, the
    error is printed and ``vlcd`` is left unchanged.

    Returns:
        None. Results are delivered through ``vlcd``.
    """
    ser = get_serial(serial_port, baud)
    try:
        try:
            ser.write(b'HRRX;')
            time.sleep(0.5)  # give the amplifier time to answer before reading
            status = ser.readline().decode("utf-8").rstrip().replace(';', '').split(',')
        except Exception as e:
            # The message used to be formatted with an undefined name
            # (`port`), which raised NameError inside this handler.
            print("The following error has occurred while sending the command {} to the HR50:".format('HRRX'))
            print(e)
            return None

        # Index 4 is read below, so at least five fields are required
        # (the previous `len(res) > 3` guard let a four-field reply through
        # and then failed on res[4]).
        # NOTE(review): the file's indentation was lost; the "ERR" marker is
        # taken to belong to this short-status-reply case -- confirm.
        if len(status) < 5:
            vlcd['PTT'] = "ERR"
            return None

        for key, value in zip(('STA', 'PTT', 'BND', 'TMP', 'VLT'), status):
            vlcd[key] = value

        ser.write(b'HRMX;')
        time.sleep(0.5)
        meter = ser.readline().decode("utf-8").rstrip().split()
    finally:
        # Close the port on every path, including the early returns above
        # and any exception raised during the second exchange.
        ser.close()

    # A timed-out or truncated reply would otherwise raise IndexError below;
    # keep the previous meter values in that case.
    if len(meter) < 4:
        print("Unexpected reply to HRMX from the HR50: {}".format(meter))
        return None

    vlcd['PEP'] = meter[1][1:]
    vlcd['AVG'] = meter[2][1:]
    swr = meter[3][1:]
    if swr != "0":
        vlcd['SWR'] = swr[0:1] + "." + swr[1:2]
    return None
@app.route('/status')
def get_status():
    """Refresh the amplifier readings and return them as a JSON response."""
    get_info()  # updates the module-level vlcd dict in place
    payload = jsonify(vlcd)
    return payload
@app.route('/exec')
def exec_shell_command():
    """Run the script configured in ``exec_cmd`` and return its stdout.

    The script is invoked with a single empty-string argument, and the
    captured output bytes are returned unchanged as the response body.
    ``check_output`` raises ``CalledProcessError`` if the script exits with
    a non-zero status; that exception is not handled here.
    """
    argv = [exec_cmd, ""]
    return check_output(argv)
@app.route('/', methods=['GET'])
def send_command():
    """Forward the ``cmd`` query parameter to the amplifier.

    Returns:
        The submitted command text (an empty string when ``cmd`` is absent),
        or ``"ERROR"`` if reading the parameter or sending the command
        raised an exception.
    """
    # NOTE(review): the value returned by send_cmd_via_serial() is discarded;
    # the response echoes the request's command, not the amplifier's reply.
    try:
        command = request.args.get('cmd', default="", type=str)
        send_cmd_via_serial(command)
    except Exception:
        return "ERROR"
    return command