quisk-kc4upr/kc4upr/hardware_pihfiq.py
Rob French 94267ab4b6 Added the hardware and widgets files (sadly, I
forgot to store their history).  Also, added
some hardware schematics showing the workings of
the interface hat for the CW key (and the key
line to the RS-HFIQ).
2020-10-27 14:57:47 -05:00

504 lines
18 KiB
Python

########################################################################
# Pi-HFIQ hardware control module for Quisk.
#
# Original 'hardware_usbserial.py' for RS-HFIQ hardware by _____.
#
# Adapted into the "Pi-HFIQ" configuration by Rob French, KC4UPR.
#
# This file provides for USB control of the HobbyPCB RS-HFIQ SDR
# transceiver, in conjunction with a Raspberry Pi for control. The
# Raspberry Pi provides ADC/DAC of the RX and TX I/Q signals, as well as
# input methods for audio, digital, and CW modes.
#
# TODO as of 2020-04-21:
# (1) Implement all/most of the keyer parameters as user-controllable
# items in the configuration file, widgets, and/or config menu.
# (2) Make power level settings automatically get stored/restored per
# band, mode, and possibly even frequency ("nearest 10 KHz" or
# something like that... memory is cheap). This is due to the
# fact that I get the RS-HFIQ "CLIP" light coming on with different
# output volumes depending on mode, CW vs digital, etc.
# (3) Clean up more comments.
# (4) Add an automatic device finder... so you don't have to edit
# /dev/ttyUSB0, ...USB1, etc.
#
########################################################################
from __future__ import print_function
import math
import psutil
import serial
import serial.tools.list_ports
import struct
import sys
import threading
import time
import traceback
from quisk_hardware_model import Hardware as BaseHardware
import _quisk as QS
# Default debug level handed to Hardware (non-zero enables dprint tracing).
DEBUG = 1

# Keyer modes as (label, value) pairs. Hardware keeps only the labels, and
# the position in this tuple is the numeric keyer mode.
KEYER_MODE_LIST = (
    ('Keyer', None),
    ('Bug', None),
    ('Iambic A', None),
    ('Iambic B', None),
)

# Index into KEYER_MODE_LIST selected at start-up (2 is 'Iambic A').
DEFAULT_KEYER_MODE = 2

# Keyer speed, in words per minute, selected at start-up.
DEFAULT_KEYER_WPM = 15
########################################################################
def dprint(*args, **kwargs):
    """Print a debug message; accepts the same arguments as print().

    Output goes to sys.stdout unless the caller supplies its own ``file``
    keyword, which is honoured instead of raising a duplicate-keyword error.
    """
    kwargs.setdefault("file", sys.stdout)
    print(*args, **kwargs)
def eprint(*args, **kwargs):
    """Print an error message; accepts the same arguments as print().

    Output goes to sys.stderr unless the caller supplies its own ``file``
    keyword, which is honoured instead of raising a duplicate-keyword error.
    """
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
########################################################################
# Hardware class
#
# The Quisk hardware interface for the Pi-HFIQ. It owns the RSHFIQ
# serial-port object, forwards frequency, PTT, mode and keyer changes to
# it (or to the _quisk module), and starts the key and status monitoring
# threads once the device has been opened.
########################################################################
class Hardware(BaseHardware):
    """Quisk hardware interface for the Pi-HFIQ.

    Owns the RSHFIQ serial-port object and forwards frequency, PTT, mode
    and keyer changes either to it or to the _quisk (QS) module. After a
    successful open() a StatusThread is running and, when the
    configuration asks for key polling, a KeyThread as well.
    """

    # Upper bound, in seconds, on how long close() waits for each worker
    # thread to finish before the serial port is closed.
    THREAD_JOIN_TIMEOUT = 2.0

    def __init__(self, app, conf, debug=DEBUG):
        """Create the hardware object and apply the default keyer settings.

        app and conf are passed straight to the base class; debug enables
        dprint tracing here and in the RSHFIQ object.
        """
        BaseHardware.__init__(self, app, conf)
        self.debug = debug
        # serial port for the transceiver (rig) -- RS-HFIQ
        # Pass the caller's debug level on (it used to be ignored in favour
        # of the module-level default).
        self.rig = RSHFIQ(device="/dev/ttyUSB0", debug=debug)
        self.vfo = None
        self.ptt_button = 0
        self.is_cw = False
        self.key_thread = None
        self.mon_thread = None
        self.keyer_mode_list = [x[0] for x in KEYER_MODE_LIST]
        self.keyer_mode = DEFAULT_KEYER_MODE
        self.keyer_wpm = DEFAULT_KEYER_WPM
        self.SetKeyerMode(self.keyer_mode)
        self.SetKeyerWPM(self.keyer_wpm)

    def open(self):  # Called once to open the Hardware
        """Open the rig, start the worker threads, return a status string.

        Returns the firmware version text when an RS-HFIQ answered, or an
        error message when the port did not open. Terminates the program
        when the port opened but the reply does not start with "RS-HFIQ".
        """
        if not self.rig.open():
            return "[Pi-HFIQ] unable to open device"
        version = self.rig.version
        if version[0:7] != "RS-HFIQ":
            eprint("[Pi-HFIQ] could not find the RS-HFIQ device -- terminating")
            # sys.exit() rather than the site-provided exit() builtin,
            # which is not guaranteed to exist; exit status is unchanged.
            sys.exit()
        if self.conf.name_of_mic_play and self.conf.key_poll_msec:
            self.key_thread = KeyThread(
                self.rig,
                self.conf.key_poll_msec / 1000.0,
                self.conf.key_hang_time)
            self.key_thread.start()
        self.mon_thread = StatusThread(self.rig)
        self.mon_thread.start()
        return version

    def close(self):  # Called once to close the Hardware
        """Stop the worker threads, close the serial port, return a status string."""
        for worker in (self.key_thread, self.mon_thread):
            if worker:
                worker.stop()
                # Wait for the thread to leave its loop so that the port is
                # not closed underneath a read that is still in progress.
                worker.join(self.THREAD_JOIN_TIMEOUT)
        self.key_thread = None
        self.mon_thread = None
        self.rig.close()
        return "[Pi-HFIQ] closed"

    def HeartBeat(self):
        """Refresh the status line in the bottom widgets from the monitor thread."""
        monitor = self.mon_thread
        if monitor is None:
            # Not opened yet, or already closed: there is nothing to report.
            return
        if self.application.bottom_widgets:
            self.application.bottom_widgets.UpdateText(
                "Rig: %.0fC, CPU: %.0fC, Proc: %.0f%%, Mem: %.0f%%" % (
                    monitor.rig_temp,
                    monitor.cpu_temp,
                    monitor.cpu_load,
                    monitor.mem_load))

    def ReturnFrequency(self):
        """Return (None, frequency) with the frequency as reported by the rig.

        The second item is whatever RSHFIQ.frequency returns: the reply
        text from the rig, or None when the port is closed.
        """
        return None, self.rig.frequency

    def ChangeFrequency(self, tune, vfo, source='', band='', event=None):
        """Retune the rig when the VFO changed and return (tune, vfo)."""
        if self.vfo != vfo:
            self.vfo = vfo
            self.rig.frequency = self.vfo
            # probably need to check that this actually worked!
        return tune, self.vfo

    def OnButtonPTT(self, event=None):
        """Handle the PTT button, or re-apply the stored PTT state.

        When event is given, its GetEventObject().GetValue() becomes the
        new PTT state. When it is omitted (as ChangeMode does) the last
        stored state is applied again.
        """
        if event:
            if event.GetEventObject().GetValue():
                if self.debug: dprint("[Pi-HFIQ] PTT pressed")
                self.ptt_button = 1
            else:
                if self.debug: dprint("[Pi-HFIQ] PTT released")
                self.ptt_button = 0
        if self.key_thread and self.is_cw:
            # keyer thread exists--it handles PTT
            # Also temporarily using this only for CW, because digimodes
            # aren't working with the keyer thread...
            if self.debug: dprint("[Pi-HFIQ] PTT handled by keyer thread")
            self.key_thread.OnPTT(self.ptt_button)
        else:
            # no keyer thread--PTT handled here
            if self.debug: dprint("[Pi-HFIQ] PTT state not handled by keyer thread")
            self.rig.transmit = (self.ptt_button == 1)
            # Use the stored state instead of reading the event again:
            # event is None when this is called from ChangeMode.
            QS.set_key_down(self.ptt_button)
        # if self.is_cw:
        #     QS.set_key_down(0)
        #     QS.set_transmit_mode(self.ptt_button)
        # else:
        #     QS.set_key_down(self.ptt_button)

    def ChangeMode(self, mode):  # Change the tx/rx mode
        """Record whether the new mode is CW and switch the GPIO keyer to match.

        mode is a string: "USB", "AM", etc.
        """
        self.is_cw = mode in ('CWU', 'CWL')
        # NOTE(review): this enables the GPIO keyer in CW even when keyer
        # mode 0 was selected through SetKeyerMode -- confirm that is intended.
        QS.set_gpio_keyer_enabled(1 if self.is_cw else 0)
        if self.key_thread:
            self.key_thread.IsCW(self.is_cw)
        elif hasattr(self, 'OnButtonPTT'):
            # what is this for?
            self.OnButtonPTT()

    def OnSpot(self, level):
        """Forward the Spot level to the key thread, if there is one."""
        if self.key_thread:
            self.key_thread.OnSpot(level)

    def SetKeyerMode(self, value):
        """Select the keyer mode, an index into KEYER_MODE_LIST.

        Mode 0 disables the GPIO keyer; any other value enables it and
        passes value - 1 on as the GPIO keyer mode.
        """
        self.keyer_mode = value
        if self.keyer_mode == 0:
            QS.set_gpio_keyer_enabled(0)
        else:
            QS.set_gpio_keyer_enabled(1)
            QS.set_gpio_keyer_mode(self.keyer_mode - 1)

    def SetKeyerWPM(self, value):
        """Store the keyer speed and pass it on to the GPIO keyer."""
        self.keyer_wpm = value
        QS.set_gpio_keyer_speed(self.keyer_wpm)
class KeyThread(threading.Thread):
    """Create a thread to monitor the key state.

    The CW and non-CW transmit handling is currently switched off in
    run(), so for now the thread only polls the Quisk key state and
    records what OnPTT, OnSpot and IsCW tell it.
    """

    # The non-CW PTT handling in run() is temporarily disabled.
    NON_CW_PTT_ENABLED = False

    def __init__(self, rig, poll_secs, key_hang_time, debug=False):
        """Store the settings; call start() to begin polling.

        rig is the RSHFIQ object to key, poll_secs the delay between
        polls, key_hang_time the hang time in seconds used by the
        (currently commented-out) CW logic, and debug enables tracing.
        """
        threading.Thread.__init__(self)
        self.rig = rig
        self.poll_secs = poll_secs
        self.key_hang_time = key_hang_time
        # run() reads self.debug, which was previously never assigned.
        self.debug = debug
        self.ptt_button = 0
        self.spot_level = -1  # level is -1 for Spot button Off; else the Spot level 0 to 1000.
        self.currently_in_tx = 0
        self.is_cw = False
        self.key_timer = 0
        self.key_transmit = 0
        self.doQuit = threading.Event()
        self.doQuit.clear()

    def run(self):
        """Poll until stop() is called."""
        while not self.doQuit.is_set():
            key_down = QS.is_key_down()  # get the internal Quisk key state
            # in the future, we can have the CW section handled by the H/W key
            # line to the RS-HFIQ
            if self.is_cw:
                pass  # this needs to be configurable; however, commenting
                # everything below out to test key down via discrete
                # if self.spot_level >= 0 or key_down:  # key is down
                #     self.key_transmit = 1
                #     self.key_timer = time.time()  # QSK/semi-QSK, as applicable
                # else:  # key is up
                #     #QS.set_key_down(0)
                #     if self.key_transmit and time.time() - self.key_timer > self.key_hang_time:
                #         self.key_transmit = 0
                # if self.key_transmit != self.currently_in_tx:
                #     QS.set_transmit_mode(self.key_transmit)
                #     self.rig.transmit = (self.key_transmit == 1)
                #     self.currently_in_tx = self.key_transmit  # success
                #     if DEBUG: print ("Change CW currently_in_tx", self.currently_in_tx)
            # modes other than CW -- only allow the PTT button (and CAT?)
            elif self.NON_CW_PTT_ENABLED:  # temporarily disabled this chunk...
                if self.ptt_button:
                    self.key_transmit = 1
                    if self.debug: dprint("[Pi-HFIQ Keyer Thread] non-CW transmit ON")
                else:
                    self.key_transmit = 0
                    if self.debug: dprint("[Pi-HFIQ Keyer Thread] non-CW transmit OFF")
                if self.key_transmit != self.currently_in_tx:
                    #QS.set_transmit_mode(self.key_transmit)
                    QS.set_key_down(self.key_transmit)
                    self.rig.transmit = (self.key_transmit == 1)
                    self.currently_in_tx = self.key_transmit  # success
                    if self.debug: print("Change CW currently_in_tx", self.currently_in_tx)
            # Wait on the event instead of sleeping so that stop() ends the
            # thread immediately rather than after the poll interval.
            self.doQuit.wait(self.poll_secs)

    def stop(self):
        """Set a flag to indicate that the thread should end."""
        self.doQuit.set()

    def OnPTT(self, ptt):
        """Record the PTT button state (1 pressed, 0 released)."""
        self.ptt_button = ptt

    def OnSpot(self, level):
        """Record the Spot level (-1 when the Spot button is off)."""
        self.spot_level = level

    def IsCW(self, is_cw):
        """Record whether the current mode is CW."""
        self.is_cw = is_cw
class StatusThread(threading.Thread):
    """Create a thread to monitor various parameters.

    Every poll_secs seconds it samples the rig temperature and, through
    psutil, the CPU temperature, CPU load and memory load. The latest
    values are exposed as read-only properties. A reading that is
    unavailable leaves the previous value in place rather than ending
    the thread.
    """

    def __init__(self, rig, poll_secs=0.5):
        """Store the rig to query and the polling interval in seconds."""
        threading.Thread.__init__(self)
        self.rig = rig
        self.poll_secs = poll_secs
        self.__rig_temp = 0
        self.__cpu_temp = 0
        self.__cpu_load = 0
        self.__mem_load = 0
        self.doQuit = threading.Event()
        self.doQuit.clear()

    def run(self):
        """Sample all values until stop() is called."""
        while not self.doQuit.is_set():
            self._update_rig_temp()
            self._update_system_stats()
            # Wait on the event instead of sleeping so that stop() ends the
            # thread immediately rather than after the poll interval.
            self.doQuit.wait(self.poll_secs)

    def _update_rig_temp(self):
        """Refresh the rig temperature, keeping the old value on failure."""
        try:
            reading = self.rig.temperature
        except ValueError:
            # The rig's reply could not be parsed as a number (for example
            # an empty reply after a serial timeout).
            reading = None
        # None means no reading was available (for example the port is closed).
        if reading is not None:
            self.__rig_temp = reading

    def _update_system_stats(self):
        """Refresh CPU temperature, CPU load and memory load from psutil."""
        sensors = psutil.sensors_temperatures()
        # NOTE(review): some kernels appear to name this sensor
        # 'cpu_thermal' rather than 'cpu-thermal' -- accept either.
        readings = sensors.get('cpu-thermal') or sensors.get('cpu_thermal')
        if readings:
            self.__cpu_temp = readings[0].current
        self.__cpu_load = psutil.cpu_percent()
        self.__mem_load = psutil.virtual_memory().percent

    def stop(self):
        """Set a flag to indicate that the thread should end."""
        self.doQuit.set()

    @property
    def rig_temp(self):
        """Last rig temperature that was read (0 until the first reading)."""
        return self.__rig_temp

    @property
    def cpu_temp(self):
        """Last CPU temperature that was read (0 until the first reading)."""
        return self.__cpu_temp

    @property
    def cpu_load(self):
        """Last CPU load percentage that was read (0 until the first reading)."""
        return self.__cpu_load

    @property
    def mem_load(self):
        """Last memory usage percentage that was read (0 until the first reading)."""
        return self.__mem_load
########################################################################
# RSHFIQ class
#
# An object that encapsulates all of the functionality associated with
# the RS-HFIQ serial interface. It provides methods and/or properties
# for each of the commands in the RS-HFIQ interface command list, and
# performs all of the necessary serial port operations.
########################################################################
# Drive-level labels, indexed by the numeric output level (0 to 4).
RSHFIQ_OUTPUT_LEVEL = ("off", "2 ma", "4 ma", "6 ma", "8 ma")


class RSHFIQ(object):
    """Serial-port interface to the RS-HFIQ.

    Each command is exposed as a method or property. All port traffic
    goes through one lock, so the object can be shared between threads.
    """

    def __init__(self, device="/dev/ttyUSB0", output_level=2, debug=0):
        """Create a new RS-HFIQ object, associated with the specified
        serial port. Optionally set output level of the VFO, as well as
        debug level. The port is configured here but not opened.
        """
        self.serial = serial.Serial()
        self.serial.port = device
        self.serial.baudrate = 57600
        self.serial.bytesize = serial.EIGHTBITS  # number of bits per byte
        self.serial.parity = serial.PARITY_NONE  # set parity check: no parity
        self.serial.stopbits = serial.STOPBITS_ONE  # number of stop bits
        self.serial.timeout = 1  # reads give up after one second
        self.serial.rtscts = False
        self.debug = debug
        self.__version = None
        self.__output = output_level
        self.__transmit = False
        self.__lock = threading.Lock()
        if self.debug: dprint("[RS-HFIQ] created new object: ", self.serial.port)

    #===================================================================
    def _transact(self, command, expect_reply=False):
        """Send one command and optionally read back one reply line.

        The lock is held for the whole exchange and is released even when
        the port raises, so a failed write cannot block other threads for
        good. Text is encoded to bytes on the way out and decoded on the
        way back, so callers always work with native strings. Returns the
        stripped reply, or None when no reply was requested.
        """
        if not isinstance(command, bytes):
            command = command.encode("ascii")
        with self.__lock:
            self.serial.write(command)
            if not expect_reply:
                return None
            reply = self.serial.readline()
        if not isinstance(reply, str):
            reply = reply.decode("ascii", "replace")
        return reply.strip()

    @staticmethod
    def _level_name(level):
        """Return the label for an output level, tolerating bad values."""
        if isinstance(level, int) and 0 <= level < len(RSHFIQ_OUTPUT_LEVEL):
            return RSHFIQ_OUTPUT_LEVEL[level]
        return "unknown (%r)" % (level,)

    #===================================================================
    def open(self):
        """Open the serial port associated with the RS-HFIQ, clear the
        input, retrieve the version info, and configure for use.

        Returns True when the port is open afterwards, otherwise False.
        An error raised while opening the port is printed and re-raised.
        """
        try:
            self.serial.open()
        except Exception as e:
            print(e)
            # Re-raise the original error instead of a bare Exception so
            # the caller can still see what actually went wrong.
            raise
        if not self.serial.isOpen():
            return False
        if self.debug: dprint("[RS-HFIQ] opened device: ", self.serial.port)
        self.serial.flushInput()   # flush input buffer, discarding all its contents
        self.serial.flushOutput()  # flush output buffer, aborting current output
                                   # and discard all that is in buffer
        time.sleep(1)  # wait a moment for init to finish
        # BUG: For some reason, I have to set a parameter to the
        # RS-HFIQ before I can successfully read version info???
        # But I don't have to do this if I just login to the RS-HFIQ
        # via a serial terminal...
        self.output_level = self.__output
        self.transmit = False
        if self.debug: dprint("[RS-HFIQ] getting version info")
        # TODO: Make this its own method... retrieving the version.
        self.__version = self._transact("*W\r", expect_reply=True)
        if self.debug: dprint("[RS-HFIQ] firmware version: ", self.__version)
        return True

    #===================================================================
    def close(self):
        """Close the serial port associated with the RS-HFIQ.

        Returns True when the port was open, False when it already was
        closed.
        """
        if self.serial.isOpen():
            self.serial.close()
            return True
        return False

    #-------------------------------------------------------------------
    # version - read-only property, returns the version of the RS-HFIQ.
    #-------------------------------------------------------------------
    @property
    def version(self):
        """Version text read during open(), or None before that."""
        if self.debug: dprint("[RS-HFIQ] firmware version: ", self.__version)
        return self.__version

    #-------------------------------------------------------------------
    # frequency - read-write property, sets the tuned frequency (VFO) of
    # the RS-HFIQ as specified.
    #-------------------------------------------------------------------
    @property
    def frequency(self):
        """Frequency reply from the rig as text, or None when the port is closed."""
        if not self.serial.isOpen():
            return None
        freq = self._transact("*F?\r", expect_reply=True)
        if self.debug: dprint("[RS-HFIQ] current tuned frequency: ", freq)
        return freq

    @frequency.setter
    def frequency(self, freq):
        # Silently ignored while the port is closed.
        if self.serial.isOpen():
            self._transact("*F" + str(freq) + "\r")
            if self.debug: dprint("[RS-HFIQ] setting tuned frequency: ", str(freq))

    #-------------------------------------------------------------------
    # output_level - read-write property, sets output level of the
    # RS-HFIQ tuner frequency per the interface definition:
    # 0 = off
    # 1 = 2 ma drive
    # 2 = 4 ma drive
    # 3 = 6 ma drive
    # 4 = 8 ma drive
    #-------------------------------------------------------------------
    @property
    def output_level(self):
        """Last output level stored by this object (not read from the rig)."""
        if self.debug: dprint("[RS-HFIQ] current output level: ", self._level_name(self.__output))
        return self.__output

    @output_level.setter
    def output_level(self, output_level):
        # Silently ignored, and not stored, while the port is closed.
        if self.serial.isOpen():
            self.__output = output_level
            self._transact("*OF" + str(self.__output) + "\r")
            if self.debug: dprint("[RS-HFIQ] set output level: ", self._level_name(self.__output))

    #-------------------------------------------------------------------
    # transmit - read-write property, True/False sets the transmit state
    # of the RS-HFIQ to ON/OFF.
    #-------------------------------------------------------------------
    @property
    def transmit(self):
        """Last transmit state stored by this object (not read from the rig)."""
        if self.debug: dprint("[RS-HFIQ] current transmit state ", "ON" if self.__transmit else "OFF")
        return self.__transmit

    @transmit.setter
    def transmit(self, state):
        # Silently ignored while the port is closed.
        if not self.serial.isOpen():
            return
        if state:
            self._transact("*X1\r")
            self.__transmit = True
            if self.debug: dprint("[RS-HFIQ] set transmit ON")
        else:
            self._transact("*X0\r")
            self.__transmit = False
            if self.debug: dprint("[RS-HFIQ] set transmit OFF")

    #-------------------------------------------------------------------
    # temperature - read-only property, read from the RS-HFIQ
    #-------------------------------------------------------------------
    @property
    def temperature(self):
        """Temperature reported by the rig as a float, or None when the port is closed.

        Raises ValueError when the reply is not a number, for example an
        empty reply after a read timeout.
        """
        if not self.serial.isOpen():
            return None
        # Drop the last character of the reply before converting
        # (presumably a unit suffix -- confirm against the RS-HFIQ docs).
        temp = self._transact("*T\r", expect_reply=True)[:-1]
        if self.debug: dprint("[RS-HFIQ] current temperature: ", temp)
        return float(temp)
########################################################################
# EOF
########################################################################