quisk-kc4upr/hermes/quisk_hardware.py

932 lines
37 KiB
Python
Executable File

# This is a sample hardware file for UDP control using the Hermes-Metis protocol. Use this for
# the HermesLite project. It can also be used for the HPSDR, but since I don't have one, I
# can't test it.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import socket, traceback, time, math, os
import wx
import _quisk as QS
import quisk_utils
from quisk_hardware_model import Hardware as BaseHardware
DEBUG = 0
class Hardware(BaseHardware):
var_rates = ['48', '96', '192', '384']
def __init__(self, app, conf):
BaseHardware.__init__(self, app, conf)
self.var_index = 0
self.hermes_mac = bytearray(6)
self.hermes_ip = ""
self.hermes_code_version = -1
self.hermes_board_id = -1
self.hermes_temperature = 0.0
self.hermes_fwd_power = 0.0
self.hermes_rev_power = 0.0
self.hermes_pa_current = 0.0
self.eeprom_valid = 0
self.alex_hpf_f1 = 0
self.alex_hpf_f2 = 0
self.alex_lpf_f1 = 0
self.alex_lpf_f2 = 0
self.mode = None
self.band = None
self.vfo_frequency = 0
self.tx_frequency = 0
self.vna_count = 0
self.vna_started = False
self.repeater_freq = None # original repeater output frequency
try:
self.repeater_delay = conf.repeater_delay # delay for changing repeater frequency in seconds
except:
self.repeater_delay = 0.25
self.repeater_time0 = 0 # time of repeater change in frequency
# Create the proper broadcast addresses for socket_discover
if conf.udp_rx_ip: # Known IP address of hardware
self.broadcast_addrs = [conf.udp_rx_ip]
else:
self.broadcast_addrs = []
for interf in QS.ip_interfaces():
broadc = interf[3] # broadcast address
if broadc and broadc[0:4] != '127.':
self.broadcast_addrs.append(broadc)
self.broadcast_addrs.append('255.255.255.255')
if DEBUG: print ('broadcast_addrs', self.broadcast_addrs)
# This is the control data to send to the Hermes using the Metis protocol
# Duplex must be on or else the first Rx frequency is locked to the Tx frequency
self.pc2hermes = bytearray(17 * 4) # Control bytes not including C0. Python initializes this to zero.
self.pc2hermeslitewritequeue = bytearray(4 * 5)
self.pc2hermes[3] = 0x04 # C0 index == 0, C4[5:3]: number of receivers 0b000 -> one receiver; C4[2] duplex on
self.pc2hermes[4 * 9] = 63 # C0 index == 0b1001, C1[7:0] Tx level
for c0 in range(1, 9): # Set all frequencies to 7012352, 0x006B0000
self.SetControlByte(c0, 2, 0x6B)
value = conf.keyupDelay
if value > 1023:
value = 1023
self.SetControlByte(0x10, 2, value & 0x3) # cw_hang_time
self.SetControlByte(0x10, 1, (value >> 2) & 0xFF) # cw_hang_time
self.SetLowPwrEnable(conf.hermes_lowpwr_tr_enable)
self.EnablePowerAmp(conf.hermes_power_amp)
self.ChangeTxLNA(conf.hermes_TxLNA_dB)
self.MakePowerCalibration()
def pre_open(self):
# This socket is used for the Metis Discover protocol
self.discover_request = b"\xEF\xFE\x02" + b"\x00" * 60
self.socket_discover = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket_discover.setblocking(0)
self.socket_discover.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
found = False
st = "No capture device found."
port = self.conf.rx_udp_port
for i in range(5):
if found:
break
if DEBUG: print ('Send discover')
try:
for broadcast_addr in self.broadcast_addrs:
self.socket_discover.sendto(self.discover_request, (broadcast_addr, port))
if DEBUG: print ('discover_request', (broadcast_addr, port))
time.sleep(0.01)
except:
if DEBUG > 1: traceback.print_exc()
for j in range(5):
try:
data, addr = self.socket_discover.recvfrom(1500)
except:
if DEBUG > 1: traceback.print_exc()
time.sleep(0.02)
continue
else:
if DEBUG: print('recvfrom', addr, 'length', len(data), "type", type(data))
data = bytearray(data)
if len(data) > 32 and data[0] == 0xEF and data[1] == 0xFE:
if DEBUG: print('data', data)
ver = self.conf.hermes_code_version
bid = self.conf.hermes_board_id
if ver >= 0 and data[9] != ver:
pass
elif bid >= 0 and data[10] != bid:
pass
else:
st = 'Capture from Hermes device: Mac %2x:%2x:%2x:%2x:%2x:%2x, Code version %d, ID %d' % tuple(data[3:11])
self.hermes_mac = data[3:9]
self.hermes_ip = addr[0]
self.hermes_code_version = data[9]
self.hermes_board_id = data[10]
QS.set_hermes_id(data[9], data[10])
if data[0x16] >> 6 == 0:
QS.set_params(bandscopeScale = 2048)
if DEBUG: print (st)
adr = self.conf.rx_udp_ip
found = True
if adr and adr != addr[0]: # Change the IP address
if DEBUG: print("Change IP address from %s to %s" % (addr[0], adr))
ip = adr.split('.')
ip = list(map(int, ip))
cmd = bytearray(73)
cmd[0] = 0xEF
cmd[1] = 0xFE
cmd[2] = 0x03
cmd[3] = data[3]
cmd[4] = data[4]
cmd[5] = data[5]
cmd[6] = data[6]
cmd[7] = data[7]
cmd[8] = data[8]
cmd[9] = ip[0]
cmd[10] = ip[1]
cmd[11] = ip[2]
cmd[12] = ip[3]
for broadcast_addr in self.broadcast_addrs:
self.socket_discover.sendto(cmd, (broadcast_addr, port))
time.sleep(0.01)
# Note: There is no response, contrary to the documentation
self.hermes_ip = adr
if False:
try:
data, addr = self.socket_discover.recvfrom(1500)
except:
if DEBUG: traceback.print_exc()
else:
print(repr(data), addr)
##self.hermes_ip = adr
time.sleep(1.0)
st += ', IP %s' % self.hermes_ip
break
if not found and self.conf.udp_rx_ip:
self.hermes_ip = self.conf.udp_rx_ip
code = 62
bid = 6
self.hermes_code_version = code
self.hermes_board_id = bid
QS.set_hermes_id(code, bid)
st = 'Capture from Hermes device at specified IP %s' % self.hermes_ip
found = True
if found:
# Open a socket for communication with the hardware
msg = QS.open_rx_udp(self.hermes_ip, port)
if msg[0:8] != "Capture ":
st = msg # Error
self.socket_discover.close()
self.config_text = st
self.ChangeLNA(2) # Initialize the LNA using the correct LNA code from the FPGA code version
def open(self):
return self.config_text
def GetValue(self, name): # return values stored in the hardware
if name == 'Hware_Hl2_EepromIP':
addr1 = self.ReadEEPROM(0x08)
addr2 = self.ReadEEPROM(0x09)
addr3 = self.ReadEEPROM(0x0A)
addr4 = self.ReadEEPROM(0x0B)
if addr1 < 0 or addr2 < 0 or addr3 < 0 or addr4 < 0:
return "Read failed"
else:
return "%d.%d.%d.%d" % (addr1, addr2, addr3, addr4)
elif name == 'Hware_Hl2_EepromIPUse':
use = self.ReadEEPROM(0x06)
if use < 0:
return "Read failed"
if not use & 0b10000000:
return 'Ignore'
elif use & 0b100000:
return 'Use DHCP first'
else:
return 'Set address'
elif name == 'Hware_Hl2_EepromMAC':
addr1 = self.ReadEEPROM(0x0C)
addr2 = self.ReadEEPROM(0x0D)
if addr1 < 0 or addr2 < 0:
return "Read failed"
else:
return "0x%X 0x%X" % (addr1, addr2)
elif name == 'Hware_Hl2_EepromMACUse':
use = self.ReadEEPROM(0x06)
if use < 0:
return "Read failed"
if use & 0b1000000:
return 'Set address'
else:
return 'Ignore'
return "Name failed"
def SetValue(self, ctrl):
name = ctrl.quisk_data_name
value = ctrl.GetValue()
if name == 'Hware_Hl2_EepromIP':
try:
addr1, addr2, addr3, addr4 = value.split('.')
addr1 = int(addr1)
addr2 = int(addr2)
addr3 = int(addr3)
addr4 = int(addr4)
except:
pass
else:
self.WriteEEPROM(0x08, addr1)
self.WriteEEPROM(0x09, addr2)
self.WriteEEPROM(0x0A, addr3)
self.WriteEEPROM(0x0B, addr4)
elif name == 'Hware_Hl2_EepromIPUse':
use = self.ReadEEPROM(0x06)
if use >= 0:
self.eeprom_valid = use
if value == 'Ignore':
self.eeprom_valid &= ~0b0010000000
elif value == 'Use DHCP first':
self.eeprom_valid |= 0b0010100000
elif value == 'Set address':
self.eeprom_valid |= 0b0010000000
self.eeprom_valid &= ~0b0000100000
self.WriteEEPROM(0x06, self.eeprom_valid)
elif name == 'Hware_Hl2_EepromMAC':
try:
addr1, addr2 = value.split()
addr1 = int(addr1, base=0)
addr2 = int(addr2, base=0)
except:
pass
else:
self.WriteEEPROM(0x0C, addr1)
self.WriteEEPROM(0x0D, addr2)
elif name == 'Hware_Hl2_EepromMACUse':
use = self.ReadEEPROM(0x06)
if use >= 0:
self.eeprom_valid = use
if value == 'Ignore':
self.eeprom_valid &= ~0b0001000000
elif value == 'Set address':
self.eeprom_valid |= 0b0001000000
self.WriteEEPROM(0x06, self.eeprom_valid)
def GetControlByte(self, C0_index, byte_index):
# Get the control byte at C0 index and byte index. The bytes are C0, C1, C2, C3, C4.
# The C0 index is 0 to 16 inclusive. The byte index is 1 to 4. The byte index of C2 is 2.
return self.pc2hermes[C0_index * 4 + byte_index - 1]
def SetControlByte(self, C0_index, byte_index, value): # Set the control byte as above.
self.pc2hermes[C0_index * 4 + byte_index - 1] = value
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("SetControlByte C0_index %d byte_index %d to 0x%X" % (C0_index, byte_index, value))
def ChangeFrequency(self, tx_freq, vfo_freq, source='', band='', event=None):
if tx_freq and tx_freq > 0:
if source == 'BtnBand' or abs(tx_freq - self.tx_frequency) > 1000000:
self.ChangeAlexFilters(tx_freq=tx_freq)
self.tx_frequency = tx_freq
tx = int(tx_freq - self.transverter_offset)
self.pc2hermes[ 4] = tx >> 24 & 0xff # C0 index == 1, C1, C2, C3, C4: Tx freq, MSB in C1
self.pc2hermes[ 5] = tx >> 16 & 0xff
self.pc2hermes[ 6] = tx >> 8 & 0xff
self.pc2hermes[ 7] = tx & 0xff
if self.vfo_frequency != vfo_freq:
self.vfo_frequency = vfo_freq
vfo = int(vfo_freq - self.transverter_offset)
self.pc2hermes[ 8] = vfo >> 24 & 0xff # C0 index == 2, C1, C2, C3, C4: Rx freq, MSB in C1
self.pc2hermes[ 9] = vfo >> 16 & 0xff
self.pc2hermes[10] = vfo >> 8 & 0xff
self.pc2hermes[11] = vfo & 0xff
if DEBUG > 1: print("Change freq Tx", tx_freq, "Rx", vfo_freq)
QS.pc_to_hermes(self.pc2hermes)
return tx_freq, vfo_freq
def ChangeAlexFilters(self, tx_freq=None, edit=False):
if tx_freq is None:
tx_freq = self.tx_frequency
fmin = tx_freq
fmax = tx_freq
for pane in self.application.multi_rx_screen.receiver_list:
freq = pane.VFO + pane.txFreq
if freq < fmin:
fmin = freq
if freq > fmax:
fmax = freq
if DEBUG: print ('fmin', fmin, 'fmax', fmax)
if edit or not (self.alex_hpf_f1 <= fmin < self.alex_hpf_f2): # Within same HP filter?
self.alex_hpf_f1, self.alex_hpf_f2, rx, tx = self.FreqAlexFilters(fmin, self.conf.AlexHPF, self.conf.AlexHPF_TxEn)
if DEBUG: print ("Change HP filter fmin, f1, f2", fmin, self.alex_hpf_f1, self.alex_hpf_f2, "rx, tx", rx, tx)
QS.set_alex_hpf(rx, tx)
if edit or not (self.alex_lpf_f1 <= fmax < self.alex_lpf_f2): # Within same LP filter?
self.alex_lpf_f1, self.alex_lpf_f2, rx, tx = self.FreqAlexFilters(fmax, self.conf.AlexLPF, self.conf.AlexLPF_TxEn)
if DEBUG: print ("Change LP filter fmax, f1, f2", fmax, self.alex_lpf_f1, self.alex_lpf_f2, "rx, tx", rx, tx)
QS.set_alex_lpf(rx, tx)
def FreqAlexFilters(self, tx_freq, filt, enabl):
# Find the new frequency band
gap1 = 0.0 # If we are in a gap in the filter frequencies, this is f1 and f2 for the gap.
gap2 = 1E20
for f1, f2, rx, tx in filt: # f1 and f2 are strings in MHz
try:
f1 = float(f1) * 1E6
f2 = float(f2) * 1E6
except:
continue
if f1 >= f2:
continue
if f1 <= tx_freq < f2:
if not enabl:
tx = rx
return f1, f2, rx, tx
if tx_freq >= f2 and f2 > gap1:
gap1 = f2
if tx_freq <= f1 and f1 < gap2:
gap2 = f1
return gap1, gap2, 0, 0
def Freq2Phase(self, freq=None): # Return the phase increment as calculated by the FPGA
# This code attempts to duplicate the calculation of phase increment in the FPGA code.
clock = ((int(self.conf.rx_udp_clock) + 24000) // 48000) * 48000 # this assumes the nominal clock is a multiple of 48kHz
M2 = 2 ** 57 // clock
M3 = 2 ** 24
if freq is None:
freqcomp = int(self.vfo_frequency - self.transverter_offset) * M2 + M3
else:
freqcomp = int(freq) * M2 + M3
phase = (freqcomp // 2 ** 25) & 0xFFFFFFFF
return phase
def ReturnVfoFloat(self, freq=None): # Return the accurate VFO as a float
phase = self.Freq2Phase(freq)
freq = float(phase) * self.conf.rx_udp_clock / 2.0**32
return freq
def ReturnFrequency(self): # Return the current tuning and VFO frequency
return None, None # frequencies have not changed
def HeartBeat(self):
self.hermes_temperature, self.hermes_fwd_power, self.hermes_rev_power, self.hermes_pa_current = QS.get_hermes_TFRC()
if self.application.bottom_widgets:
self.application.bottom_widgets.UpdateText()
def RepeaterOffset(self, offset=None): # Change frequency for repeater offset during Tx
if offset is None: # Return True if frequency change is complete
if time.time() > self.repeater_time0 + self.repeater_delay:
return True
elif offset == 0: # Change back to the original frequency
if self.repeater_freq is not None:
self.repeater_time0 = time.time()
self.ChangeFrequency(self.repeater_freq, self.vfo_frequency, 'repeater')
self.repeater_freq = None
else: # Shift to repeater input frequency
self.repeater_freq = self.tx_frequency
offset = int(offset * 1000) # Convert kHz to Hz
self.repeater_time0 = time.time()
self.ChangeFrequency(self.tx_frequency + offset, self.vfo_frequency, 'repeater')
return False
def ChangeBand(self, band):
# band is a string: "60", "40", "WWV", etc.
BaseHardware.ChangeBand(self, band)
self.band = band
self.ChangeBandFilters()
self.SetTxLevel()
def ChangeBandFilters(self):
highest = self.band
freq = self.conf.BandEdge.get(highest, (0, 0))[0]
for pane in self.application.multi_rx_screen.receiver_list:
f = self.conf.BandEdge.get(pane.band, (0, 0))[0]
if freq < f:
freq = f
highest = pane.band
Rx = self.conf.Hermes_BandDict.get(highest, 0)
self.SetControlByte(0, 2, Rx << 1) # C0 index == 0, C2[7:1]: user output
if self.conf.Hermes_BandDictEnTx:
Tx = self.conf.Hermes_BandDictTx.get(self.band, 0) # Use Tx filter
else:
Tx = self.conf.Hermes_BandDict.get(self.band, 0) # Use Rx filter
QS.set_hermes_filters(Rx, Tx)
self.ChangeAlexFilters()
def ChangeMode(self, mode):
# mode is a string: "USB", "AM", etc.
BaseHardware.ChangeMode(self, mode)
self.mode = mode
self.SetTxLevel()
def OnButtonPTT(self, event):
btn = event.GetEventObject()
if btn.GetValue():
QS.set_PTT(1)
QS.set_key_down(1)
else:
QS.set_PTT(0)
QS.set_key_down(0)
def OnSpot(self, level):
# level is -1 for Spot button Off; else the Spot level 0 to 1000.
pass
def VarDecimGetChoices(self): # return text labels for the control
return self.var_rates
def VarDecimGetLabel(self): # return a text label for the control
return "Sample rate ksps"
def VarDecimGetIndex(self): # return the current index
return self.var_index
def VarDecimSet(self, index=None): # set decimation, return sample rate
if index is None: # initial call to set rate before the call to open()
rate = self.application.vardecim_set # May be None or from different hardware
else:
rate = int(self.var_rates[index]) * 1000
if rate == 48000:
self.var_index = 0
elif rate == 96000:
self.var_index = 1
elif rate == 192000:
self.var_index = 2
elif rate == 384000:
self.var_index = 3
else:
self.var_index = 0
rate = 48000
self.pc2hermes[0] = self.var_index # C0 index == 0, C1[1:0]: rate
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change sample rate to", rate)
return rate
def VarDecimRange(self):
return (48000, 384000)
## Hardware AGC is no longer supported in HL2 identifying as version >=40
def ChangeAGC(self, value):
if value:
self.pc2hermes[2] |= 0x10 # C0 index == 0, C3[4]: AGC enable
else:
self.pc2hermes[2] &= ~0x10
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change AGC to", value)
## Simpler LNA setting for HL2 identifying as version >=40, see HL2 wiki for details
def ChangeLNA(self, value): # LNA for Rx
# value is -12 to +48
if self.hermes_code_version < 40: # Is this correct ??
if value < 20:
self.pc2hermes[2] |= 0x08 # C0 index == 0, C3[3]: LNA +32 dB disable == 1
value = 19 - value
else:
self.pc2hermes[2] &= ~0x08 # C0 index == 0, C3[3]: LNA +32 dB enable == 0
value = 51 - value
else:
value = ((value+12) & 0x3f) | 0x40
self.pc2hermes[4 * 10 + 3] = value # C0 index == 0x1010, C4[4:0] LNA 0-32 dB gain
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change LNA to", value)
def ChangeTxLNA(self, value): # LNA for Tx
# value is -12 to +48
value = ((value+12) & 0x3f) | 0x40 | 0x80
self.SetControlByte(0x0e, 3, value) # C0 index == 0x0e, C3
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change Tx LNA to", value)
def SetTxLevel(self):
try:
tx_level = self.conf.tx_level[self.band]
except KeyError:
tx_level = self.conf.tx_level.get(None, 127) # The default
if self.mode[0:3] in ('DGT', 'FDV'): # Digital modes; change power by a percentage
reduc = self.application.digital_tx_level
else:
reduc = self.application.tx_level
tx_level = int(tx_level *reduc/100.0)
if tx_level < 0:
tx_level = 0
elif tx_level > 255:
tx_level = 255
self.pc2hermes[4 * 9] = tx_level # C0 index == 0x1001, C1[7:0] Tx level
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print("Change tx_level to", tx_level)
def MultiRxCount(self, count): # count == number of additional receivers besides the Tx/Rx receiver: 1, 2, 3
# C0 index == 0, C4[5:3]: number of receivers 0b000 -> one receiver; C4[2] duplex on
self.pc2hermes[3] = 0x04 | count << 3
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print("Change MultiRx count to", count)
def MultiRxFrequency(self, index, vfo): # index of multi rx receiver: 0, 1, 2, ...
if DEBUG: print("Change MultiRx %d frequency to %d" % (index, vfo))
index = index * 4 + 12 # index does not include first Tx/Rx receiver in C0 index == 1, 2
self.pc2hermes[index ] = vfo >> 24 & 0xff
self.pc2hermes[index + 1] = vfo >> 16 & 0xff # C1, C2, C3, C4: Rx freq, MSB in C1
self.pc2hermes[index + 2] = vfo >> 8 & 0xff
self.pc2hermes[index + 3] = vfo & 0xff
QS.pc_to_hermes(self.pc2hermes)
def SetVNA(self, key_down=None, vna_start=None, vna_stop=None, vna_count=None, do_tx=False):
if vna_count is not None: # must be called first
self.vna_count = vna_count
if vna_start is None:
start = 0
stop = 0
else: # Set the start and stop frequencies and the frequency change for each point
# vna_start and vna_stop must be specified together
self.pc2hermes[ 4] = vna_start >> 24 & 0xff # C0 index == 1, C1, C2, C3, C4: Tx freq, MSB in C1
self.pc2hermes[ 5] = vna_start >> 16 & 0xff # used for vna starting frequency
self.pc2hermes[ 6] = vna_start >> 8 & 0xff
self.pc2hermes[ 7] = vna_start & 0xff
N = self.vna_count - 1
ph_start = self.Freq2Phase(vna_start) # Calculate using phases
ph_stop = self.Freq2Phase(vna_stop)
delta = (ph_stop - ph_start + N // 2) // N
delta = int(float(delta) * self.conf.rx_udp_clock / 2.0**32 + 0.5)
self.pc2hermes[ 8] = delta >> 24 & 0xff # C0 index == 2, C1, C2, C3, C4: Rx freq, MSB in C1
self.pc2hermes[ 9] = delta >> 16 & 0xff # used for the frequency to add for each point
self.pc2hermes[10] = delta >> 8 & 0xff
self.pc2hermes[11] = delta & 0xff
self.pc2hermes[4 * 9 + 2] = (self.vna_count >> 8) & 0xff # C0 index == 0b1001, C3
self.pc2hermes[4 * 9 + 3] = self.vna_count & 0xff # C0 index == 0b1001, C4
QS.pc_to_hermes(self.pc2hermes)
start = self.ReturnVfoFloat(vna_start)
phase = ph_start + self.Freq2Phase(delta) * N
stop = float(phase) * self.conf.rx_udp_clock / 2.0**32
start = int(start + 0.5)
stop = int(stop + 0.5)
if DEBUG: print ("Change VNA start", vna_start, start, "stop", vna_stop, stop, 'count', self.vna_count)
if key_down is None:
pass
elif key_down:
if not self.vna_started:
self.vna_started = True
self.SetControlByte(9, 2, 0x80) # turn on VNA mode
QS.set_PTT(1)
else:
QS.set_PTT(0)
return start, stop # Return actual frequencies after all phase rounding
def EnablePowerAmp(self, enable):
if enable:
self.pc2hermes[4 * 9 + 1] |= 0x08 # C0 index == 9, C2
else:
self.pc2hermes[4 * 9 + 1] &= ~0x08
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change PwrAmp 0x%X" % (self.pc2hermes[4*9 + 1]))
def SetLowPwrEnable(self, enable):
if enable:
self.pc2hermes[4 * 9 + 1] |= 0x04 # C0 index == 9, C2
else:
self.pc2hermes[4 * 9 + 1] &= ~0x04
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change LpPwrEnable 0x%X" % (self.pc2hermes[4*9 + 1]))
def DisableSyncFreq(self, value): # Thanks to Steve, KF7O
if value:
self.pc2hermes[4 * 0 + 2] |= 0x10 # C0 index == 0, C3
else:
self.pc2hermes[4 * 0 + 2] &= ~0x10
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Change SyncFreq 0x%X" % (self.pc2hermes[4*0 + 2]))
def EnableBiasChange(self, enable):
# Bias settings are in location 12, 13, 14, 15, and are not sent unless C1 == 0x06
if enable:
for base in (12, 13, 14, 15):
self.pc2hermes[4 * base] = 0x06 # C1
self.pc2hermes[4 * base + 1] = 0xA8 # C2
self.pc2hermes[4 * 12 + 2] = 0x00 # C3 bias 1, volitile
self.pc2hermes[4 * 13 + 2] = 0x20 # C3 bias 1, non-volitile
self.pc2hermes[4 * 14 + 2] = 0x10 # C3 bias 2, volitile
self.pc2hermes[4 * 15 + 2] = 0x30 # C3 bias 2, non-volitile
else:
for base in (12, 13, 14, 15):
self.pc2hermes[4 * base] = 0x00 # C1
QS.pc_to_hermes(self.pc2hermes)
if DEBUG: print ("Enable bias change", enable)
## Bias is 0 indexed to match schematic
## Changes for HermesLite v2 thanks to Steve, KF7O
def ChangeBias0(self, value):
if self.hermes_code_version >= 60:
i2caddr,value = 0xac,(value%256)
else:
i2caddr,value = 0xa8,(255-(value%256))
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x00,value
self.WriteQueue(1)
if DEBUG: print ("Change bias 0", value)
def ChangeBias1(self, value):
if self.hermes_code_version >= 60:
i2caddr,value = 0xac,(value%256)
else:
i2caddr,value = 0xa8,(255-(value%256))
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x10,value
self.WriteQueue(1)
if DEBUG: print ("Change bias 1", value)
def WriteBias(self, value0, value1):
if self.hermes_code_version >= 60:
i2caddr,value0 = 0xac,(value0%256)
else:
i2caddr,value0 = 0xa8,(255-(value0%256))
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x20,value0
self.WriteQueue(1)
## Wait >10ms as that is the longest EEPROM write cycle time
time.sleep(0.015)
value1 = (value1%256) if self.hermes_code_version >= 60 else (255-(value1%256))
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x30,value1
self.WriteQueue(1)
## Double write bias to EEPROM
time.sleep(0.030)
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x30,value1
self.WriteQueue(1)
time.sleep(0.015)
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,0x20,value0
self.WriteQueue(1)
if DEBUG: print ("Write bias", value0, value1)
def WriteQueue(self,qlen):
## Make sure last write(s) went through
dt = 0.005 ## 5 ms initial delay
while QS.get_hermeslite_writepointer() > 0:
time.sleep(dt)
dt *= 2
if dt > 0.300:
print("ERROR: Hermes-Lite write queue timeout")
return
## Send next write(s)
QS.pc_to_hermeslite_writequeue(self.pc2hermeslitewritequeue)
QS.set_hermeslite_writepointer(qlen)
if DEBUG:
if dt > 0.005: print("Final dt in Hermes-Lite write queue was",dt)
## In HL2 firmware identifying as version >=40, AD9866 access is available
## See AD9866 datasheet for details, some examples:
## self.writeAD9866(0x08,0xff) ## Set LPF target frequency
## self.WriteAD9866(0x07,0x01) ## Enable RX LPF, RX to high power usage
## self.WriteAD9866(0x07,0x00) ## Disable RX LPF, RX to high power usage
## self.WriteAD9866(0x0e,0x81) ## Low digital drive strength
## self.WriteAD9866(0x0e,0x01) ## High digital drive strength
## Set RX bias to default levels
## cpga = 0
## spga = 0
## adcb = 0
## self.WriteAD9866(0x13,((cpga & 0x07) << 5) | ((spga & 0x03) << 3) | (adcb & 0x07))
def WriteAD9866(self,addr,data):
addr = addr & 0x01f
data = data & 0x0ff
self.pc2hermeslitewritequeue[0:5] = 0x7b,0x06,addr,0x00,data
self.WriteQueue(1)
if DEBUG: print ("Write AD9866 addr={0:06x} data={1:06x}".format(addr,data))
def MakePowerCalibration(self):
# Use spline interpolation to convert the ADC power sensor value to power in watts
name = self.conf.power_meter_calib_name
try: # look in config file
table = self.conf.power_meter_std_calibrations[name]
except:
try: # look in local name space
table = self.application.local_conf.GetRadioDict().get('power_meter_local_calibrations', {})[name]
except: # not found
self.power_interpolator = None
return
if len(table) < 3:
self.power_interpolator = None
return
table.sort()
if table[0][0] > 0: # Add zero code at zero power
table.insert(0, [0, 0.0])
# fill out the table to the maximum code 4095
l = len(table) - 1
x = table[l][0] * 1.1 # voltage increase
y = table[l][1] * 1.1**2 # square law power increase
while 1:
table.append([x, y])
if x > 4095:
break
x *= 1.1
y *= 1.1**2
self.power_interpolator = quisk_utils.SplineInterpolator(table)
def InterpolatePower(self, x):
if not self.power_interpolator:
return 0.0
y = self.power_interpolator.Interpolate(x)
if y < 0.0:
y = 0.0
return y
def VersaOut2(self, divisor): # Use the VersaClock output 2 with a floating point divisor
div = int(divisor * 2**24 + 0.1)
intgr = div >> 24
frac = (div & 0xFFFFFF) << 2
self.WriteVersa5(0x62,0x3b) # Clock2 CMOS1 output, 3.3V
self.WriteVersa5(0x2c,0x00) # Disable aux output on clock 1
self.WriteVersa5(0x31,0x81) # Use divider for clock2
# Integer portion
self.WriteVersa5(0x3d, intgr >> 4)
self.WriteVersa5(0x3e, intgr << 4)
# Fractional portion
self.WriteVersa5(0x32,frac >> 24) # [29:22]
self.WriteVersa5(0x33,frac >> 16) # [21:14]
self.WriteVersa5(0x34,frac >> 8) # [13:6]
self.WriteVersa5(0x35,(frac & 0xFF)<<2) # [5:0] and disable ss
self.WriteVersa5(0x63,0x01) # Enable clock2
# Thanks to Steve Haynal for VersaClock code:
def WriteVersa5(self,addr,data):
data = data & 0x0ff
addr = addr & 0x0ff
## i2caddr is 7 bits, no read write
## Bit 8 is set to indicate stop to HL2
## i2caddr = 0x80 | (0xd4 >> 1) ## ea
self.pc2hermeslitewritequeue[0:5] = 0x7c,0x06,0xea,addr,data
self.WriteQueue(1)
def EnableCL2_sync76p8MHz(self):
self.WriteVersa5(0x62,0x3b) ## Clock2 CMOS1 output, 3.3V
self.WriteVersa5(0x2c,0x01) ## Enable aux output on clock 1
self.WriteVersa5(0x31,0x0c) ## Use clock1 aux output as input for clock2
self.WriteVersa5(0x63,0x01) ## Enable clock2
def EnableCL2_61p44MHz(self):
self.WriteVersa5(0x62,0x3b) ## Clock2 CMOS1 output, 3.3V
self.WriteVersa5(0x2c,0x00) ## Disable aux output on clock 1
self.WriteVersa5(0x31,0x81) ## Use divider for clock2
## VCO multiplier is shared for all outputs, set to 68 by firmware
## VCO = 38.4*68 = 2611.2 MHz
## There is a hardwired divide by 2 in the Versa 5 at the VCO output
## VCO to Dividers = 2611.2 MHZ/2 = 1305.6
## Target frequency of 61.44 requires dividers of 1305.6/61.44 = 21.25
## Frational dividers are supported
## Set integer portion of divider 21 = 0x15, 12 bits split across 2 registers
self.WriteVersa5(0x3d,0x01)
self.WriteVersa5(0x3e,0x50)
## Set fractional portion, 30 bits, 2**24 * .25 = 0x400000
self.WriteVersa5(0x32,0x01) ## [29:22]
self.WriteVersa5(0x33,0x00) ## [21:14]
self.WriteVersa5(0x34,0x00) ## [13:6]
self.WriteVersa5(0x35,0x00) ## [5:0] and disable ss
self.WriteVersa5(0x63,0x01) ## Enable clock2
def WriteEEPROM(self, addr, value):
## Write values into the MCP4662 EEPROM registers
## For example, to set a fixed IP of 192.168.33.20
## hw.WriteEEPROM(8,192)
## hw.WriteEEPROM(9,168)
## hw.WriteEEPROM(10,33)
## hw.WriteEEPROM(11,20)
## To set the last two values of the MAC to 55:66
## hw.WriteEEPROM(12,55)
## hw.WriteEEPROM(13,66)
## To enable the fixed IP and alternate MAC, and favor DHCP
## hw.WriteEEPROM(6, 0x80 | 0x40 | 0x20)
## See https://github.com/softerhardware/Hermes-Lite2/wiki/Protocol
if self.hermes_code_version >= 60:
i2caddr,value = 0xac,(value%256)
else:
i2caddr,value = 0xa8,(255-(value%256))
addr = (addr << 4)%256
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x06,i2caddr,addr,value
self.WriteQueue(1)
if DEBUG: print ("Write EEPROM", addr, value)
def ReadEEPROM(self, addr):
## To read the bias settings for bias0 and bias1
## hw.ReadEEPROM(2)
## hw.ReadEEPROM(3)
if self.hermes_code_version >= 60:
i2caddr = 0xac
else:
i2caddr = 0xa8
faddr = ((addr << 4)%256) | 0xc
QS.clear_hermeslite_response()
self.pc2hermeslitewritequeue[0:5] = 0x7d,0x07,i2caddr,faddr,0
self.WriteQueue(1)
for j in range(50):
time.sleep(0.001)
resp = QS.get_hermeslite_response()
##print("RESP:",j,resp[0],resp[1],resp[2],resp[3],resp[4])
if resp[0] != 0: break
if resp[0] == 0:
if DEBUG: print("EEPROM read did not return a value")
return -1
else:
## MCP4662 does not autoincrement when reading 8 bytes
## MCP4662 stores 9 bit values, msb came first and is in lower order byte
v0 = (resp[4] << 8) | resp[3]
v1 = (resp[2] << 8) | resp[1]
if (resp[0] >> 1) != 0x7d:
## Response mismatch
if DEBUG: print("EEPROM read response mismatch",resp[0] >> 1)
return -1
elif v0 != v1:
if DEBUG: print("EEPROM read values do not agree",v0,v1)
return -1
else:
if DEBUG: print("EEPROM read {0:#x} from address {1:#x}".format(v0,addr))
return v0
def ProgramGateware(self, event): # Program the Gateware (FPGA firmware) over Ethernet
title = "Program the Gateware"
main_frame = self.application.main_frame
dlg = wx.FileDialog(main_frame, message='Choose an RBF file for programming the Gateware',
style=wx.FD_OPEN, wildcard="RBF files (*.rbf)|*.rbf")
if dlg.ShowModal() == wx.ID_OK:
path = dlg.GetPath()
dlg.Destroy()
else:
dlg.Destroy()
return
timeout = 0.2 # socket timeout in seconds
erase_time = 50 # in units of timeout
hermes_ip = self.hermes_ip
hermes_mac = self.hermes_mac
if not hermes_ip:
msg = wx.MessageDialog(main_frame, "No Hermes hardware was found.", title, wx.OK|wx.ICON_ERROR)
msg.ShowModal()
msg.Destroy()
return
try:
fp = open(path, "rb")
size = os.stat(path).st_size
except:
msg = wx.MessageDialog(main_frame, "Can not read the RBF file specified.", title, wx.OK|wx.ICON_ERROR)
msg.ShowModal()
msg.Destroy()
return
for i in range(10):
state = QS.set_params(hermes_pause=1)
#print ("state", state)
if state == 23:
break
else:
time.sleep(0.05)
else:
msg = wx.MessageDialog(main_frame, "Failure to find a running Hermes and stop the samples.", title, wx.OK|wx.ICON_ERROR)
msg.ShowModal()
msg.Destroy()
fp.close()
return
blocks = (size + 255) // 256
dlg = wx.ProgressDialog(title, "Erase old program...", blocks + 1, main_frame, wx.PD_APP_MODAL)
program_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
program_socket.settimeout(timeout)
port = self.conf.rx_udp_port
program_socket.connect((hermes_ip, port))
cmd = bytearray(64) # Erase command
cmd[0] = 0xEF
cmd[1] = 0xFE
cmd[2] = 0x03
cmd[3] = 0x02
program_socket.send(cmd)
success = False
for i in range(erase_time):
dlg.Update(i * blocks // erase_time)
try:
reply = program_socket.recv(1500)
except socket.timeout:
pass
else:
reply = bytearray(reply)
if reply[0:3] == bytearray(b"\xEF\xFE\03") and reply[3:9] == hermes_mac:
success = True
break
if not success:
dlg.Destroy()
self.application.Yield()
fp.close()
msg = wx.MessageDialog(main_frame, "Failure to erase the old program. Please push the Program button again.", title, wx.OK|wx.ICON_ERROR)
msg.ShowModal()
msg.Destroy()
program_socket.close()
return
dlg.Update(0, "Programming...")
cmd = bytearray(8)
cmd[0] = 0xEF
cmd[1] = 0xFE
cmd[2] = 0x03
cmd[3] = 0x01
cmd[4] = (blocks >> 24) & 0xFF
cmd[5] = (blocks >> 16) & 0xFF
cmd[6] = (blocks >> 8) & 0xFF
cmd[7] = (blocks ) & 0xFF
for block in range(blocks):
dlg.Update(block)
prog = fp.read(256)
if block == blocks - 1: # last block may have an odd number of bytes
prog = prog + bytearray(b"\xFF" * (256 - len(prog)))
if len(prog) != 256:
print ("read wrong number of bytes for block", block)
success = False
break
try:
program_socket.send(cmd + prog)
reply = program_socket.recv(1500)
except socket.timeout:
print ("Socket timeout while programming block", block)
success = False
break
else:
reply = bytearray(reply)
if reply[0:3] != bytearray(b"\xEF\xFE\04") or reply[3:9] != hermes_mac:
print ("Program failed at block", block)
success = False
break
fp.close()
for i in range(10): # throw away extra packets
try:
program_socket.recv(1500)
except socket.timeout:
break
if success:
dlg.Update(0, "Waiting for Hermes to start...")
wait_secs = 15 # number of seconds to wait for restart
cmd = bytearray(63) # Discover
cmd[0] = 0xEF
cmd[1] = 0xFE
cmd[2] = 0x02
program_socket.settimeout(1.0)
for i in range(wait_secs):
dlg.Update(i * blocks // wait_secs)
if i < 5:
time.sleep(1.0)
continue
program_socket.send(cmd)
try:
reply = program_socket.recv(1500)
except socket.timeout:
pass
else:
reply = bytearray(reply)
#print ("0x%X 0x%X %d 0x%X 0x%X 0x%X 0x%X 0x%X 0x%X %d %d" % tuple(reply[0:11]))
if reply[0] == 0xEF and reply[1] == 0xFE and reply[10] == 6:
self.hermes_mac = reply[3:9]
self.hermes_code_version = reply[9]
st = 'Capture from Hermes device: Mac %2x:%2x:%2x:%2x:%2x:%2x, Code version %d, ID %d' % tuple(reply[3:11])
st += ', IP %s' % self.hermes_ip
self.config_text = st
#print (st)
self.application.config_text = st
self.application.main_frame.SetConfigText(st)
QS.set_params(hermes_pause=0)
break
dlg.Destroy()
self.application.Yield()
else:
dlg.Destroy()
self.application.Yield()
msg = wx.MessageDialog(main_frame, "Programming failed. Please push the Program button again.", title, wx.OK|wx.ICON_ERROR)
msg.ShowModal()
msg.Destroy()
program_socket.close()