moved config to ~/.config, added CTY dat data enrichment, added lotw upload date, added lotw confirmation check

This commit is contained in:
Michael Clemens 2022-09-05 17:39:16 +02:00
parent 3910e0fc06
commit b8f977f1e2
1 changed files with 203 additions and 8 deletions

View File

@ -26,8 +26,11 @@ import urllib
import re
import datetime
import os
from os.path import exists
import sys
import csv
import configparser
import zipfile
import signal
import atexit
from datetime import timezone
@ -36,6 +39,7 @@ from requests.structures import CaseInsensitiveDict
from prettytable import PrettyTable
import xmltodict
import requests
from pathlib import Path
class QRZLogger():
@ -44,14 +48,33 @@ class QRZLogger():
def __init__(self):
"""initialize things"""
self.version = "0.8.2"
self.version = "0.9.0"
# Define the configuration object
self.config = configparser.ConfigParser()
self.config_file = os.path.expanduser('~/.qrzlogger.ini')
self.home_dir = str(Path.home())
self.config_dir = self.home_dir + "/.config/qrzlogger/"
# Check if config directory exists and else create it
Path(self.config_dir).mkdir(parents=True, exist_ok=True)
self.config_file = os.path.expanduser(self.config_dir + 'qrzlogger.ini')
self.read_config(self.config, self.config_file)
self.check_files()
if self.config['lotw']['user'] != "N0CALL" and self.check_lotw_confirmed:
self.confirmed_entities = self.get_confirmed_entities()
if self.check_cty:
with open(self.config_dir + self.config['files']['cty'], encoding='us-ascii') as csvfile:
self.cty = list(csv.reader(csvfile, delimiter=','))
if self.check_lotw_activity:
with open(self.config_dir + self.config['files']['lotw_activity'], encoding='us-ascii') as csvfile:
self.lotw_activity = list(csv.reader(csvfile, delimiter=','))
if self.config and self.config['log']['log_file']:
self.log_file = self.config['log']['log_file']
else:
@ -124,6 +147,16 @@ class QRZLogger():
'qrz_user': 'MYCALL',
'qrz_pass': 'my_secret_password',
'xml_fields': '("call", "band", "mode", "qso_date", "time_on", "rst_sent", "rst_rcvd", "comment")'}
config['files'] = {
'cty': 'cty.csv',
'cty_url': 'https://www.country-files.com/bigcty/download/bigcty.zip',
'lotw_confirmed': 'lotw.adi',
'lotw_activity': 'lotw-user-activity.csv',
'lotw_activity_url': 'https://lotw.arrl.org/lotw-user-activity.csv'}
config['lotw'] = {
'user': 'N0CALL',
'password': 'CHANGEME',
'mode': 'ssb'}
config['log'] = {
'log_file': '/tmp/qrzlogger.log'}
config['qso_defaults'] = {
@ -172,8 +205,8 @@ class QRZLogger():
# set the return value to the value of "call"
cleaned_call = call
# check if the callsign has a suffix (.e.g. /p)
if call.endswith(("/P","/M","/QRP")):
cleaned_call = re.sub(r'/\w$', "", call)
if call.endswith(("/P","/MM","/M","/QRP")):
cleaned_call = call.rsplit('/', 1)[0]
# check if the callsign has a prefix (e.g. DL/)
if "/" in cleaned_call:
cleaned_call = re.sub(r'^\w+/', "", cleaned_call)
@ -186,6 +219,112 @@ class QRZLogger():
print(tab)
print(attr('reset'))
@staticmethod
def download_file(url, local_filename):
"""downloads a file via HTTP and saves it to a defined file"""
with requests.get(url, stream=True) as request:
request.raise_for_status()
with open(local_filename, 'wb') as file:
for chunk in request.iter_content(chunk_size=8192):
file.write(chunk)
return local_filename
def check_files(self):
"""Checks if all necessary files are in the file system.
Downloads all files and unzips them (if necessary)"""
# check for lotw qsl information file
if self.config['lotw']['user'] != "N0CALL":
self.check_lotw_confirmed = exists(self.config_dir + self.config['files']['lotw_confirmed'])
if not self.check_lotw_confirmed:
print("The file " + self.config_dir + self.config['files']['lotw_confirmed'] + " is missing.")
user = self.config['lotw']['user']
password = self.config['lotw']['password']
mode = self.config['lotw']['mode']
url = "https://lotw.arrl.org/lotwuser/lotwreport.adi?login={}&password={}"\
"&qso_query=1&qso_qsl=yes&qso_mode={}&qso_qsldetail=yes&"\
"qso_qslsince=1970-01-01".format(user, password, mode)
print("Trying to download " + url)
self.download_file(url, self.config_dir + self.config['files']['lotw_confirmed'])
self.check_lotw_confirmed = exists(self.config_dir + self.config['files']['lotw_confirmed'])
if self.check_lotw_confirmed:
print("File successfully downloaded")
else:
print("something went wrong while downloading " + url)
else:
self.check_lotw_confirmed = False
# check for cty.csv file
self.check_cty = exists(self.config_dir + self.config['files']['cty'])
if not self.check_cty:
url = self.config['files']['cty_url']
print("The file " + self.config_dir + self.config['files']['cty'] + " is missing.")
print("Trying to download " + url)
# TODO: pfad?
zip_name = self.download_file(url, self.config_dir + "bigcty.zip" )
with zipfile.ZipFile(zip_name, 'r') as zip_ref:
zip_ref.extract("cty.csv", path=self.config_dir)
os.remove(zip_name)
self.check_cty = exists(self.config_dir + self.config['files']['cty'])
if self.check_cty:
print("File successfully downloaded and extracted.")
else:
print("something went wrong while downloading " + url)
# check for lotw user activity file
self.check_lotw_activity = exists(self.config_dir + self.config['files']['lotw_activity'])
if not self.check_lotw_activity:
url = self.config['files']['lotw_activity_url']
print("The file " + self.config_dir + self.config['files']['lotw_activity'] + " is missing.")
print("Trying to download " + url)
self.download_file(url, self.config_dir + self.config['files']['lotw_activity'])
self.check_lotw_activity = exists(self.config_dir + self.config['files']['lotw_activity'])
if self.check_lotw_activity:
print("File successfully downloaded")
else:
print("something went wrong while downloading " + url)
def get_confirmed_entities(self):
"""Reads the file downlaoded from LotW with all confirmed QSOs,
extracts all confirmed DXCCs and puts them into a list"""
ret = []
with open(self.config_dir + self.config['files']['lotw_confirmed'], encoding='us-ascii') as file:
for row in file:
if re.search("<DXCC:", row):
dxcc = row.partition(">")[2].lower().rstrip()
if dxcc not in ret:
ret.append(dxcc)
return ret
def check_lotw(self, call):
"""Reads the LotW user activity file and returns the date
of the last upload date if a specific call sign"""
ret = ""
for row in self.lotw_activity:
if call == row[0]:
ret = row[1]
return ret
return ret
def get_cty_row(self, call):
"""Parses all CTY records, tries to find the DXCC entity of a
specific call sign and returns the line as a list of strings"""
done = False
while not done:
for row in self.cty:
entities = row[9].replace(";", "").replace("=", "").split(" ")
# TODO: Check if it is a speciall call (=) and mark it in the list
for prefix in entities:
if call == prefix:
return row
call = call[:-1]
if call == "":
return ["-", "-", "-", "-", "-", "-", "-"]
return None
#####################################################
@ -420,6 +559,25 @@ class QRZLogger():
return table
@staticmethod
def get_extra_info_table(extra_info):
"""Print a pretty ascii table containing some
extra info"""
table = PrettyTable(['key', 'value'])
if "cty_country" in extra_info:
table.add_row(["Country:", extra_info["cty_country"]])
if "cty_continent" in extra_info:
table.add_row(["Continent:", extra_info["cty_continent"]])
if "lotw_call_date" in extra_info and extra_info["lotw_call_date"] != "":
table.add_row(["LotW uploaded ({}):".format(extra_info["lotw_call"]), extra_info["lotw_call_date"]])
if "lotw_cleaned_call_date" in extra_info and extra_info["lotw_cleaned_call_date"] != "":
table.add_row(["LotW uploaded ({}):".format(extra_info["lotw_cleaned_call"]), extra_info["lotw_cleaned_call_date"]])
table.align = "l"
table.header = False
return table
@staticmethod
def get_qso_detail_table(qso):
"""Print a pretty ascii table containing all
@ -627,6 +785,7 @@ def main():
qrz.print_table(qrz.get_recent_qso_table(qrz.recent_qsos))
# query a call sign from the user
call = qrz.get_input_callsign()
cleaned_call = qrz.remove_indicators(call)
# query call sign data from QRZ
result = qrz.get_call_data(call, session_key)
# the query was successful
@ -639,7 +798,6 @@ def main():
else:
print ('\n%s%s has no record on QRZ.com ¯\_(ツ)_/¯%s' \
% (qrz.errorcol, call, attr('reset')))
cleaned_call = qrz.remove_indicators(call)
if call != cleaned_call:
# query call sign data from QRZ
result = qrz.get_call_data(cleaned_call, session_key)
@ -648,8 +806,45 @@ def main():
print ('\n%s%sShowing results for %s instead%s' \
% (attr('underlined'), qrz.hlcol, cleaned_call, attr('reset')))
# generate a nice ascii table with the result
qrz.print_table(qrz.get_xml_query_table(result))
qrz.print_table(qrz.get_xml_query_table(result))
print("")
extra_info = {}
# If the CTY file is available, further information will be
# gathered from it, e.g. continent, country
if qrz.check_cty:
cty_details = qrz.get_cty_row(call)
else:
cty_details = ["-","-","-","-","-","-","-","-","-","-"]
extra_info["cty_country"] = cty_details[1]
extra_info["cty_continent"] = cty_details[3]
# If the LotW user activity file is available and the call
# sign in question is actually a LotW user, the lsat upload
# date will be displayed
if qrz.check_lotw_activity:
lotw = qrz.check_lotw(call)
extra_info["lotw_call"] = call
extra_info["lotw_call_date"] = lotw
if call != cleaned_call:
lotw = qrz.check_lotw(cleaned_call)
extra_info["lotw_cleaned_call"] = cleaned_call
extra_info["lotw_cleaned_call_date"] = lotw
# Print the table with additional infos on the call
print ('%s%sExtra (non-QRZ.com) info for %s%s' \
% (attr('underlined'), qrz.hlcol, call, attr('reset')))
qrz.print_table(qrz.get_extra_info_table(extra_info))
if cty_details[2] and cty_details[2] not in qrz.confirmed_entities:
print ('\n%s%s>>> New One! (not confirmed via Lotw) <<<%s\n\n' \
% (attr('bold'), qrz.hlcol, attr('reset')))
# pull all previous QSOs from tzhe QRZ logbook
result = qrz.get_qsos("CALL:"+ call)
# ignore this part if there were no previous QSOs