robottas/robottas.py

839 lines
30 KiB
Python
Raw Normal View History

2023-07-15 10:46:39 -04:00
#!/usr/bin/python3
import asyncio
from asyncio.subprocess import PIPE, STDOUT
2023-07-15 10:46:39 -04:00
import collections.abc
import json
import os
2023-07-15 10:46:39 -04:00
import sqlite3
from subprocess import Popen
2023-07-15 10:46:39 -04:00
import time
import discord
from discord.ext import commands
class Robottas(commands.Bot):
# The following section is adapted from code by theOehrly on GitHub FastF1 project
# and is subject to the MIT License under which it was released. Specifically
# the Livetiming client.
def convert_message(self, raw):
data = raw.replace("'", '"') \
.replace('True', 'true') \
.replace('False', 'false')
try:
data = json.loads(data)
return data
except json.JSONDecodeError:
return ""
# End section adapted from FastF1
async def on_ready(self):
print('Logged in as: {}'.format(self.user))
def run_robottas(self):
self.run(self.token)
async def send_message(self, message):
print(f"in send_message {message} {self.channel}")
if self.channel is None:
return
await self.channel.send(message)
def send_delay_message(self, message):
print("in send_delay_message")
self.message_queue.append((time.time(), message))
print(f"adding message: {message} at {time.time()}")
async def process_delay_messages(self):
print("in process_delay_queue")
while len(self.message_queue) > 0 and \
self.message_queue[0][0] < time.time() - self.delay:
print(f"removing at {time.time()}")
message = self.message_queue.pop(0)[1]
await self.send_message(message)
await asyncio.sleep(1)
async def send_status_report(self, report):
self.send_delay_message(report)
async def send_lap_report(self, driver):
self.send_delay_message(driver + " laps remaining")
async def load_flag_message(self, message):
flag = message['Flag']
message_text = message['Message']
report = None
if flag == 'GREEN':
report = f"{self.flag_dict['GREEN_FLAG']}" + \
f"{message_text}{self.flag_dict['GREEN_FLAG']}"
elif flag == 'RED':
report = f"{self.flag_dict['RED_FLAG']}{message_text}" + \
f"{self.flag_dict['RED_FLAG']}"
elif flag == 'YELLOW':
report = f"{self.flag_dict['YELLOW_FLAG']}{message_text}" + \
f"{self.flag_dict['YELLOW_FLAG']}"
elif flag == 'DOUBLE YELLOW':
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{message_text}" + \
f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}"
elif flag == 'BLACK AND WHITE' and self.report_deleted_lap:
report = f"{message['Message']}"
elif flag == 'CHEQUERED':
report = f"{self.flag_dict['CHECKERED']}" + \
f"{self.flag_dict['CHECKERED']}" + \
f"{self.flag_dict['CHECKERED']}"
# Return None or flag message
return report
async def load_rcm_messages(self, data):
if "Messages" in data.keys():
report = None
for key in data['Messages'].keys():
message = data['Messages'][key]
if 'Category' in message.keys():
category = message["Category"]
category = category.upper()
if category == "FLAG":
report = await self.load_flag_message( message )
elif category == "OTHER":
if self.session_type == "RACE" and "DELETED" in message['Message']:
pass
else:
report = message['Message']
elif category == "DRS":
report = message['Message']
elif category == "CAREVENT" and \
(self.session_type == "PRACTICE" or \
self.session_type == "QUALI"):
report = message['Message']
elif category == "SAFETYCAR":
if message["Mode"] == 'VIRTUAL SAFETY CAR':
report = f"{self.flag_dict['VSC']}" + \
f"{message['Message']}" + \
f"{self.flag_dict['VSC']}"
elif message["Mode"] == 'SAFETY CAR':
report = f"{self.flag_dict['SC']}" + \
f"{message['Message']}" + \
f"{self.flag_dict['SC']}"
if report is not None:
await self.send_status_report(report)
async def load_lap_data(self, data):
print(f"load_lap_data: {str(data)}")
if "CurrentLap" in data.keys():
current_lap = data["CurrentLap"]
if self.current_lap != current_lap:
self.current_lap = current_lap
#Re-evaluate total laps in case it has changed
#i.e. shortened due to rain.
if "TotalLaps" in data.keys():
self.total_laps = int(data["TotalLaps"])
2023-07-15 10:46:39 -04:00
#Notify on lap change if matches a driver
key = str(self.total_laps - int(current_lap))
if key in self.driver_dict.keys():
await self.send_lap_report(self.driver_dict[key])
def load_podium_data(self, data):
print("in load_podium_data")
if "Lines" in data.keys():
for position in data["Lines"].keys():
if 'RacingNumber' in data["Lines"][position].keys():
self.podium[int(position)] = data["Lines"][position]['RacingNumber']
def get_podium(self):
if len(self.podium) == 3:
try:
if "?" in self.podium:
return
message = ""
for i in range(3):
pos = 'P' + str(i + 1)
driver = self.driver_dict[self.podium[i]]
message += f":champagne:{driver} " + \
f"{self.flag_dict[pos]}"
if driver == self.fastest_lap:
if driver == self.name_dict['ALO']:
message += " EL"
message += ' ' + self.flag_dict['FLAP']
# Put a new line at the end of each row
message += "\n"
return message
except:
print("Error in sending podium message.")
return "I don't know the podium yet :("
def get_q1_cut(self):
message = ""
for i in range(15,20):
try:
message += self.driver_list[i] + " "
except:
message += "? "
message = message.strip()
return message
def get_q2_cut(self):
message = ""
for i in range(10,15):
try:
message += self.driver_list[i] + " "
except:
message += "? "
message = message.strip()
return message
def get_weather(self):
if self.weather == "":
return "No weather info yet..."
else:
return self.weather
async def process_race_events(self, session_data, status_data):
# Hold the next event to report, and the type
event = None
event_type = None
# Hold report
report = None
while (len(session_data) > 0 or len(status_data) > 0) and self.is_reporting:
# If only one of them has data, use that one
print("starting loop")
if len(session_data) == 0:
event = status_data.pop(0)
event_type = "STATUS"
elif len(status_data) == 0:
event = session_data.pop(0)
event_type = "SESSION"
# If they both have data, use the one with the oldest timestamp
else:
session_time = session_data[0]["Utc"]
status_time = status_data[0]["Utc"]
if session_time < status_time:
event = session_data.pop(0)
event_type = "SESSION"
else:
event = status_data.pop(0)
event_type = "STATUS"
# At this point we have the event. Generate the report based on the type.
report = None
if event_type == "SESSION":
print("Session event")
# Get the lap from the event
cur_lap = event["Lap"]
# Make sure we haven't already reported on this lap
if self.current_lap < cur_lap:
self.current_lap = cur_lap
# Get the laps remaining key to see if there's a driver that matches
laps_key = str( self.total_laps - cur_lap )
# If there's a matching driver, then send a message
if laps_key in self.driver_dict.keys():
await self.send_lap_report(self.driver_dict[laps_key])
# Must be a status event
else:
print("Status event")
key = None
if "TrackStatus" in event.keys():
key = "TrackStatus"
elif "SessionStatus" in event.keys():
key = "SessionStatus"
if key is not None:
track_status = event[key].upper()
# Check the track status to determine what report to send
if track_status == "ALLCLEAR":
report = f"{self.flag_dict['GREEN_FLAG']}" + \
f"{self.flag_dict['GREEN_FLAG']}{self.flag_dict['GREEN_FLAG']}"
elif track_status == "YELLOW":
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}{self.flag_dict['YELLOW_FLAG']}"
elif track_status == "RED":
report = f"{self.flag_dict['RED_FLAG']}" + \
f"{self.flag_dict['RED_FLAG']}{self.flag_dict['RED_FLAG']}"
elif track_status == "FINISHED":
report = f"{self.flag_dict['RACE_FINISHED']}" + \
f"{self.flag_dict['RACE_FINISHED']}{self.flag_dict['RACE_FINISHED']}"
if report is not None:
await self.send_status_report(report)
def load_weather_data(self, data):
weather_txt = "Track Weather Report\n"
for k in data.keys():
if k != "_kf":
weather_txt += f"{k}: {data[k]}\n"
self.weather = weather_txt
def load_timing_stats_data(self, data):
print("in timing stats")
if "Lines" in data.keys():
lines = data["Lines"]
for driver_num in lines.keys():
line = lines[driver_num]
print(f"driver_num {driver_num}")
if "PersonalBestLapTime" in line.keys():
position = -1
try:
position = line["PersonalBestLapTime"]["Position"]
print(f"got position {position}")
except:
pass
if position == 1:
print(f"setting fastest_lap {driver_num}")
self.fastest_lap = self.driver_dict[driver_num]
print(f"flap {driver_num} {self.fastest_lap}")
return
def load_driver_data(self, data):
for driver in data.keys():
position = data[driver]["Line"]
self.driver_list[position - 1] = self.driver_dict[driver]
async def print_driver_range(self, ctx, start, stop):
message = ""
for i in range(start, stop):
driver = self.driver_list[i]
message += f"P{i + 1}: {driver}"
if driver == self.fastest_lap:
if driver == self.name_dict['ALO']:
message += " EL"
message += ' ' + self.flag_dict['FLAP']
message += "\n"
await ctx.send(message)
async def print_driver_list(self, ctx):
#Send 1-10, then 11-20
await self.print_driver_range(ctx, 0, 10)
await self.print_driver_range(ctx, 10, 20)
def load_initial(self, message):
print( f"in load_initial: {message['R'].keys()}" )
# Load podium data
if 'R' in message.keys():
if 'TopThree' in message['R'].keys():
top_three = message['R']['TopThree']
if 'Lines' in top_three.keys() and \
len(top_three['Lines']) == 3:
for i in range(3):
self.podium[i] = top_three['Lines'][i]['RacingNumber']
# Load driver list
if 'DriverList' in message['R'].keys():
for driver_num in message['R']['DriverList'].keys():
if driver_num == '_kf':
continue
position = message['R']['DriverList'][driver_num]['Line']
self.driver_list[int(position) - 1] = self.driver_dict[driver_num]
# Load lap data
if 'LapCount' in message['R'].keys():
if 'TotalLaps' in message['R']['LapCount'].keys():
self.total_laps = int(message['R']['LapCount']['TotalLaps'])
print(f"self.total_laps: {self.total_laps}")
# Load weather data
if 'WeatherData' in message['R'].keys():
print( 'WeatherData in keys' )
weather_obj = message['R']['WeatherData']
weather_text = "Track Weather Report\n"
for k in weather_obj.keys():
if k != "_kf":
weather_text += f"{k}: {weather_obj[k]}\n"
self.weather = weather_text
# Load fastest lap data
if 'TimingStats' in message['R'].keys():
print( 'Flap in keys' )
flap_obj = message['R']['TimingStats']
self.load_timing_stats_data(flap_obj)
async def process_message(self, message):
try:
if isinstance(message, collections.abc.Sequence):
print("process_message - in isinstance")
if message[0] == 'Heartbeat':
return
elif message[0] == 'DriverList':
self.load_driver_data(message[1])
elif message[0] == 'TopThree':
self.load_podium_data(message[1])
elif message[0] == 'RaceControlMessages':
await self.load_rcm_messages(message[1])
elif message[0] == 'LapCount':
await self.load_lap_data(message[1])
elif message[0] == 'WeatherData':
self.load_weather_data(message[1])
elif message[0] == 'TimingStats':
self.load_timing_stats_data(message[1])
else:
print(f"Not sure how to handle message:{message[0]}")
# Check to see if this is the initial "R" record from the response
elif "R" in message.keys():
if "R" in message:
self.load_initial(message)
except Exception as e:
print(f"process_message error {e}\n\n")
def get_messages_from_db(self):
try:
messages = []
con = sqlite3.connect(self.dbfile)
cur = con.cursor()
cur2 = con.cursor()
for row in cur.execute('select id, message from messages order by id asc'):
messages.append(self.convert_message(row[1]))
# Now that we have the message, delete this row from the dbfile
cur2.execute(f"delete from messages where id = {row[0]}")
con.commit()
cur.close()
cur2.close()
con.close()
return messages
except:
print("db error... continuing")
return []
async def _race_report(self,ctx):
self.report_deleted_lap = False
self.session_type = 'RACE'
await self._report(ctx)
async def _quali_report(self, ctx):
self.report_deleted_lap = True
self.session_type = 'QUALI'
await self._report(ctx)
async def _practice_report(self,ctx):
self.report_deleted_lap = True
self.session_type = 'PRACTICE'
await self._report(ctx)
async def _report(self, ctx):
self.is_reporting = True
self.channel = ctx.channel
await self.start_collect()
2023-07-15 10:46:39 -04:00
while self.is_reporting:
# Do processing
print("reporting loop")
# process any new messages in the db
messages = self.get_messages_from_db()
try:
for message in messages:
await self.process_message(message)
await asyncio.sleep(3)
except:
print(f"problem with messages")
# process any messages in the delay queue
await self.process_delay_messages()
#If collecting, make sure the collection process is running
if self.is_collecting:
if self.collector_proc == None or \
self.collector_proc.poll() != None:
await self.start_collect()
2023-07-15 10:46:39 -04:00
def get_token(self, token_file):
with open(token_file) as tok:
return tok.readline().strip()
async def start_collect(self):
self.is_collecting = True
dir_path = os.path.dirname(os.path.realpath(__file__))
command_txt = os.path.join(dir_path, self.collector_command)
command_txt += self.collector_params
print(f"command_txt: {command_txt}")
self.collector_proc = Popen(command_txt.split())
async def stop_collect(self):
self.is_collecting = False
try:
if self.collector_proc != None:
self.collector_proc.kill()
except:
print("Tried to kill collection process")
2023-07-15 10:46:39 -04:00
def __init__(self):
# Set debug or not
self.debug = True
# Discord authentication token
self.token = self.get_token("token.txt")
self.collector_command = "robottas_collector.py"
self.collector_params = " save dummy.txt"
self.collector_proc = None
self.is_collecting = False
2023-07-15 10:46:39 -04:00
# Preface messages with the following
self.report_preamble = ':robot::peach: Alert!'
# Holds processing thread
self.bg_task = None
# Hold db file
self.dbfile = "messages.db"
# Holds current lap
self.current_lap = 0
# Hold podium places
self.podium = ['?', '?', '?']
# Hold driver list
self.driver_list = ['?','?','?','?','?','?','?','?','?','?', \
'?','?','?','?','?','?','?','?','?','?']
# Hold weather info
self.weather = ""
# Hold lap info
self.current_lap = -1000
self.total_laps = -1000
self.lap_reported = 0
# Hold the driver with the fastest lap
self.fastest_lap = ''
# Holds dictionary for number to icon
self.driver_dict = {
'1': '<:VER:1067541523748630570>',
'3': '<:RIC:1067870312949108887>',
'5': '<:VET:1067964065516884079>',
'11': '<:PER:1067822335123525732>',
'14': '<:ALO:1067876094033793054>',
'44': '<:HAM:1067828533746991165>',
'55': '<:SAI:1067824776502067270>',
'63': '<:RUS:1067831294748274728>',
'16': '<:LEC:1067544797050585198>',
'18': '<:STR:1067871582854336663>',
'4': '<:NOR:1067840487593082941>',
'10': '<:GAS:1067836596495327283>',
'27': '<:HUL:1067880110918742187>',
'31': '<:OCO:1067834157465612398>',
'77': '<:BOT:1067819716527276032>',
'81': '<:PIA:1067844998369914961>',
'24': '<:ZHO:1067865955117568010>',
'22': '<:TSU:1067888851676315660>',
'20': '<:MAG:1067883814992486510>',
'23': '<:ALB:1067874026871074887>',
'2': '<:SAR:1067890949197414410>',
}
# Holds dictionary for driver 3 letter code to icon
self.name_dict = {
'VER': '<:VER:1067541523748630570>',
'RIC': '<:RIC:1067870312949108887>',
'VET': '<:VET:1067964065516884079>',
'PER': '<:PER:1067822335123525732>',
'ALO': '<:ALO:1067876094033793054>',
'HAM': '<:HAM:1067828533746991165>',
'SAI': '<:SAI:1067824776502067270>',
'RUS': '<:RUS:1067831294748274728>',
'LEC': '<:LEC:1067544797050585198>',
'STR': '<:STR:1067871582854336663>',
'NOR': '<:NOR:1067840487593082941>',
'GAS': '<:GAS:1067836596495327283>',
'HUL': '<:HUL:1067880110918742187>',
'OCO': '<:OCO:1067834157465612398>',
'BOT': '<:BOT:1067819716527276032>',
'PIA': '<:PIA:1067844998369914961>',
'ZHO': '<:ZHO:1067865955117568010>',
'TSU': '<:TSU:1067888851676315660>',
'MAG': '<:MAG:1067883814992486510>',
'ALB': '<:ALB:1067874026871074887>',
'SAR': '<:SAR:1067890949197414410>',
}
# Holds dictionary for race states to icons
self.flag_dict = {
'GREEN_FLAG': '<:GREEN:1107401338993782804>',
'YELLOW_FLAG': '<:YELLOW:1091801442714652815>',
'RED_FLAG': '<:RED:1091801383998586912>',
'VSC': '<:VSC:1107401410183704596>',
'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>',
'SC': '<:SC:1107401472041304104>',
'SAFETY_CAR': '<:SC:1107401472041304104>',
'CHECKERED': '<:CHECKERED:1091801509039194152>',
'RACE_FINISHED': '<:CHECKERED:1091801509039194152>',
'STARTED': '<:CHECKERED:1091801509039194152>',
'P1': ':first_place:',
'P2': ':second_place:',
'P3': ':third_place:',
'FLAP': '<:FLap4:1122601036952129547>'
}
# Bot configuration info
self.bot_intents = discord.Intents.default()
self.bot_intents.message_content = True
self.description = "robotas - f1 bot - !rbhelp for commands"
self.command_prefix = '!'
# Track whether to report, and what channel to report to
self.is_reporting = False
self.report_id = None
self.started = False
# Hold message delay
self.delay = 45
self.message_queue = []
# Hold whether to report deleted lap messages
self.report_deleted_lap = False
self.session_type = ''
# Test file
self.test_file = "test.json"
### END FastF1 adapted section ###
super().__init__(command_prefix=self.command_prefix,
description=self.description,
intents=self.bot_intents)
# Setup commands
@self.command()
async def rbhelp(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("commands: \n" +
"!rbhelp - Print this help message " +
"(but you knew that already)\n" +
"!rbname - I will tell you my name.\n" +
"!rbroot - I will tell you who I root for.\n" +
"!rbreport - I will start race reporting in this channel. " +
"I will try to tell you about current flags, laps left,\n" +
"and safety cars.\n" +
"!rbstop - Stop reporting.\n" +
"!podium - Display podium positions.\n" +
"!q1cut - Show drivers in Q1 cut positions.\n" +
"!q2cut - Show drivers in Q2 cut positions.\n" +
"!rbdelay - Set the race messaging delay in seconds."
)
@self.command()
async def rbroot(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("Rooting for :ferry::peach: of course!\n" +
":BOT::smiling_face_with_3_hearts:")
@self.command()
async def rbname(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:")
@self.command()
async def rbdelay(ctx, delay):
try:
secs = int(delay)
if secs < 10 or secs > 300:
await ctx.send("delay must be between 10 and 300")
else:
self.delay = secs
await ctx.send(f"delay set to {secs}")
except:
await ctx.send("invalid delay value")
@self.command()
async def rbstop(ctx):
self.is_reporting = False
self.report_id = None
await self.stop_collect()
2023-07-15 10:46:39 -04:00
await ctx.send(":robot::peach: powering down")
@self.command()
async def podium(ctx):
message = self.get_podium()
await ctx.send(message)
@self.command()
async def driverlist(ctx):
await self.print_driver_list(ctx)
@self.command()
async def q1cut(ctx):
message = self.get_q1_cut()
await ctx.send(message)
@self.command()
async def q2cut(ctx):
message = self.get_q2_cut()
await ctx.send(message)
@self.command()
async def weather(ctx):
message = self.get_weather()
await ctx.send(message)
@self.command()
async def race(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report for the race!" )
await self._race_report(ctx)
@self.command()
async def quali(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report on quali!" )
await self._quali_report(ctx)
@self.command()
async def practice(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report on practice!" )
await self._practice_report(ctx)
@self.command()
async def flap(ctx):
if self.fastest_lap != '':
await ctx.send( self.fastest_lap + self.flag_dict['FLAP'] )
else:
await ctx.send( "No " + self.flag_dict['FLAP'] + " yet." )
@self.command()
async def rbtestfile(ctx):
self.is_reporting = True
if str(ctx.author) == "tamservo#0":
await self._test_file(ctx)
@self.command()
async def start_collect(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
# if an authorized user, start the collection script that
# puts records into the database
await self.start_collect()
@self.command()
async def calm(ctx):
file_name = os.path.dirname(os.path.realpath(__file__))
file_name = os.path.join(file_name, "images/calm.png")
with open(file_name, "rb") as handle:
df = discord.File(handle, filename=file_name)
await ctx.send(file = df)
2023-07-15 10:46:39 -04:00
if __name__ == '__main__':
rb = Robottas()
rb.run_robottas()