from __future__ import print_function from __future__ import absolute_import from __future__ import division import sys, struct, socket, traceback from quisk_hardware_model import Hardware as BaseHardware import _quisk as QS DEBUG = 0 class Adf4351: # class to hold adf4351 attributes def __init__(self, receiver, clock, r_counter): self.receiver = receiver self.clock = clock self.r_counter = r_counter self.int_mode = 1 # integer one, fractional zero self.band_sel_clock_div = 40 self.aux_rf_out = 0b000 # enable 1/0, power 00 to 11 self.frac_value = 0 self.modulus = 23 self.changed = 0 class Hardware(BaseHardware): def __init__(self, app, conf): BaseHardware.__init__(self, app, conf) self.use_sidetone = 0 self.vfo_frequency = 52000000 self.vfo_sample_rate = conf.sample_rate self.vfo_test = 0 # JIM self.tx_frequency = 0 self.CorrectTxDc = { '23cm':(1270.0, 0.167081, 0.150557), '2':(146.0, 0.018772, 0.038658), '33cm':(915.0, 0.140150, 0.051967), '6':(52.0, 0.020590, 0.024557), '70cm':(435.0, 0.004495, 0.096879), '1.25':(223.5, 0.042958, 0.055212), } self.rx_clock38 = 38880000 - 30 # master clock frequency, 38880 kHz nominal #rx_udp_clock = rx_clock38 * 32 // 2 // 9 # ADC sample rate in Hertz self.rx_udp_clock_nominal = 69120000 # rate to display self.tx_clock80 = 80000000 + 14 self.firmware_version = None # firmware version is initially unknown self.rx_udp_socket = None self.tx_udp_socket = None self.got_rx_udp_status = '' self.got_tx_udp_status = '' self.band = '' self.rx_phase0 = self.rx_phase1 = 0 self.tx_phase = 0 self.button_PTT = 0 self.mode_is_cw = 0 self.scan_enable = 0 self.scan_blocks = 0 self.scan_samples = 1 self.scan_phase = 0 self.fft_scan_valid = 0.84 self.Rx4351 = Adf4351(True, self.rx_clock38, 8) self.Tx4351 = Adf4351(False, 10700000, 2) self.Tx4351.aux_rf_out = 0b000 # enable aux RF out 0b111 or turn off 0b000 self.decim3 = 10 self.SetDecim(192000) self.var_rates = ['31X', '19X', '9X', '5X', '3X', '2X', '1728', '1152', '768', '384', '192', '96', '48'] # supported sample rates as strings self.index = 0 self.repeater_freq = None self.DcI, self.DcQ = (0.0, 0.0) self.NewAdf4351(self.Rx4351, 146E6) self.NewAdf4351(self.Tx4351, 146E6) self.NewAd9951(52e6) self.NewUdpStatus() def ChangeFrequency(self, tx_freq, vfo_freq, source='', band='', event=None): self.tx_frequency = tx_freq if not self.Rx4351.frequency - 3E6 < vfo_freq < self.Rx4351.frequency + 3E6: self.NewAdf4351(self.Rx4351, vfo_freq) self.vfo_frequency = -1 self.NewAd9951(tx_freq) if abs(self.ad9951_freq - 10.7e6) > 15000: self.NewAdf4351(self.Tx4351, tx_freq) self.NewAd9951(tx_freq) self.NewAd9951(tx_freq) if self.vfo_frequency != vfo_freq: self.vfo_frequency = vfo_freq self.scan_deltaf = int(1152E3 * self.fft_scan_valid + 0.5) self.scan_phase = int(1152.E3 * self.fft_scan_valid / self.conf.rx_udp_clock * 2.0**32 + 0.5) self.scan_vfo0 = vfo_freq rx_phase1 = int((vfo_freq - self.Rx4351.frequency) / self.conf.rx_udp_clock * 2.0**32 + 0.5) if self.scan_enable: self.scan_vfo0 = self.scan_vfo0 - self.scan_deltaf * (self.scan_blocks - 1) // 2 rx_phase1 = rx_phase1 - int(self.scan_phase * (self.scan_blocks - 1) / 2.0 + 0.5) self.rx_phase1 = rx_phase1 & 0xFFFFFFFF rx_tune_freq = float(rx_phase1) * self.conf.rx_udp_clock / 2.0**32 QS.change_rates(96000, tx_freq, self.vfo_sample_rate, vfo_freq) QS.change_scan(self.scan_blocks, 1152000, self.fft_scan_valid, self.scan_vfo0, self.scan_deltaf) if DEBUG: #print( "vfo", vfo_freq, "adf4351", self.Rx4351.frequency, "phase", rx_phase1, "rx_tune", self.Rx4351.frequency - vfo_freq, rx_tune_freq) #print ("VFO", self.Rx4351.frequency + rx_tune_freq) print ("Change to Tx %d Vfo %d; VFO %.0f = adf4351_freq %.0f + rx_tune_freq %.0f" % (tx_freq, vfo_freq, self.Rx4351.frequency + rx_tune_freq, self.Rx4351.frequency, rx_tune_freq)) #print ("scan_enable %d, scan_blocks %d, scan_vfo0 %d, scan_deltaf %d" % (self.scan_enable, self.scan_blocks, self.scan_vfo0, self.scan_deltaf)) else: QS.change_rates(96000, tx_freq, self.vfo_sample_rate, self.vfo_frequency) rx_phase0 = int((tx_freq - self.Rx4351.frequency) / self.conf.rx_udp_clock * 2.0**32 + 0.5) self.rx_phase0 = rx_phase0 & 0xFFFFFFFF self.NewUdpStatus() if self.application.bottom_widgets: Rx1 = self.Rx4351.frequency * 1e-6 Rx2 = (self.ReturnVfoFloat() - self.Rx4351.frequency) * 1e-6 t = "Rx Div %d; ADF4351 %.6f + rx_tune %.6f = %.6f Tx Adf4351 %.6f AD9951 %.6f" % ( 2**self.Rx4351.rf_divider, Rx1, Rx2, Rx1 + Rx2, self.Tx4351.frequency * 1e-6, self.ad9951_freq * 1e-6) self.application.bottom_widgets.UpdateText(t) return tx_freq, vfo_freq def RepeaterOffset(self, offset=None): # Change frequency for repeater offset during Tx if offset is None: # Return True if frequency change is complete self.HeartBeat() return self.want_rx_udp_status[16:] == self.got_tx_udp_status[16:] if offset == 0: # Change back to the original frequency if self.repeater_freq is None: # Frequency was already reset return self.want_rx_udp_status[16:] == self.got_tx_udp_status[16:] self.ChangeFrequency(self.repeater_freq, self.vfo_frequency) self.repeater_freq = None else: # Shift to repeater input frequency self.repeater_freq = self.tx_frequency offset = int(offset * 1000) # Convert kHz to Hz self.ChangeFrequency(self.tx_frequency + offset, self.vfo_frequency) return False def ReturnVfoFloat(self): # Return the accurate VFO as a float rx_phase1 = int((self.vfo_frequency - self.Rx4351.frequency) / self.conf.rx_udp_clock * 2.0**32 + 0.5) rx_tune_freq = float(rx_phase1) * self.conf.rx_udp_clock / 2.0**32 return self.Rx4351.frequency + rx_tune_freq def open(self): ##self.application.config_screen.config.tx_phase.Enable(1) # Create the proper broadcast address for rx_udp_ip. nm = self.conf.rx_udp_ip_netmask.split('.') ip = self.conf.rx_udp_ip.split('.') nm = list(map(int, nm)) ip = list(map(int, ip)) bc = '' for i in range(4): x = (ip[i] | ~ nm[i]) & 0xFF bc = bc + str(x) + '.' self.broadcast_addr = bc[:-1] # This socket is used for the Simple Network Discovery Protocol by AE4JY self.socket_sndp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket_sndp.setblocking(0) self.socket_sndp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sndp_request = chr(56) + chr(0) + chr(0x5A) + chr(0xA5) + chr(0) * 52 self.sndp_rx_active = True # conf.rx_udp_port is used for returning ADC samples # conf.rx_udp_port + 1 is used for control self.rx_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.rx_udp_socket.setblocking(0) self.rx_udp_socket.connect((self.conf.rx_udp_ip, self.conf.rx_udp_port + 1)) # conf.tx_audio_port + 1 is used for control if self.conf.tx_ip: self.sndp_tx_active = True self.tx_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.tx_udp_socket.setblocking(0) self.tx_udp_socket.connect((self.conf.tx_ip, self.conf.tx_audio_port + 1)) else: self.sndp_tx_active = False QS.change_rates(96000, 0, 96000, 0) self.application.test1Button.Enable(0) return QS.open_rx_udp(self.conf.rx_udp_ip, self.conf.rx_udp_port) def close(self): if self.rx_udp_socket: self.rx_udp_socket.close() self.rx_udp_socket = None if self.tx_udp_socket: self.tx_udp_socket.close() self.tx_udp_socket = None def PrintStatus(self, msg, string): print (msg, ' ', end=' ') print (string[0:2], end=' ') for c in string[2:]: print ("%2X" % ord(c), end=' ') print () def GetFirmwareVersion(self): return self.firmware_version def ChangeMode(self, mode): # mode is a string: "USB", "AM", etc. if mode in ("CWL", "CWU"): self.mode_is_cw = 1 else: self.mode_is_cw = 0 self.NewUdpStatus() def ChangeBand(self, band): # band is a string: "60", "40", "WWV", etc. self.band = band try: freq, DcI, DcQ = self.CorrectTxDc[band] except KeyError: DcI, DcQ = (0.0, 0.0) self.NewUdpCorrect(DcI, DcQ) def NewUdpCorrect(self, DcI, DcQ): self.DcI = DcI self.DcQ = DcQ QS.set_udp_tx_correct(DcI, DcQ, 0.828) self.NewUdpStatus() def PrintUdpCorrect(self): for band in self.CorrectTxDc: freq, DcI, DcQ = self.CorrectTxDc[band] print ("'%s':(%.1f, %.6f, %.6f)," % (band, freq, DcI, DcQ)) def OnButtonPTT(self, event): btn = event.GetEventObject() if btn.GetValue(): # Turn the software key bit on or off self.button_PTT = 1 else: self.button_PTT = 0 QS.set_key_down(self.button_PTT) self.NewUdpStatus() def OnSpot(self, level): # level is -1 for Spot button Off; else the Spot level 0 to 1000. pass def Sndp(self): # AE4JY Simple Network Discovery Protocol - attempt to set the FPGA IP address try: self.socket_sndp.sendto(self.sndp_request, (self.broadcast_addr, 48321)) except: if DEBUG: traceback.print_exc() return for i in range(5): try: data = self.socket_sndp.recv(1024) except: break if len(data) != 56: continue if data[5:17] == 'QuiskUHFR-v1': ip = self.conf.rx_udp_ip.split('.') ip = list(map(int, ip)) ip = list(map(chr, ip)) if data[37] == ip[3] and data[38] == ip[2] and data[39] == ip[1] and data[40] == ip[0]: self.sndp_rx_active = False if DEBUG: print("SNDP success for Rx") else: t = (data[0:4] + chr(2) + data[5:37] + ip[3] + ip[2] + ip[1] + ip[0] + chr(0) * 12 + chr(self.conf.rx_udp_port & 0xFF) + chr(self.conf.rx_udp_port >> 8) + chr(0)) self.socket_sndp.sendto(t, (self.broadcast_addr, 48321)) elif data[5:17] == 'QuiskUHFT-v1': if self.conf.tx_ip: ip = self.conf.tx_ip.split('.') ip = list(map(int, ip)) ip = list(map(chr, ip)) if data[37] == ip[3] and data[38] == ip[2] and data[39] == ip[1] and data[40] == ip[0]: self.sndp_tx_active = False if DEBUG: print("SNDP success for Tx") else: t = (data[0:4] + chr(2) + data[5:37] + ip[3] + ip[2] + ip[1] + ip[0] + chr(0) * 12 + chr(self.conf.tx_audio_port & 0xFF) + chr(self.conf.tx_audio_port >> 8) + chr(0)) self.socket_sndp.sendto(t, (self.broadcast_addr, 48321)) def HeartBeat(self): if self.sndp_rx_active or self.sndp_tx_active: self.Sndp() return # SNDP is required for i in range(10): try: # receive the Rx status if any data = self.rx_udp_socket.recv(1024) if DEBUG > 1: self.PrintStatus(' gotRx ', data) except: break else: if data[0:2] == 'Sx': self.got_rx_udp_status = data if self.tx_udp_socket: for i in range(10): try: # receive the Tx status if any data = self.tx_udp_socket.recv(1024) if DEBUG > 1: self.PrintStatus(' gotTx ', data) except: break else: if data[0:2] == 'Sx': self.got_tx_udp_status = data if self.want_rx_udp_status[16:] == self.got_rx_udp_status[16:]: # The first part returns information from the hardware self.firmware_version = ord(self.got_rx_udp_status[2]) # Firmware version is returned here self.Rx4351.changed = 0 else: if DEBUG > 1: self.PrintStatus('HaveRx', self.got_rx_udp_status[0:20]) self.PrintStatus('sendRx', self.want_rx_udp_status[0:20]) try: self.rx_udp_socket.send(self.want_rx_udp_status) except: #traceback.print_exc() pass if not self.tx_udp_socket: pass elif self.want_rx_udp_status[16:] == self.got_tx_udp_status[16:]: # The first part returns information from the hardware self.Tx4351.changed = 0 self.Tx9951_changed = 0 else: if DEBUG > 1: self.PrintStatus('HaveTx', self.got_rx_udp_status[0:20]) self.PrintStatus('sendTx', self.want_rx_udp_status[0:20]) try: self.tx_udp_socket.send(self.want_rx_udp_status) except: #traceback.print_exc() pass if 0: self.rx_udp_socket.send('Qs') def VarDecimGetChoices(self): # return text labels for the control return self.var_rates def VarDecimGetLabel(self): # return a text label for the control return "Sample rate ksps" def VarDecimGetIndex(self): # return the current index return self.index def VarDecimRange(self): return (48000, 1152000) def VarDecimSet(self, index=None): # set decimation, return sample rate if index is None: # initial call to set decimation before the call to open() rate = self.application.vardecim_set # May be None or from different hardware try: rate = rate // 1000 if rate > 1152: rate = 1152 index = self.var_rates.index(str(rate)) except: rate = 192 index = self.var_rates.index(str(rate)) self.index = index rate = self.var_rates[index] if rate[-1] == 'X': self.scan_enable = 1 self.scan_blocks = int(rate[0:-1]) self.scan_samples = self.application.fft_size self.decim1 = 2 self.decim2 = 3 rate = 1152000 * self.scan_blocks else: self.scan_enable = 0 self.scan_blocks = 0 rate = int(rate) rate = rate * 1000 self.SetDecim(rate) vfo = self.vfo_frequency self.vfo_frequency = -1 self.vfo_sample_rate = rate self.ChangeFrequency(self.tx_frequency, vfo) self.NewUdpStatus() return rate def SetDecim(self, rate): # self.decim1, decim2, decim3 are the first, second, third decimations in the hardware if rate >= 1152000: self.decim1 = 2 elif rate >= 192000: self.decim1 = 3 elif rate == 96000: self.decim1 = 6 else: self.decim1 = 12 self.decim2 = self.rx_udp_clock_nominal // rate // self.decim1 // self.decim3 def NewUdpStatus(self): # Start of 16 bytes sent to the hardware: s = "Sx" # 0:2 Fixed string s += chr(0) # 2 Version number is returned here s += chr(0) # 3 s += chr(0) * 12 # 4:16 # Start of 80 bytes of data sent to the hardware: s += chr( 6 - 1) # 0 Variable decimation less one channel 0 first s += chr(12 - 1) # 1 Variable decimation less one channel 0 second s += struct.pack("