quisk-kc4upr/softrock/hardware_usb.py

325 lines
12 KiB
Python
Raw Normal View History

# Please do not change this hardware control module for Quisk.
# It provides USB control of SoftRock hardware.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import sys, struct, threading, time, traceback, math
from quisk_hardware_model import Hardware as BaseHardware
import _quisk as QS
# All USB access is through control transfers using pyusb.
# byte_array = dev.ctrl_transfer (IN, bmRequest, wValue, wIndex, length, timeout)
# len(string_msg) = dev.ctrl_transfer (OUT, bmRequest, wValue, wIndex, string_msg, timeout)
# Import pyusb.  If it is missing, tell the user with a wx dialog and, on
# Windows, offer to install it with pip and retry the import once.
try:
    import usb.core
    import usb.util
except ImportError:
    # wx and subprocess are needed only on this failure path, and the file does
    # not import them at the top, so bring them into scope here.  Without these
    # imports the handler itself fails with NameError.
    import subprocess
    import wx
    if sys.platform == 'win32':
        dlg = wx.MessageDialog(None, "The Python pyusb module is required but not installed. Do you want me to install it?",
            "Install Python pyusb", style = wx.YES|wx.NO)
        if dlg.ShowModal() == wx.ID_YES:
            # Install with the interpreter that is running this program.
            subprocess.call([sys.executable, "-m", "pip", "install", "pyusb"])
            try:
                import usb.core
                import usb.util
            except ImportError:
                dlg = wx.MessageDialog(None, "Installation of Python pyusb failed. Please install it by hand.",
                    "Installation failed", style=wx.OK)
                dlg.ShowModal()
    else:
        dlg = wx.MessageDialog(None, "The Python pyusb module is required but not installed. Please install package python-usb.",
            "Install Python pyusb", style = wx.OK)
        dlg.ShowModal()
# Set DEBUG to a true value to print tracebacks and frequency diagnostics.
DEBUG = 0
# Thanks to Ethan Blanton, KB8OJH, for this patch for the Si570 (many SoftRocks):
# These are used by SetFreqByDirect(); see below.
# The Si570 DCO must be clamped between these values
SI570_MIN_DCO = 4.85e9
SI570_MAX_DCO = 5.67e9
# The Si570 has 6 valid HSDIV values. Subtract 4 from HSDIV before
# stuffing it. We want to find the highest HSDIV first, so start
# from 11.
SI570_HSDIV_VALUES = [11, 9, 7, 6, 5, 4]
# Request-type values for vendor control transfers addressed to the device:
# IN is built from CTRL_IN and OUT from CTRL_OUT.
IN = usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_VENDOR, usb.util.CTRL_RECIPIENT_DEVICE)
OUT = usb.util.build_request_type(usb.util.CTRL_OUT, usb.util.CTRL_TYPE_VENDOR, usb.util.CTRL_RECIPIENT_DEVICE)
# Little-endian unsigned packers: 2 bytes ('<H') and 4 bytes ('<L').
UBYTE2 = struct.Struct('<H')
UBYTE4 = struct.Struct('<L') # Thanks to Sivan Toledo
class Hardware(BaseHardware):
    """USB control of SoftRock hardware for Quisk.

    Frequency changes and PTT are sent to the device as vendor control
    transfers through pyusb.  When key polling is configured, a KeyThread
    watches the key line and handles transmit switching.
    """

    def __init__(self, app, conf):
        BaseHardware.__init__(self, app, conf)
        self.usb_dev = None             # pyusb device; stays None until open() succeeds
        self.vfo = None                 # last VFO frequency accepted by the device
        self.repeater_freq = None       # original repeater output frequency
        # delay for changing repeater frequency in seconds; optional in the config
        self.repeater_delay = getattr(conf, 'repeater_delay', 0.25)
        self.repeater_time0 = 0         # time of repeater change in frequency
        self.ptt_button = 0
        self.is_cw = False
        self.key_thread = None          # KeyThread instance when key polling is active
        self.si570_i2c_address = conf.si570_i2c_address

    def open(self):     # Called once to open the Hardware
        """Find the USB device, read its firmware version and start key polling.

        Returns a status string describing what was found.
        """
        # find our device
        usb_dev = usb.core.find(idVendor=self.conf.usb_vendor_id, idProduct=self.conf.usb_product_id)
        if usb_dev is None:
            text = 'USB device not found VendorID 0x%X ProductID 0x%X' % (
                self.conf.usb_vendor_id, self.conf.usb_product_id)
        else:
            # This exception occurs for the Peabody SDR.  Thanks to ON7VQ for figuring
            # out the problem, and to David, AE9RB, for the fix.
            try:
                usb_dev.set_configuration()
            except Exception:
                # Best effort: carry on and let the version request decide.
                if DEBUG: traceback.print_exc()
            try:
                ret = usb_dev.ctrl_transfer(IN, 0x00, 0x0E00, 0, 2)
            except Exception:
                if DEBUG: traceback.print_exc()
                text = "No permission to access the SoftRock USB interface"
            else:
                self.usb_dev = usb_dev      # success
                if len(ret) == 2:
                    ver = "%d.%d" % (ret[1], ret[0])
                else:
                    ver = 'unknown'
                sound = self.conf.name_of_sound_capt
                if len(sound) > 50:
                    # Shorten long device names to their head and tail.
                    sound = sound[0:30] + '|||' + sound[-17:]
                text = 'Capture from SoftRock USB on %s, Firmware %s' % (sound, ver)
                if self.conf.name_of_mic_play and self.conf.key_poll_msec:
                    self.key_thread = KeyThread(usb_dev, self.conf.key_poll_msec / 1000.0, self.conf.key_hang_time)
                    self.key_thread.start()
        #self.application.bottom_widgets.info_text.SetLabel(text)
        if DEBUG and usb_dev:
            print ('Startup freq', self.GetStartupFreq())
            print ('Run freq', self.GetFreq())
            print ('Address 0x%X' % usb_dev.ctrl_transfer(IN, 0x41, 0, 0, 1)[0])
            sm = usb_dev.ctrl_transfer(IN, 0x3B, 0, 0, 2)
            sm = UBYTE2.unpack(sm)[0]
            print ('Smooth tune', sm)
        return text

    def close(self):    # Called once to close the Hardware
        """Ask the key polling thread, if any, to stop."""
        if self.key_thread:
            self.key_thread.stop()
            self.key_thread = None

    def ChangeFrequency(self, tune, vfo, source='', band='', event=None):
        """Send a changed VFO frequency to the device; return (tune, vfo).

        self.vfo is updated only when the device accepted the new frequency,
        so a failed change is tried again on the next call.
        """
        if self.usb_dev and self.vfo != vfo:
            device_freq = vfo - self.transverter_offset
            if self.conf.si570_direct_control:
                accepted = self.SetFreqByDirect(device_freq)
            else:
                accepted = self.SetFreqByValue(device_freq)
            if accepted:
                self.vfo = vfo
            if DEBUG:
                print ('Change to', vfo)
                print ('Run freq', self.GetFreq())
        return tune, vfo

    def ReturnFrequency(self):
        # Return the current tuning and VFO frequency.  If neither have changed,
        # you can return (None, None).  This is called at about 10 Hz by the main.
        # return (tune, vfo)    # return changed frequencies
        return None, None       # frequencies have not changed

    def RepeaterOffset(self, offset=None):  # Change frequency for repeater offset during Tx
        """Shift the VFO by a repeater offset, restore it, or poll for completion.

        offset None: return True once repeater_delay seconds have passed since
        the last shift, otherwise False.
        offset 0: change back to the original frequency.
        any other offset (kHz): shift to the repeater input frequency.
        """
        if offset is None:      # Return True if frequency change is complete
            if time.time() > self.repeater_time0 + self.repeater_delay:
                return True
        elif offset == 0:       # Change back to the original frequency
            if self.repeater_freq is not None:
                self.repeater_time0 = time.time()
                self.ChangeFrequency(self.repeater_freq, self.repeater_freq, 'repeater')
                self.repeater_freq = None
        else:                   # Shift to repeater input frequency
            self.repeater_time0 = time.time()
            self.repeater_freq = self.vfo
            vfo = self.vfo + int(offset * 1000)     # Convert kHz to Hz
            self.ChangeFrequency(vfo, vfo, 'repeater')
        return False

    def ChangeMode(self, mode):     # Change the tx/rx mode
        # mode is a string: "USB", "AM", etc.
        self.is_cw = mode in ('CWU', 'CWL')
        if self.key_thread:
            self.key_thread.IsCW(self.is_cw)
        elif hasattr(self, 'OnButtonPTT'):
            # Re-send the current PTT state under the new mode.
            self.OnButtonPTT()

    def ChangeBand(self, band):
        # band is a string: "60", "40", "WWV", etc.
        BaseHardware.ChangeBand(self, band)

    def OnSpot(self, level):
        """Pass the Spot button level on to the key thread, if there is one."""
        if self.key_thread:
            self.key_thread.OnSpot(level)

    def HeartBeat(self):    # Called at about 10 Hz by the main
        pass

    def OnButtonPTT(self, event=None):
        """Handle the PTT button; with no event, re-send the stored PTT state."""
        if event:
            if event.GetEventObject().GetValue():
                self.ptt_button = 1
            else:
                self.ptt_button = 0
        if self.key_thread:
            # The key thread owns transmit switching when it is running.
            self.key_thread.OnPTT(self.ptt_button)
        elif self.usb_dev:
            if self.is_cw:
                QS.set_key_down(0)
                QS.set_transmit_mode(self.ptt_button)
            else:
                QS.set_key_down(self.ptt_button)
            # Send the PTT state; on a USB error try exactly once more.
            for _attempt in range(2):
                try:
                    self.usb_dev.ctrl_transfer(IN, 0x50, self.ptt_button, 0, 3)
                except usb.core.USBError:
                    if DEBUG: traceback.print_exc()
                else:
                    break

    def _read_frequency(self, request):
        """Read a 4-byte frequency with the given request code; return Hz / 4.

        Returns 0 when no device is open.
        """
        if not self.usb_dev:
            return 0
        ret = self.usb_dev.ctrl_transfer(IN, request, 0, 0, 4)
        # array.tostring() was removed in Python 3.9; go through bytearray,
        # which works for the returned array on both Python 2 and 3.
        raw = UBYTE4.unpack(bytes(bytearray(ret)))[0]
        # The raw value is MHz scaled by 2**21 (2097152); convert to Hz, divide
        # by 4 and round to the nearest integer.
        return int(raw * 1.0e6 / 2097152.0 / 4.0 + 0.5)

    def GetStartupFreq(self):   # return the startup frequency / 4
        return self._read_frequency(0x3C)

    def GetFreq(self):          # return the running frequency / 4
        return self._read_frequency(0x3A)

    def SetFreqByValue(self, freq):
        """Set the frequency by sending its value; return True on success.

        The value sent is four times freq, in MHz scaled by 2**21.  Returns
        False when the scaled value is not positive or the transfer fails.
        """
        freq = int(freq/1.0e6 * 2097152.0 * 4.0 + 0.5)
        if freq <= 0:
            return False
        s = UBYTE4.pack(freq)
        try:
            self.usb_dev.ctrl_transfer(OUT, 0x32, self.si570_i2c_address + 0x700, 0, s)
        except usb.core.USBError:
            if DEBUG: traceback.print_exc()
            return False
        return True

    def SetFreqByDirect(self, freq):    # Thanks to Ethan Blanton, KB8OJH
        """Set the frequency by computing and writing the Si570 registers.

        Returns True on success.  Returns False when freq is not positive, no
        valid divider combination exists, or the USB transfer fails.
        """
        # For now, find the minimum DCO speed that will give us the
        # desired frequency; if we're slewing in the future, we want this
        # to additionally yield an RFREQ ~= 512.
        freq = int(freq * 4)
        if freq <= 0:
            # Also covers 0 < freq < 0.25, which would otherwise truncate to
            # zero and divide by zero below.
            return False
        dco_new = None
        hsdiv_new = 0
        n1_new = 0
        for hsdiv in SI570_HSDIV_VALUES:
            n1 = int(math.ceil(SI570_MIN_DCO / (freq * hsdiv)))
            if n1 < 1:
                n1 = 1
            else:
                n1 = ((n1 + 1) // 2) * 2    # round up to an even value
            dco = (freq * 1.0) * hsdiv * n1
            # Since we're starting with max hsdiv, this can only happen if
            # freq was larger than we can handle
            if n1 > 128:
                continue
            if dco < SI570_MIN_DCO or dco > SI570_MAX_DCO:
                # This really shouldn't happen
                continue
            if not dco_new or dco < dco_new:
                dco_new = dco
                hsdiv_new = hsdiv
                n1_new = n1
        if not dco_new:
            # For some reason, we were unable to calculate a frequency.
            # Probably because the frequency requested is outside the range
            # of our device.
            return False    # Failure
        rfreq = dco_new / self.conf.si570_xtal_freq
        rfreq_int = int(rfreq)
        rfreq_frac = int(round((rfreq - rfreq_int) * 2**28))
        # It looks like the DG8SAQ protocol just passes r7-r12 straight
        # To the Si570 when given command 0x30.  Easy enough.
        # n1 is stuffed as n1 - 1, hsdiv is stuffed as hsdiv - 4.
        hsdiv_new = hsdiv_new - 4
        n1_new = n1_new - 1
        s = struct.Struct('>BBL').pack((hsdiv_new << 5) + (n1_new >> 2),
                                       ((n1_new & 0x3) << 6) + (rfreq_int >> 4),
                                       ((rfreq_int & 0xf) << 28) + rfreq_frac)
        try:
            self.usb_dev.ctrl_transfer(OUT, 0x30, self.si570_i2c_address + 0x700, 0, s)
        except usb.core.USBError:
            # Report failure the same way SetFreqByValue() does, so that
            # ChangeFrequency() tries again on the next call.
            if DEBUG: traceback.print_exc()
            return False
        return True     # Success
class KeyThread(threading.Thread):
    """Create a thread to monitor the key state.

    The thread polls the device every poll_secs seconds and switches it
    between receive and transmit according to the key, the PTT button,
    the Spot button and the current mode.
    """

    def __init__(self, dev, poll_secs, key_hang_time):
        threading.Thread.__init__(self)
        self.usb_dev = dev
        self.poll_secs = poll_secs
        self.key_hang_time = key_hang_time  # seconds to stay in transmit after CW key up
        self.ptt_button = 0
        self.spot_level = -1        # level is -1 for Spot button Off; else the Spot level 0 to 1000.
        self.currently_in_tx = 0    # transmit state last accepted by the device
        self.is_cw = False
        self.key_timer = 0          # time the CW key was last seen down
        self.key_transmit = 0       # transmit state we want the device to have
        self.doQuit = threading.Event()     # set by stop() to end run()

    def _read_key(self):
        """Return True if the key is down, False if up, None on a USB error."""
        try:    # Test key up/down state
            ret = self.usb_dev.ctrl_transfer(IN, 0x51, 0, 0, 1)
        except usb.core.USBError:
            if DEBUG: traceback.print_exc()
            return None
        # bit 0x20 is the tip, bit 0x02 is the ring (ring not used)
        return (ret[0] & 0x20) == 0     # Tip bit clear: key is down

    def _send_transmit(self, state):
        """Send the transmit state to the device; return True on success."""
        try:
            self.usb_dev.ctrl_transfer(IN, 0x50, state, 0, 3)
        except usb.core.USBError:
            if DEBUG: traceback.print_exc()
            return False
        return True

    def run(self):
        """Poll the key until stop() is called."""
        while not self.doQuit.is_set():
            key_down = self._read_key()
            if self.is_cw:
                if self.spot_level >= 0 or key_down:    # key is down
                    QS.set_key_down(1)
                    self.key_transmit = 1
                    self.key_timer = time.time()
                else:       # key is up
                    QS.set_key_down(0)
                    # Stay in transmit until the hang time has passed.
                    if self.key_transmit and time.time() - self.key_timer > self.key_hang_time:
                        self.key_transmit = 0
                if self.key_transmit != self.currently_in_tx:
                    if self._send_transmit(self.key_transmit):
                        self.currently_in_tx = self.key_transmit    # success
                        QS.set_transmit_mode(self.key_transmit)
                        if DEBUG: print ("Change CW currently_in_tx", self.currently_in_tx)
            else:
                if key_down or self.ptt_button:
                    self.key_transmit = 1
                else:
                    self.key_transmit = 0
                if self.key_transmit != self.currently_in_tx:
                    QS.set_key_down(self.key_transmit)
                    if self._send_transmit(self.key_transmit):
                        self.currently_in_tx = self.key_transmit    # success
                        if DEBUG: print ("Change currently_in_tx", self.currently_in_tx)
            # Wait on the quit event rather than sleeping, so that stop()
            # ends the loop without waiting out the poll interval.
            self.doQuit.wait(self.poll_secs)

    def stop(self):
        """Set a flag to indicate that the thread should end."""
        self.doQuit.set()

    def OnPTT(self, ptt):
        """Record the PTT button state (0 or 1) for the next poll."""
        self.ptt_button = ptt

    def OnSpot(self, level):
        """Record the Spot level; -1 means the Spot button is off."""
        self.spot_level = level

    def IsCW(self, is_cw):
        """Record whether the current mode is CW."""
        self.is_cw = is_cw