427 lines
17 KiB
Python
Executable File
427 lines
17 KiB
Python
Executable File
# This is a sample hardware file for UDP control. Use this file for my 2010 transceiver
|
|
# described in QEX and for the improved version HiQSDR. To turn on the extended
|
|
# features in HiQSDR, update your FPGA firmware to version 1.1 or later and use use_rx_udp = 2.
|
|
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
|
|
import struct, socket, math, traceback
|
|
import _quisk as QS
|
|
|
|
from quisk_hardware_model import Hardware as BaseHardware
|
|
|
|
DEBUG = 0
|
|
|
|
class Hardware(BaseHardware):
|
|
def __init__(self, app, conf):
|
|
BaseHardware.__init__(self, app, conf)
|
|
self.got_udp_status = '' # status from UDP receiver
|
|
# want_udp_status is a 14-byte string with numbers in little-endian order:
|
|
# [0:2] 'St'
|
|
# [2:6] Rx tune phase
|
|
# [6:10] Tx tune phase
|
|
# [10] Tx output level 0 to 255
|
|
# [11] Tx control bits:
|
|
# 0x01 Enable CW transmit
|
|
# 0x02 Enable all other transmit
|
|
# 0x04 Use the HiQSDR extended IO pins not present in the 2010 QEX ver 1.0
|
|
# 0x08 The key is down (software key)
|
|
# bits 5 and 4: Transmit sample rate
|
|
# 0b00 48k
|
|
# 0b01 192k
|
|
# 0b10 480k
|
|
# 0b11 8k
|
|
# 0x40 odyssey: Spot button is in use
|
|
# 0x80 odyssey: Mic Boost 20dB
|
|
# [12] Rx control bits
|
|
# bits 5 through 0
|
|
# Second stage decimation less one, 1-39, six bits
|
|
# bits 7, 6
|
|
# 0b00 Prescaler 8, 3-byte samples I and Q; 1440 / 6 = 240 samples per UDP packet
|
|
# 0b01 Prescaler 2, 2-byte samples
|
|
# 0b10 Prescaler 40, 3-byte samples
|
|
# 0b11 Prescaler 2, 1-byte samples
|
|
# [13] zero or firmware version number
|
|
# The above is used for firmware version 1.0.
|
|
# Version 1.1 adds eight more bytes for the HiQSDR conntrol ports:
|
|
# [14] X1 connector: Preselect pins 69, 68, 65, 64; Preamp pin 63, Tx LED pin 57
|
|
# [15] Attenuator pins 84, 83, 82, 81, 80
|
|
# [16] More bits: AntSwitch pin 41 is 0x01
|
|
# [17:22] The remaining five bytes are sent as zero.
|
|
# Version 1.2 uses the same format as 1.1, but adds the "Qs" command (see below).
|
|
# Version 1.3 adds features needed by the new quisk_vna.py program:
|
|
# [17] The sidetone volume 0 to 255
|
|
# [18:20] This is vna_count, the number of VNA data points; or zero for normal operation
|
|
# [20] The CW delay as specified in the config file
|
|
# [21] Control bits:
|
|
# 0x01 Switch on tx mirror on rx for adaptive predistortion
|
|
# [22:24] Noise blanker level
|
|
|
|
# The "Qs" command is a two-byte UDP packet sent to the control port. It returns the hardware status
|
|
# as the above string, except that the string starts with "Qs" instead of "St". Do not send the "Qs" command
|
|
# from Quisk, as it interferes with the "St" command. The "Qs" command is meant to be used from an
|
|
# external program, such as HamLib or a logging program.
|
|
|
|
# When vna_count != 0, we are in VNA mode. The start frequency is rx_phase, and for each point tx_phase is added
|
|
# to advance the frequency. A zero sample is added to mark the blocks. The samples are I and Q averaged at DC.
|
|
|
|
self.rx_phase = 0
|
|
self.tx_phase = 0
|
|
self.tx_level = 0
|
|
self.tx_control = 0
|
|
self.rx_control = 0
|
|
QS.set_sample_bytes(3)
|
|
self.vna_count = 0 # VNA scan count; MUST be zero for non-VNA operation
|
|
self.cw_delay = conf.cw_delay
|
|
self.index = 0
|
|
self.mode = None
|
|
self.usingSpot = False
|
|
self.band = None
|
|
self.rf_gain = 0
|
|
self.sidetone_volume = 0 # sidetone volume 0 to 255
|
|
self.repeater_freq = None # original repeater output frequency
|
|
self.HiQSDR_Connector_X1 = 0
|
|
self.HiQSDR_Attenuator = 0
|
|
self.HiQSDR_Bits = 0
|
|
try:
|
|
if conf.radio_sound_mic_boost:
|
|
self.tx_control = 0x80
|
|
except:
|
|
pass
|
|
if conf.use_rx_udp == 2: # Set to 2 for the HiQSDR
|
|
self.rf_gain_labels = ('RF 0 dB', 'RF +10', 'RF -10', 'RF -20', 'RF -30')
|
|
self.antenna_labels = ('Ant 1', 'Ant 2')
|
|
self.firmware_version = None # firmware version is initially unknown
|
|
self.rx_udp_socket = None
|
|
self.vfo_frequency = 0 # current vfo frequency
|
|
self.tx_frequency = 0
|
|
self.decimations = [] # supported decimation rates
|
|
for dec in (40, 20, 10, 8, 5, 4, 2):
|
|
self.decimations.append(dec * 64)
|
|
self.decimations.append(80)
|
|
self.decimations.append(64)
|
|
if self.conf.fft_size_multiplier == 0:
|
|
self.conf.fft_size_multiplier = 6 # Set size needed by VarDecim
|
|
def open(self):
|
|
# Create the proper broadcast address for rx_udp_ip.
|
|
nm = self.conf.rx_udp_ip_netmask.split('.')
|
|
ip = self.conf.rx_udp_ip.split('.')
|
|
nm = list(map(int, nm))
|
|
ip = list(map(int, ip))
|
|
bc = ''
|
|
for i in range(4):
|
|
x = (ip[i] | ~ nm[i]) & 0xFF
|
|
bc = bc + str(x) + '.'
|
|
self.broadcast_addr = bc[:-1]
|
|
# This socket is used for the Simple Network Discovery Protocol by AE4JY
|
|
self.socket_sndp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.socket_sndp.setblocking(0)
|
|
self.socket_sndp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
self.sndp_request = bytearray(56)
|
|
self.sndp_request[0] = 56
|
|
self.sndp_request[1] = 0
|
|
self.sndp_request[2] = 0x5A
|
|
self.sndp_request[3] = 0xA5
|
|
self.sndp_active = self.conf.sndp_active
|
|
# conf.rx_udp_port is used for returning ADC samples
|
|
# conf.rx_udp_port + 1 is used for control
|
|
self.rx_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.rx_udp_socket.setblocking(0)
|
|
self.rx_udp_socket.connect((self.conf.rx_udp_ip, self.conf.rx_udp_port + 1))
|
|
return QS.open_rx_udp(self.conf.rx_udp_ip, self.conf.rx_udp_port)
|
|
def close(self):
|
|
if self.rx_udp_socket:
|
|
self.rx_udp_socket.close()
|
|
self.rx_udp_socket = None
|
|
def ReturnFrequency(self): # Return the current tuning and VFO frequency
|
|
return None, None # frequencies have not changed
|
|
def ReturnVfoFloat(self, freq=None): # Return the accurate VFO as a float
|
|
if freq is None:
|
|
rx_phase = self.rx_phase
|
|
else:
|
|
rx_phase = int(float(freq) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
return float(rx_phase) * self.conf.rx_udp_clock / 2.0**32
|
|
def ChangeFrequency(self, tx_freq, vfo_freq, source='', band='', event=None):
|
|
if vfo_freq != self.vfo_frequency:
|
|
self.vfo_frequency = vfo_freq
|
|
self.rx_phase = int(float(vfo_freq - self.transverter_offset) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
if tx_freq and tx_freq > 0:
|
|
self.tx_frequency = tx_freq
|
|
self.tx_phase = int(float(tx_freq - self.transverter_offset) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
self.NewUdpStatus()
|
|
return tx_freq, vfo_freq
|
|
def RepeaterOffset(self, offset=None): # Change frequency for repeater offset during Tx
|
|
if offset is None: # Return True if frequency change is complete
|
|
self.HeartBeat()
|
|
return self.want_udp_status == self.got_udp_status
|
|
if offset == 0: # Change back to the original frequency
|
|
if self.repeater_freq is None: # Frequency was already reset
|
|
return self.want_udp_status == self.got_udp_status
|
|
self.tx_frequency = self.repeater_freq
|
|
self.repeater_freq = None
|
|
else: # Shift to repeater input frequency
|
|
self.repeater_freq = self.tx_frequency
|
|
offset = int(offset * 1000) # Convert kHz to Hz
|
|
self.tx_frequency += offset
|
|
self.tx_phase = int(float(self.tx_frequency - self.transverter_offset) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
self.NewUdpStatus(True)
|
|
return False
|
|
def ChangeMode(self, mode):
|
|
# mode is a string: "USB", "AM", etc.
|
|
self.mode = mode
|
|
self.tx_control &= ~0x03 # Erase last two bits
|
|
if self.vna_count:
|
|
pass
|
|
elif self.usingSpot:
|
|
self.tx_control |= 0x02
|
|
elif mode in ("CWL", "CWU"):
|
|
self.tx_control |= 0x01
|
|
else:
|
|
self.tx_control |= 0x02
|
|
self.SetTxLevel()
|
|
def ChangeBand(self, band):
|
|
# band is a string: "60", "40", "WWV", etc.
|
|
BaseHardware.ChangeBand(self, band)
|
|
self.band = band
|
|
self.HiQSDR_Connector_X1 &= ~0x0F # Mask in the last four bits
|
|
self.HiQSDR_Connector_X1 |= self.conf.HiQSDR_BandDict.get(band, 0) & 0x0F
|
|
self.SetTxLevel()
|
|
def SetTxLevel(self):
|
|
# As tx_level varies from 50 to 200, the output level changes from 263 to 752 mV
|
|
# So 0 to 255 is 100 to 931, or 1.0 to 9.31; v = 1.0 + 0.0326 * level
|
|
if not self.vna_count:
|
|
try:
|
|
self.tx_level = self.conf.tx_level[self.band]
|
|
except KeyError:
|
|
self.tx_level = self.conf.tx_level.get(None, 127) # The default
|
|
if self.mode[0:3] in ('DGT', 'FDV'): # Digital modes; change power by a percentage
|
|
reduc = self.application.digital_tx_level
|
|
else:
|
|
reduc = self.application.tx_level
|
|
level = 1.0 + self.tx_level * 0.0326
|
|
level *= math.sqrt(reduc / 100.0) # Convert from a power to an amplitude
|
|
self.tx_level = int((level - 1.0) / 0.0326 + 0.5)
|
|
if self.tx_level < 0:
|
|
self.tx_level = 0
|
|
elif self.tx_level > 255:
|
|
self.tx_level = 255
|
|
self.NewUdpStatus()
|
|
def OnButtonRfGain(self, event):
|
|
# The HiQSDR attenuator is five bits: 2, 4, 8, 10, 20 dB
|
|
btn = event.GetEventObject()
|
|
n = btn.index
|
|
self.HiQSDR_Connector_X1 &= ~0x10 # Mask in the preamp bit
|
|
if n == 0: # 0dB
|
|
self.HiQSDR_Attenuator = 0
|
|
self.rf_gain = 0
|
|
elif n == 1: # +10
|
|
self.HiQSDR_Attenuator = 0
|
|
self.HiQSDR_Connector_X1 |= 0x10
|
|
self.rf_gain = 10
|
|
elif n == 2: # -10
|
|
self.HiQSDR_Attenuator = 0x08
|
|
self.rf_gain = -10
|
|
elif n == 3: # -20
|
|
self.HiQSDR_Attenuator = 0x10
|
|
self.rf_gain = -20
|
|
elif n == 4: # -30
|
|
self.HiQSDR_Attenuator = 0x18
|
|
self.rf_gain = -30
|
|
else:
|
|
self.HiQSDR_Attenuator = 0
|
|
self.rf_gain = 0
|
|
print ('Unknown RfGain')
|
|
self.NewUdpStatus()
|
|
def OnButtonPTT(self, event):
|
|
# This feature requires firmware version 1.1 or higher
|
|
if self.firmware_version:
|
|
btn = event.GetEventObject()
|
|
if btn.GetValue(): # Turn the software key bit on or off
|
|
self.tx_control |= 0x08
|
|
else:
|
|
self.tx_control &= ~0x08
|
|
self.NewUdpStatus(True) # Prompt update for PTT
|
|
def OnButtonAntenna(self, event):
|
|
# This feature requires extended IO
|
|
btn = event.GetEventObject()
|
|
if btn.index:
|
|
self.HiQSDR_Bits |= 0x01
|
|
else:
|
|
self.HiQSDR_Bits &= ~0x01
|
|
self.NewUdpStatus()
|
|
def ChangeSidetone(self, value): # The sidetone volume changed
|
|
self.sidetone_volume = int(value * 255.1) # Change 0.0-1.0 to 0-255
|
|
self.NewUdpStatus()
|
|
def HeartBeat(self):
|
|
if self.sndp_active: # AE4JY Simple Network Discovery Protocol - attempt to set the FPGA IP address
|
|
try:
|
|
if DEBUG: print("Sndp send")
|
|
self.socket_sndp.sendto(self.sndp_request, (self.broadcast_addr, 48321))
|
|
data, ffrom = self.socket_sndp.recvfrom(1024)
|
|
if DEBUG: print("Sndp From", ffrom, "Data", repr(data))
|
|
except:
|
|
# traceback.print_exc()
|
|
pass
|
|
else:
|
|
data = bytearray(data)
|
|
if len(data) == 56 and data[5:14] == bytearray(b'HiQSDR-v1'):
|
|
ip = self.conf.rx_udp_ip.split('.')
|
|
t = data[0:4]
|
|
t.append(2)
|
|
t += data[5:37]
|
|
t.append(int(ip[3]))
|
|
t.append(int(ip[2]))
|
|
t.append(int(ip[1]))
|
|
t.append(int(ip[0]))
|
|
t += bytearray(12)
|
|
t.append(self.conf.rx_udp_port & 0xFF)
|
|
t.append(self.conf.rx_udp_port >> 8)
|
|
t.append(0)
|
|
if DEBUG: print("Sndp reply", repr(t))
|
|
self.socket_sndp.sendto(t, (self.broadcast_addr, 48321))
|
|
try: # receive the old status if any
|
|
data = self.rx_udp_socket.recv(1024)
|
|
if DEBUG:
|
|
self.PrintStatus(' got ', data)
|
|
except:
|
|
pass
|
|
else:
|
|
data = bytearray(data)
|
|
if data[0:2] == b'St':
|
|
self.got_udp_status = data
|
|
if self.firmware_version is None: # get the firmware version
|
|
if self.want_udp_status[0:13] != self.got_udp_status[0:13]:
|
|
try:
|
|
self.rx_udp_socket.send(self.want_udp_status)
|
|
if DEBUG:
|
|
self.PrintStatus('Start', self.want_udp_status)
|
|
except:
|
|
pass
|
|
else: # We got a correct response.
|
|
self.firmware_version = self.got_udp_status[13] # Firmware version is returned here
|
|
if DEBUG:
|
|
print ('Got version', self.firmware_version)
|
|
if self.firmware_version > 0 and self.conf.use_rx_udp == 2:
|
|
self.tx_control |= 0x04 # Use extra control bytes
|
|
self.sndp_active = False
|
|
self.NewUdpStatus()
|
|
else:
|
|
if self.want_udp_status != self.got_udp_status:
|
|
if DEBUG:
|
|
self.PrintStatus('Have ', self.got_udp_status)
|
|
self.PrintStatus(' send', self.want_udp_status)
|
|
try:
|
|
self.rx_udp_socket.send(self.want_udp_status)
|
|
except:
|
|
pass
|
|
elif DEBUG:
|
|
self.rx_udp_socket.send(b'Qs')
|
|
def PrintStatus(self, msg, data):
|
|
print (msg, ' ', end=' ')
|
|
print (data[0:2], end=' ')
|
|
for c in data[2:]:
|
|
print ("%2X" % c, end=' ')
|
|
print ()
|
|
def GetFirmwareVersion(self):
|
|
return self.firmware_version
|
|
def OnSpot(self, level):
|
|
# level is -1 for Spot button Off; else the Spot level 0 to 1000.
|
|
# The Spot button sets the mode to SSB-equivalent for CW so that the Spot level works.
|
|
if level >= 0 and not self.usingSpot: # Spot was turned on
|
|
self.usingSpot = True
|
|
self.tx_control |= 0x40
|
|
self.ChangeMode(self.mode)
|
|
elif level < 0 and self.usingSpot: # Spot was turned off
|
|
self.usingSpot = False
|
|
self.tx_control &= ~0x40
|
|
self.ChangeMode(self.mode)
|
|
def OnBtnFDX(self, is_fdx): # Status of FDX button, 0 or 1
|
|
if is_fdx:
|
|
self.HiQSDR_Connector_X1 |= 0x20 # Mask in the FDX bit
|
|
else:
|
|
self.HiQSDR_Connector_X1 &= ~0x20
|
|
self.NewUdpStatus()
|
|
def VarDecimGetChoices(self): # return text labels for the control
|
|
clock = self.conf.rx_udp_clock
|
|
l = [] # a list of sample rates
|
|
for dec in self.decimations:
|
|
l.append(str(int(float(clock) / dec / 1e3 + 0.5)))
|
|
return l
|
|
def VarDecimGetLabel(self): # return a text label for the control
|
|
return "Sample rate ksps"
|
|
def VarDecimGetIndex(self): # return the current index
|
|
return self.index
|
|
def VarDecimSet(self, index=None): # set decimation, return sample rate
|
|
if index is None: # initial call to set decimation before the call to open()
|
|
rate = self.application.vardecim_set # May be None or from different hardware
|
|
try:
|
|
dec = int(float(self.conf.rx_udp_clock // rate + 0.5))
|
|
self.index = self.decimations.index(dec)
|
|
except:
|
|
try:
|
|
self.index = self.decimations.index(self.conf.rx_udp_decimation)
|
|
except:
|
|
self.index = 0
|
|
else:
|
|
self.index = index
|
|
dec = self.decimations[self.index]
|
|
if dec >= 128:
|
|
self.rx_control = dec // 64 - 1 # Second stage decimation less one
|
|
QS.set_sample_bytes(3)
|
|
else:
|
|
self.rx_control = dec // 16 - 1 # Second stage decimation less one
|
|
self.rx_control |= 0b01000000 # Change prescaler to 2 (instead of 8)
|
|
QS.set_sample_bytes(2)
|
|
self.NewUdpStatus()
|
|
return int(float(self.conf.rx_udp_clock) / dec + 0.5)
|
|
def VarDecimRange(self):
|
|
return (48000, 960000)
|
|
def NewUdpStatus(self, do_tx=False):
|
|
s = bytearray(b'St')
|
|
s = s + struct.pack("<L", self.rx_phase)
|
|
s = s + struct.pack("<L", self.tx_phase)
|
|
s.append(self.tx_level & 0xFF)
|
|
s.append(self.tx_control & 0xFF)
|
|
s.append(self.rx_control & 0xFF)
|
|
if self.firmware_version: # Add the version
|
|
s.append(self.firmware_version & 0xFF) # The firmware version will be returned
|
|
if self.tx_control & 0x04: # Use extra HiQSDR control bytes
|
|
s.append(self.HiQSDR_Connector_X1 & 0xFF)
|
|
s.append(self.HiQSDR_Attenuator & 0xFF)
|
|
s.append(self.HiQSDR_Bits & 0xFF)
|
|
else:
|
|
s += bytearray(3)
|
|
s.append(self.sidetone_volume & 0xFF)
|
|
s = s + struct.pack("<H", self.vna_count)
|
|
s.append(self.cw_delay & 0xFF)
|
|
s.append(0)
|
|
else: # firmware version 0 or None
|
|
s.append(0) # assume version 0
|
|
self.want_udp_status = s
|
|
if do_tx:
|
|
try:
|
|
self.rx_udp_socket.send(s)
|
|
except:
|
|
pass
|
|
def SetVNA(self, key_down=None, vna_start=None, vna_stop=None, vna_count=None, do_tx=False):
|
|
if key_down is None:
|
|
pass
|
|
elif key_down:
|
|
self.tx_control |= 0x08
|
|
else:
|
|
self.tx_control &= ~0x08
|
|
if vna_count is not None:
|
|
self.vna_count = vna_count # Number of scan points
|
|
if vna_start is not None: # Set the start and stop frequencies. The tx_phase is the frequency delta.
|
|
self.rx_phase = int(float(vna_start) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
self.tx_phase = int(float(vna_stop - vna_start) / (self.vna_count - 1) / self.conf.rx_udp_clock * 2.0**32 + 0.5) & 0xFFFFFFFF
|
|
self.tx_control &= ~0x03 # Erase last two bits
|
|
self.rx_control = 40 - 1
|
|
self.tx_level = 255
|
|
self.NewUdpStatus(do_tx)
|
|
start = int(float(self.rx_phase) * self.conf.rx_udp_clock / 2.0**32 + 0.5)
|
|
phase = self.rx_phase + self.tx_phase * (self.vna_count - 1)
|
|
stop = int(float(phase) * self.conf.rx_udp_clock / 2.0**32 + 0.5)
|
|
return start, stop # return the start and stop frequencies after integer rounding
|