forgot to store their history). Also, added some hardware schematics showing the workings of the interface hat for the CW key (and the key line to the RS-HFIQ).
504 lines
18 KiB
Python
504 lines
18 KiB
Python
########################################################################
|
|
# Pi-HFIQ hardware control module for Quisk.
|
|
#
|
|
# Original 'hardware_usbserial.py' for RS-HFIQ hardware by _____.
|
|
#
|
|
# Adapted into the "Pi-HFIQ" configuration by Rob French, KC4UPR.
|
|
#
|
|
# This file provides for USB control of the HobbyPCB RS-HFIQ SDR
|
|
# transceiver, in conjunction with a Raspberry Pi for control. The
|
|
# Raspberry Pi provides ADC/DAC of the RX and TX I/Q signals, as well as
|
|
# input methods for audio, digital, and CW modes.
|
|
#
|
|
# TODO as of 2020-04-21:
|
|
# (1) Implement all/most of the keyer parameters as user-controllable
|
|
# items in the configuration file, widgets, and/or config menu.
|
|
# (2) Make power level settings automatically get stored/restored per
|
|
# band, mode, and possibly even frequency ("nearest 10 KHz" or
|
|
# something like that... memory is cheap). This is due to the
|
|
# fact that I get the RS-HFIQ "CLIP" light coming on with different
|
|
# output volumes depending on mode, CW vs digital, etc.
|
|
# (3) Clean up more comments.
|
|
# (4) Add an automatic device finder... so you don't have to edit
|
|
# /dev/ttyUSB0, ...USB1, etc.
|
|
#
|
|
########################################################################
|
|
|
|
from __future__ import print_function
|
|
|
|
import math
|
|
import psutil
|
|
import serial
|
|
import serial.tools.list_ports
|
|
import struct
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
from quisk_hardware_model import Hardware as BaseHardware
|
|
import _quisk as QS
|
|
|
|
# Module-wide debug switch; used as the default for the ``debug`` argument
# of Hardware.__init__().
DEBUG = 1

# Keyer modes offered by this module.  Only the first element of each pair
# (the display name) is read in this file.
# NOTE(review): the purpose of the second element (always None) is not
# visible here -- confirm before relying on it.
KEYER_MODE_LIST = (
    ('Keyer', None),
    ('Bug', None),
    ('Iambic A', None),
    ('Iambic B', None),
)

# Initial values handed to Hardware.SetKeyerMode() / Hardware.SetKeyerWPM().
DEFAULT_KEYER_MODE = 2
DEFAULT_KEYER_WPM = 15
|
|
|
|
########################################################################
|
|
|
|
def dprint(*args, **kwargs):
    """Print a debug message.

    Takes the same positional and keyword arguments as the built-in
    ``print`` function.  Output goes to ``sys.stdout`` unless the caller
    supplies its own ``file`` keyword.
    """
    # setdefault (rather than passing file= alongside **kwargs) avoids a
    # "multiple values for keyword argument 'file'" TypeError when the
    # caller passes file= explicitly.
    kwargs.setdefault("file", sys.stdout)
    print(*args, **kwargs)
|
|
|
|
def eprint(*args, **kwargs):
    """Print an error message.

    Takes the same positional and keyword arguments as the built-in
    ``print`` function.  Output goes to ``sys.stderr`` unless the caller
    supplies its own ``file`` keyword.
    """
    # setdefault (rather than passing file= alongside **kwargs) avoids a
    # "multiple values for keyword argument 'file'" TypeError when the
    # caller passes file= explicitly.
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
|
|
|
|
########################################################################
|
|
# Hardware class
#
# The Quisk hardware object for the Pi-HFIQ.  It derives from the Quisk
# base Hardware class, owns the RSHFIQ serial-interface object along with
# the key and status monitoring threads, and implements the callbacks
# (open/close, frequency, PTT, mode, spot, keyer settings) defined below.
|
|
########################################################################
|
|
|
|
class Hardware(BaseHardware):
    """Quisk hardware-control class for the Pi-HFIQ configuration.

    Owns the RSHFIQ serial-interface object (``self.rig``) plus two helper
    threads: a KeyThread (``self.key_thread``) and a StatusThread
    (``self.mon_thread``).  Both threads are created in open() and torn
    down in close().
    """

    def __init__(self, app, conf, debug=DEBUG):
        """Create the hardware object and apply the default keyer settings.

        app and conf are forwarded unchanged to the base class; debug
        enables diagnostic printing in this object and in the rig object.
        """
        BaseHardware.__init__(self, app, conf)

        # serial port for the transceiver (rig) -- RS-HFIQ
        # Pass the caller's debug setting through (the module-level DEBUG
        # constant is only the default for the parameter).
        self.rig = RSHFIQ(device="/dev/ttyUSB0", debug=debug)

        self.debug = debug
        self.vfo = None
        self.ptt_button = 0
        self.is_cw = False
        self.key_thread = None
        self.mon_thread = None

        self.keyer_mode_list = [x[0] for x in KEYER_MODE_LIST]

        self.keyer_mode = DEFAULT_KEYER_MODE
        self.keyer_wpm = DEFAULT_KEYER_WPM

        self.SetKeyerMode(self.keyer_mode)
        self.SetKeyerWPM(self.keyer_wpm)

    def open(self):  # Called once to open the Hardware
        """Open the rig, verify it is an RS-HFIQ and start the helper threads.

        Returns the firmware version string on success, or an error string
        if the rig reports that it could not be opened.  Terminates the
        process if the device does not identify itself as an RS-HFIQ.
        """
        if not self.rig.open():
            return "[Pi-HFIQ] unable to open device"

        version = self.rig.version
        if (version or "")[0:7] != "RS-HFIQ":
            eprint("[Pi-HFIQ] could not find the RS-HFIQ device -- terminating")
            # Release the serial port before bailing out, and exit through
            # sys.exit() (the bare exit() helper is only provided by the
            # site module) with a failure status.
            self.rig.close()
            sys.exit(1)

        if self.conf.name_of_mic_play and self.conf.key_poll_msec:
            self.key_thread = KeyThread(self.rig, self.conf.key_poll_msec / 1000.0, self.conf.key_hang_time)
            self.key_thread.start()

        # NOTE(review): the status thread is started unconditionally here
        # because HeartBeat() reads it regardless of the keyer settings;
        # confirm against the intended nesting of the original.
        self.mon_thread = StatusThread(self.rig)
        self.mon_thread.start()
        return version

    def close(self):  # Called once to close the Hardware
        """Stop the helper threads, close the rig and return a status string."""
        for attr in ("key_thread", "mon_thread"):
            worker = getattr(self, attr)
            if worker:
                worker.stop()
                # Give the thread a chance to finish its current poll so it
                # is not talking to the serial port while the port is being
                # closed.  The timeout keeps shutdown from hanging.
                if worker.is_alive():
                    worker.join(2.0)
                setattr(self, attr, None)

        self.rig.close()

        return "[Pi-HFIQ] closed"

    def HeartBeat(self):
        """Refresh the status text (temperatures, CPU and memory load)."""
        widgets = self.application.bottom_widgets
        monitor = self.mon_thread
        # Nothing to show without a widget, or before open() / after
        # close() when no status thread exists.
        if not widgets or monitor is None:
            return
        widgets.UpdateText(
            "Rig: %.0fC, CPU: %.0fC, Proc: %.0f%%, Mem: %.0f%%" % (
                monitor.rig_temp,
                monitor.cpu_temp,
                monitor.cpu_load,
                monitor.mem_load))

    def ReturnFrequency(self):
        """Return (None, <frequency reported by the rig>).

        NOTE(review): the rig property returns the raw reply text (or None
        if the port is closed), not a number -- confirm the caller copes
        with that.
        """
        return None, self.rig.frequency

    def ChangeFrequency(self, tune, vfo, source='', band='', event=None):
        """Retune the rig if the VFO changed; return (tune, current VFO)."""
        if self.vfo != vfo:
            self.vfo = vfo
            self.rig.frequency = self.vfo
            # probably need to check that this actually worked!
        return tune, self.vfo

    def OnButtonPTT(self, event=None):
        """Handle the PTT button.

        With an event, the button state is read from the event's widget.
        Without one (as when called from ChangeMode()), the previously
        recorded PTT state is re-applied.
        """
        if event:
            if event.GetEventObject().GetValue():
                if self.debug: dprint("[Pi-HFIQ] PTT pressed")
                self.ptt_button = 1
            else:
                if self.debug: dprint("[Pi-HFIQ] PTT released")
                self.ptt_button = 0

        # NOTE(review): nesting reconstructed -- the dispatch below runs
        # whether or not an event was supplied; confirm against the
        # original layout.
        if self.key_thread and self.is_cw:
            # keyer thread exists--it handles PTT
            # Also temporarily using this only for CW, because digimodes
            # aren't working with the keyer thread...
            if self.debug: dprint("[Pi-HFIQ] PTT handled by keyer thread")
            self.key_thread.OnPTT(self.ptt_button)
        else:
            # no keyer thread--PTT handled here
            if self.debug: dprint("[Pi-HFIQ] PTT state not handled by keyer thread")
            self.rig.transmit = (self.ptt_button == 1)
            # Use the recorded state instead of re-reading the event, which
            # is None when this method is invoked without a button event.
            QS.set_key_down(self.ptt_button)

        # if self.is_cw:
        #     QS.set_key_down(0)
        #     QS.set_transmit_mode(self.ptt_button)
        # else:
        #     QS.set_key_down(self.ptt_button)

    def ChangeMode(self, mode):  # Change the tx/rx mode
        """Record whether the new mode is CW and switch the GPIO keyer.

        mode is a string: "USB", "AM", etc.
        """
        self.is_cw = mode in ('CWU', 'CWL')
        QS.set_gpio_keyer_enabled(1 if self.is_cw else 0)

        if self.key_thread:
            self.key_thread.IsCW(self.is_cw)
        else:
            # what is this for?
            # (The original guarded this with hasattr(self, 'OnButtonPTT'),
            # which is always true for this class.)
            self.OnButtonPTT()

    def OnSpot(self, level):
        """Forward the Spot level to the key thread, if there is one."""
        if self.key_thread:
            self.key_thread.OnSpot(level)

    def SetKeyerMode(self, value):
        """Store the keyer mode and configure the GPIO keyer accordingly.

        A value of 0 disables the GPIO keyer; any other value enables it
        and passes ``value - 1`` as the keyer mode.
        """
        self.keyer_mode = value
        if self.keyer_mode == 0:
            QS.set_gpio_keyer_enabled(0)
        else:
            QS.set_gpio_keyer_enabled(1)
            QS.set_gpio_keyer_mode(self.keyer_mode - 1)

    def SetKeyerWPM(self, value):
        """Store the keyer speed and pass it to the GPIO keyer."""
        self.keyer_wpm = value
        QS.set_gpio_keyer_speed(self.keyer_wpm)
|
|
|
|
class KeyThread(threading.Thread):
    """Create a thread to monitor the key state."""

    def __init__(self, rig, poll_secs, key_hang_time, debug=0):
        """Set up the key-monitoring thread (it is not started here).

        rig           -- rig object whose ``transmit`` property is driven
        poll_secs     -- delay between polls, in seconds
        key_hang_time -- hang time used by the (currently disabled) CW logic
        debug         -- non-zero enables diagnostic printing in run()
        """
        threading.Thread.__init__(self)

        self.rig = rig
        self.poll_secs = poll_secs
        self.key_hang_time = key_hang_time
        # run() consults self.debug; it has to exist before the thread runs.
        self.debug = debug

        self.ptt_button = 0
        self.spot_level = -1  # level is -1 for Spot button Off; else the Spot level 0 to 1000.
        self.currently_in_tx = 0
        self.is_cw = False
        self.key_timer = 0
        self.key_transmit = 0

        # A new Event starts out cleared, so no explicit clear() is needed.
        self.doQuit = threading.Event()

    def run(self):
        """Poll the key state until stop() is called."""
        while not self.doQuit.is_set():
            # Internal Quisk key state.  Only the disabled CW logic below
            # consumes it; the poll is kept so that logic can be restored.
            key_down = QS.is_key_down()

            # in the future, we can have the CW section handled by the H/W key
            # line to the RS-HFIQ
            if self.is_cw:
                pass  # this needs to be configurable; however, commenting
                      # everything below out to test key down via discrete
                # if self.spot_level >= 0 or key_down:  # key is down
                #     self.key_transmit = 1
                #     self.key_timer = time.time()  # QSK/semi-QSK, as applicable
                #
                # else:  # key is up
                #     #QS.set_key_down(0)
                #     if self.key_transmit and time.time() - self.key_timer > self.key_hang_time:
                #         self.key_transmit = 0
                #
                # if self.key_transmit != self.currently_in_tx:
                #     QS.set_transmit_mode(self.key_transmit)
                #     self.rig.transmit = (self.key_transmit == 1)
                #     self.currently_in_tx = self.key_transmit  # success
                #     if DEBUG: print ("Change CW currently_in_tx", self.currently_in_tx)

            # modes other than CW -- only allow the PTT button (and CAT?)
            elif False:  # temporarily disabled this chunk...
                if self.ptt_button:
                    self.key_transmit = 1
                    if self.debug: dprint("[Pi-HFIQ Keyer Thread] non-CW transmit ON")
                else:
                    self.key_transmit = 0
                    if self.debug: dprint("[Pi-HFIQ Keyer Thread] non-CW transmit OFF")

                if self.key_transmit != self.currently_in_tx:
                    #QS.set_transmit_mode(self.key_transmit)
                    QS.set_key_down(self.key_transmit)
                    self.rig.transmit = (self.key_transmit == 1)
                    self.currently_in_tx = self.key_transmit  # success
                    if self.debug: dprint("Change CW currently_in_tx", self.currently_in_tx)

            time.sleep(self.poll_secs)

    def stop(self):
        """Set a flag to indicate that the thread should end."""
        self.doQuit.set()

    def OnPTT(self, ptt):
        """Record the current PTT button state."""
        self.ptt_button = ptt

    def OnSpot(self, level):
        """Record the current Spot level (-1 means the Spot button is off)."""
        self.spot_level = level

    def IsCW(self, is_cw):
        """Record whether the current mode is a CW mode."""
        self.is_cw = is_cw
|
|
|
|
class StatusThread(threading.Thread):
    """Create a thread to monitor various parameters.

    Periodically samples the rig temperature, the CPU temperature, the CPU
    load and the memory load, and exposes the latest values through
    read-only properties.  A reading that cannot be obtained leaves the
    previous value in place instead of terminating the thread.
    """

    def __init__(self, rig, poll_secs=0.5):
        """Set up the monitor (it is not started here).

        rig       -- object providing a ``temperature`` property
        poll_secs -- delay between samples, in seconds
        """
        threading.Thread.__init__(self)

        self.rig = rig
        self.poll_secs = poll_secs

        self.__rig_temp = 0
        self.__cpu_temp = 0
        self.__cpu_load = 0
        self.__mem_load = 0

        # A new Event starts out cleared, so no explicit clear() is needed.
        self.doQuit = threading.Event()

    def run(self):
        """Sample all monitored values until stop() is called."""
        while not self.doQuit.is_set():
            self._update_rig_temp()
            self._update_cpu_temp()
            self.__cpu_load = psutil.cpu_percent()
            self.__mem_load = psutil.virtual_memory().percent
            # Waiting on the event (instead of sleeping) lets stop() end
            # the thread without waiting out the rest of the poll period.
            self.doQuit.wait(self.poll_secs)

    def _update_rig_temp(self):
        """Refresh the rig temperature, keeping the old value on failure."""
        try:
            reading = self.rig.temperature
        except (ValueError, TypeError, IOError, OSError):
            # Unparsable reply or serial-port trouble: keep the last value.
            return
        # The rig reports None when its port is not open; storing that
        # would break numeric formatting of the value later on.
        if reading is not None:
            self.__rig_temp = reading

    def _update_cpu_temp(self):
        """Refresh the CPU temperature, keeping the old value on failure."""
        read_sensors = getattr(psutil, "sensors_temperatures", None)
        if read_sensors is None:
            return
        sensors = read_sensors()
        # NOTE(review): 'cpu_thermal' is tried as a fallback because the
        # sensor name is believed to differ between kernel versions --
        # confirm on the target system.
        entries = sensors.get('cpu-thermal') or sensors.get('cpu_thermal')
        if entries:
            self.__cpu_temp = entries[0].current

    def stop(self):
        """Set a flag to indicate that the thread should end."""
        self.doQuit.set()

    @property
    def rig_temp(self):
        """Most recent rig temperature reading (0 until first sampled)."""
        return self.__rig_temp

    @property
    def cpu_temp(self):
        """Most recent CPU temperature reading (0 until first sampled)."""
        return self.__cpu_temp

    @property
    def cpu_load(self):
        """Most recent CPU load percentage (0 until first sampled)."""
        return self.__cpu_load

    @property
    def mem_load(self):
        """Most recent memory usage percentage (0 until first sampled)."""
        return self.__mem_load
|
|
|
|
|
|
########################################################################
|
|
# RSHFIQ class
|
|
#
|
|
# An object that encapsulates all of the functionality associated with
|
|
# the RS-HFIQ serial interface. It provides methods and/or properties
|
|
# for each of the commands in the RS-HFIQ interface command list, and
|
|
# performs all of the necessary serial port operations.
|
|
########################################################################
|
|
|
|
# Human-readable names of the VFO output (drive) levels, indexed by the
# numeric level sent to the RS-HFIQ.
RSHFIQ_OUTPUT_LEVEL = ("off", "2 ma", "4 ma", "6 ma", "8 ma")


class RSHFIQ(object):
    """Serial-port interface to an RS-HFIQ transceiver.

    All commands go through a single lock-protected helper so that several
    threads can share one object without interleaving their traffic.
    """

    def __init__(self, device="/dev/ttyUSB0", output_level=2, debug=0):
        """Create a new RS-HFIQ object, associated with the specified
        serial port.  Optionally set output level of the VFO, as well as
        debug level.

        The port is only configured here; call open() to actually open it.
        """
        self.serial = serial.Serial()
        self.serial.port = device
        self.serial.baudrate = 57600
        self.serial.bytesize = serial.EIGHTBITS     # number of bits per byte
        self.serial.parity = serial.PARITY_NONE     # set parity check: no parity
        self.serial.stopbits = serial.STOPBITS_ONE  # number of stop bits
        self.serial.timeout = 1                     # read timeout, in seconds
        self.serial.rtscts = False

        self.debug = debug

        self.__version = None
        self.__output = output_level
        self.__transmit = False
        self.__lock = threading.Lock()

        if self.debug: dprint("[RS-HFIQ] created new object: ", self.serial.port)

    #===================================================================

    def _command(self, command, want_reply=False):
        """Send one command; optionally return the reply line as text.

        The lock is held across the write and the matching read so another
        thread cannot slip a command in between, and it is released even
        if the serial port raises.
        """
        # Serial ports carry bytes; encode text commands before writing.
        payload = command if isinstance(command, bytes) else command.encode("ascii")
        with self.__lock:
            self.serial.write(payload)
            if not want_reply:
                return None
            reply = self.serial.readline()
        if not isinstance(reply, str):
            reply = reply.decode("ascii", "replace")
        return reply.strip()

    @staticmethod
    def _output_label(level):
        """Return the display name of an output level, tolerating bad values."""
        if isinstance(level, int) and 0 <= level < len(RSHFIQ_OUTPUT_LEVEL):
            return RSHFIQ_OUTPUT_LEVEL[level]
        return "unknown (%r)" % (level,)

    #===================================================================

    def open(self):
        """Open the serial port associated with the RS-HFIQ, clear the
        input, retrieve the version info, and configure for use.

        Returns True if the port is open afterwards, False otherwise.
        Any exception raised while opening the port is printed and then
        re-raised unchanged.
        """
        try:
            self.serial.open()
        except Exception as e:
            print(e)
            raise

        if not self.serial.isOpen():
            return False

        if self.debug: dprint("[RS-HFIQ] opened device: ", self.serial.port)
        self.serial.flushInput()   # flush input buffer, discarding all its contents
        self.serial.flushOutput()  # flush output buffer, aborting current output
                                   # and discard all that is in buffer
        time.sleep(1)              # wait a moment for init to finish

        # BUG: For some reason, I have to set a parameter to the
        # RS-HFIQ before I can successfully read version info???
        # But I don't have to do this if I just login to the RS-HFIQ
        # via a serial terminal...
        self.output_level = self.__output
        self.transmit = False

        if self.debug: dprint("[RS-HFIQ] getting version info")
        # TODO: Make this its own method... retrieving the version.
        self.__version = self._command("*W\r", want_reply=True)
        if self.debug: dprint("[RS-HFIQ] firmware version: ", self.__version)

        return True

    #===================================================================

    def close(self):
        """Close the serial port associated with the RS-HFIQ.

        Returns True if the port was open and has been closed, False if it
        was not open.
        """
        if self.serial.isOpen():
            self.serial.close()
            return True
        return False

    #-------------------------------------------------------------------
    # version - read-only property, returns the version of the RS-HFIQ.
    #-------------------------------------------------------------------

    @property
    def version(self):
        """Firmware version string read during open() (None before that)."""
        if self.debug: dprint("[RS-HFIQ] firmware version: ", self.__version)
        return self.__version

    #-------------------------------------------------------------------
    # frequency - read-write property, sets the tuned frequency (VFO) of
    # the RS-HFIQ as specified.
    #-------------------------------------------------------------------

    @property
    def frequency(self):
        """Tuned frequency as reported by the rig.

        Returns the stripped reply text, or None if the port is not open.
        """
        if not self.serial.isOpen():
            return None
        freq = self._command("*F?\r", want_reply=True)
        if self.debug: dprint("[RS-HFIQ] current tuned frequency: ", freq)
        return freq

    @frequency.setter
    def frequency(self, freq):
        """Tune the rig to freq; silently ignored if the port is not open."""
        if not self.serial.isOpen():
            return
        self._command("*F" + str(freq) + "\r")
        if self.debug: dprint("[RS-HFIQ] setting tuned frequency: ", str(freq))

    #-------------------------------------------------------------------
    # output_level - read-write property, sets output level of the
    # RS-HFIQ tuner frequency per the interface definition:
    #   0 = off
    #   1 = 2 ma drive
    #   2 = 4 ma drive
    #   3 = 6 ma drive
    #   4 = 8 ma drive
    #-------------------------------------------------------------------

    @property
    def output_level(self):
        """Last output level stored in this object (not read from the rig)."""
        if self.debug: dprint("[RS-HFIQ] current output level: ", self._output_label(self.__output))
        return self.__output

    @output_level.setter
    def output_level(self, output_level):
        """Set the output level; silently ignored if the port is not open."""
        if not self.serial.isOpen():
            return
        self.__output = output_level
        self._command("*OF" + str(self.__output) + "\r")
        if self.debug: dprint("[RS-HFIQ] set output level: ", self._output_label(self.__output))

    #-------------------------------------------------------------------
    # transmit - read-write property, True/False sets the transmit state
    # of the RS-HFIQ to ON/OFF.
    #-------------------------------------------------------------------

    @property
    def transmit(self):
        """Last transmit state set through this object (True means ON)."""
        if self.debug: dprint("[RS-HFIQ] current transmit state ", "ON" if self.__transmit else "OFF")
        return self.__transmit

    @transmit.setter
    def transmit(self, state):
        """Switch transmit ON/OFF; silently ignored if the port is not open."""
        if not self.serial.isOpen():
            return
        enabled = bool(state)
        self._command("*X1\r" if enabled else "*X0\r")
        self.__transmit = enabled
        if self.debug:
            dprint("[RS-HFIQ] set transmit ON" if enabled else "[RS-HFIQ] set transmit OFF")

    #-------------------------------------------------------------------
    # temperature - read-only property, read from the RS-HFIQ
    #-------------------------------------------------------------------

    @property
    def temperature(self):
        """Temperature reported by the rig, as a float.

        Returns None if the port is not open.  Raises ValueError if the
        reply cannot be parsed as a number (for example an empty reply).
        """
        if not self.serial.isOpen():
            return None
        # The final character of the stripped reply is dropped before
        # conversion (presumably a unit suffix -- TODO confirm).
        temp = self._command("*T\r", want_reply=True)[:-1]
        if self.debug: dprint("[RS-HFIQ] current temperature: ", temp)
        return float(temp)
|
|
|
|
########################################################################
|
|
# EOF
|
|
########################################################################
|