#!/usr/bin/python3
"""Robottas: a Discord bot that reports F1 session messages to a channel.

A separate collector script stores raw messages in an SQLite file; this bot
polls that file, turns the messages into short reports and posts them to the
channel where reporting was started, after a configurable delay.
"""

import asyncio
import collections.abc
import datetime
import json
import logging
import os
import random
import re
import sqlite3
from subprocess import Popen
import time

import discord
from discord.ext import commands

logger = logging.getLogger(__name__)


class Robottas(commands.Bot):
    """Discord bot that posts flags, lap counts, podium and weather info."""

    # Reply used whenever the podium cannot be rendered yet.
    PODIUM_UNKNOWN = "I don't know the podium yet :("

    def __init__(self):
        """Load the token, set up all reporting state and register commands."""
        # Set debug or not
        self.debug = True

        # Discord authentication token
        self.token = self.get_token("token.txt")

        # Collector script (relative to this file) and its arguments
        self.collector_command = "robottas_collector.py"
        self.collector_params = " save dummy.txt"
        self.collector_proc = None
        self.is_collecting = False

        # Preface messages with the following
        self.report_preamble = ':robot::peach: Alert!'

        # Holds processing thread
        self.bg_task = None

        # Hold db file
        self.dbfile = "messages.db"

        # Hold podium places
        self.podium = ['?', '?', '?']

        # Hold driver list (20 classification slots)
        self.driver_list = ['?'] * 20

        # Hold weather info
        self.weather = ""

        # Hold whether we have reported on slippery track.
        self.is_slippery_reported = False

        # Hold lap info; -1000 marks "not known yet"
        self.current_lap = -1000
        self.total_laps = -1000
        self.lap_reported = 0

        # Hold the driver with the fastest lap
        self.fastest_lap = ''

        # Holds dictionary for number to icon
        self.driver_dict = {
            '1': '<:VER:1067541523748630570>',
            '3': '<:RIC:1067870312949108887>',
            '5': '<:VET:1067964065516884079>',
            '11': '<:PER:1067822335123525732>',
            '14': '<:ALO:1067876094033793054>',
            '40': ':kiwi:',
            '44': '<:HAM:1067828533746991165>',
            '55': '<:SAI:1067824776502067270>',
            '63': '<:RUS:1067831294748274728>',
            '16': '<:LEC:1067544797050585198>',
            '18': '<:STR:1067871582854336663>',
            '4': '<:NOR:1067840487593082941>',
            '10': '<:GAS:1067836596495327283>',
            '27': '<:HUL:1067880110918742187>',
            '31': '<:OCO:1067834157465612398>',
            '77': '<:BOT:1067819716527276032>',
            '81': '<:PIA:1067844998369914961>',
            '24': '<:ZHO:1067865955117568010>',
            '22': '<:TSU:1067888851676315660>',
            '20': '<:MAG:1067883814992486510>',
            '23': '<:ALB:1067874026871074887>',
            '2': '<:SAR:1067890949197414410>',
        }

        # Holds dictionary for driver 3-letter code to icon
        self.name_dict = {
            'VER': '<:VER:1067541523748630570>',
            'RIC': '<:RIC:1067870312949108887>',
            'VET': '<:VET:1067964065516884079>',
            'PER': '<:PER:1067822335123525732>',
            'ALO': '<:ALO:1067876094033793054>',
            'HAM': '<:HAM:1067828533746991165>',
            'SAI': '<:SAI:1067824776502067270>',
            'RUS': '<:RUS:1067831294748274728>',
            'LEC': '<:LEC:1067544797050585198>',
            'STR': '<:STR:1067871582854336663>',
            'NOR': '<:NOR:1067840487593082941>',
            'GAS': '<:GAS:1067836596495327283>',
            'HUL': '<:HUL:1067880110918742187>',
            'LAW': ':kiwi:',
            'OCO': '<:OCO:1067834157465612398>',
            'BOT': '<:BOT:1067819716527276032>',
            'PIA': '<:PIA:1067844998369914961>',
            'ZHO': '<:ZHO:1067865955117568010>',
            'TSU': '<:TSU:1067888851676315660>',
            'MAG': '<:MAG:1067883814992486510>',
            'ALB': '<:ALB:1067874026871074887>',
            'SAR': '<:SAR:1067890949197414410>',
        }

        # Holds dictionary for race states to icons
        self.flag_dict = {
            'GREEN_FLAG': '<:GREEN:1107401338993782804>',
            'YELLOW_FLAG': '<:YELLOW:1091801442714652815>',
            'RED_FLAG': '<:RED:1091801383998586912>',
            'VSC': '<:VSC:1107401410183704596>',
            'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>',
            'SC': '<:SC:1107401472041304104>',
            'SAFETY_CAR': '<:SC:1107401472041304104>',
            'CHECKERED': '<:CHECKERED:1091801509039194152>',
            'RACE_FINISHED': '<:CHECKERED:1091801509039194152>',
            'STARTED': '<:CHECKERED:1091801509039194152>',
            'P1': ':first_place:',
            'P2': ':second_place:',
            'P3': ':third_place:',
            'FLAP': '<:FLap4:1122601036952129547>'
        }

        # Bot configuration info
        self.bot_intents = discord.Intents.default()
        self.bot_intents.message_content = True
        self.description = "robotas - f1 bot - !rbhelp for commands"
        self.command_prefix = '!'

        # Track whether to report, and what channel to report to.
        # channel stays None until a report command is issued, so
        # send_message() can be called safely before that.
        self.is_reporting = False
        self.report_id = None
        self.channel = None
        self.started = False

        # Hold message delay (seconds) and the queue of (timestamp, text)
        self.delay = 45
        self.message_queue = []

        # Hold whether to report deleted lap messages
        self.report_deleted_lap = False
        self.session_type = ''

        # Test file
        self.test_file = "test.json"

        # Race db patterns (raw strings so the escapes reach the regex engine)
        self.loc_pattern = re.compile(r'[\w\s\-\d]+')
        self.yr_pattern = re.compile(r'\d{4}')

        super().__init__(command_prefix=self.command_prefix,
                         description=self.description,
                         intents=self.bot_intents)

        self._register_commands()

    # The following section is adapted from code by theOehrly on GitHub FastF1 project
    # and is subject to the MIT License under which it was released. Specifically
    # the Livetiming client.
    @staticmethod
    def convert_message(raw):
        """Decode a stored message string into Python data.

        Single quotes and the literals True/False are rewritten so the text
        can be parsed as JSON. Returns the decoded object, or "" when the
        rewritten text is still not valid JSON.
        """
        data = raw.replace("'", '"') \
            .replace('True', 'true') \
            .replace('False', 'false')
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return ""
    # End section adapted from FastF1

    async def on_ready(self):
        """Print the account the bot logged in as."""
        print('Logged in as: {}'.format(self.user))

    def run_robottas(self):
        """Start the bot using the token read at construction time."""
        self.run(self.token)

    async def send_message(self, message):
        """Send message to the reporting channel; do nothing if none is set."""
        if self.channel is None:
            return
        await self.channel.send(message)

    @staticmethod
    async def send_image(ctx, image_file):
        """Send the image at image_file (relative to this script) to ctx."""
        base_dir = os.path.dirname(os.path.realpath(__file__))
        file_name = os.path.join(base_dir, image_file)
        with open(file_name, "rb") as handle:
            # Only expose the base name as the attachment name, not the
            # absolute path on the host.
            attachment = discord.File(handle,
                                      filename=os.path.basename(file_name))
            # The send must happen while the handle is still open.
            await ctx.send(file=attachment)

    def send_delay_message(self, message):
        """Queue message together with the time it was queued."""
        self.message_queue.append((time.time(), message))

    async def process_delay_messages(self):
        """Send every queued message that is older than self.delay seconds."""
        while self.message_queue and \
                self.message_queue[0][0] < time.time() - self.delay:
            message = self.message_queue.pop(0)[1]
            await self.send_message(message)
            # Space consecutive sends one second apart.
            await asyncio.sleep(1)

    async def send_status_report(self, report):
        """Queue a status report for delayed delivery."""
        self.send_delay_message(report)

    async def send_lap_report(self, driver):
        """Queue a '<driver> laps remaining' report for delayed delivery."""
        self.send_delay_message(driver + " laps remaining")

    async def load_flag_message(self, message):
        """Build the report text for a FLAG race-control message.

        Returns the decorated message text, or None when the flag is not one
        that gets reported.
        """
        flag = message.get('Flag')
        message_text = message.get('Message', '')
        flags = self.flag_dict

        if flag == 'GREEN':
            return f"{flags['GREEN_FLAG']}{message_text}{flags['GREEN_FLAG']}"
        if flag == 'RED':
            return f"{flags['RED_FLAG']}{message_text}{flags['RED_FLAG']}"
        if flag == 'YELLOW':
            return f"{flags['YELLOW_FLAG']}{message_text}{flags['YELLOW_FLAG']}"
        if flag == 'DOUBLE YELLOW':
            double = flags['YELLOW_FLAG'] * 2
            return f"{double}{message_text}{double}"
        if flag == 'BLACK AND WHITE' and self.report_deleted_lap:
            return f"{message_text}"
        if flag == 'CHEQUERED':
            return flags['CHECKERED'] * 3
        return None

    async def _build_rcm_report(self, message):
        """Return the report text for one race-control message, or None."""
        if not isinstance(message, dict) or 'Category' not in message:
            return None

        category = message["Category"].upper()
        text = message.get('Message', '')

        if category == "FLAG":
            return await self.load_flag_message(message)

        if category == "OTHER":
            # Deleted-lap notices are not reported during a race.
            if self.session_type == "RACE" and "DELETED" in text:
                return None
            # Only the first slippery warning gets the canned text.
            if "SLIPPERY" in text and not self.is_slippery_reported:
                self.is_slippery_reported = True
                return "It's slippery out there!"
            return text

        if category == "DRS":
            return text

        if category == "CAREVENT" and \
                self.session_type in ("PRACTICE", "QUALI"):
            return text

        if category == "SAFETYCAR":
            mode = message.get("Mode")
            if mode == 'VIRTUAL SAFETY CAR':
                return f"{self.flag_dict['VSC']}{text}{self.flag_dict['VSC']}"
            if mode == 'SAFETY CAR':
                return f"{self.flag_dict['SC']}{text}{self.flag_dict['SC']}"

        return None

    async def load_rcm_messages(self, data):
        """Queue a report for every reportable race-control message in data.

        data is expected to carry a "Messages" entry; both a mapping of
        messages and a plain list of messages are accepted.
        """
        messages = data.get("Messages") if isinstance(data, dict) else None
        if isinstance(messages, dict):
            entries = list(messages.values())
        elif isinstance(messages, list):
            entries = messages
        else:
            return

        for message in entries:
            # The report is computed per message, so a message that yields
            # nothing can never resend the previous message's report.
            report = await self._build_rcm_report(message)
            if report is not None:
                await self.send_status_report(report)

    async def load_lap_data(self, data):
        """Update lap counters and queue a lap report on a lap change.

        A report is queued when the number of laps remaining equals a key of
        self.driver_dict (i.e. a driver's number).
        """
        # Re-evaluate total laps in case it has changed
        # i.e. shortened due to rain.
        if "TotalLaps" in data:
            self.total_laps = int(data["TotalLaps"])

        if "CurrentLap" not in data:
            return

        current_lap = int(data["CurrentLap"])
        if self.current_lap == current_lap:
            return
        self.current_lap = current_lap

        # Notify on lap change if matches a driver
        key = str(self.total_laps - current_lap)
        if key in self.driver_dict:
            await self.send_lap_report(self.driver_dict[key])

    def load_podium_data(self, data):
        """Update self.podium from a TopThree update keyed by position."""
        lines = data.get("Lines") if isinstance(data, dict) else None
        if not isinstance(lines, dict):
            return

        for position, line in lines.items():
            if not isinstance(line, dict) or 'RacingNumber' not in line:
                continue
            try:
                index = int(position)
            except ValueError:
                # Non-numeric bookkeeping keys are not positions.
                continue
            if 0 <= index < len(self.podium):
                self.podium[index] = line['RacingNumber']

    def _fastest_lap_suffix(self, driver):
        """Return the fastest-lap decoration for driver, or '' if not theirs."""
        if driver != self.fastest_lap:
            return ""
        suffix = " EL" if driver == self.name_dict['ALO'] else ""
        return suffix + ' ' + self.flag_dict['FLAP']

    def get_podium(self):
        """Return the podium as a multi-line message.

        Returns the "don't know yet" text while any podium slot is still
        unknown or refers to a driver number that is not in driver_dict.
        """
        if len(self.podium) != 3 or "?" in self.podium:
            return self.PODIUM_UNKNOWN

        rows = []
        try:
            for index, number in enumerate(self.podium):
                driver = self.driver_dict[number]
                medal = self.flag_dict['P' + str(index + 1)]
                row = f":champagne:{driver} {medal}"
                row += self._fastest_lap_suffix(driver)
                # Put a new line at the end of each row
                rows.append(row + "\n")
        except (KeyError, TypeError):
            return self.PODIUM_UNKNOWN
        return "".join(rows)

    def _drivers_in_range(self, start, stop):
        """Return the drivers in list slots [start, stop), '?' for gaps."""
        names = []
        for index in range(start, stop):
            if index < len(self.driver_list):
                names.append(str(self.driver_list[index]))
            else:
                names.append("?")
        return " ".join(names).strip()

    def get_q1_cut(self):
        """Return the drivers currently in positions 16-20."""
        return self._drivers_in_range(15, 20)

    def get_q2_cut(self):
        """Return the drivers currently in positions 11-15."""
        return self._drivers_in_range(10, 15)

    def get_weather(self):
        """Return the latest weather report, or a placeholder if none yet."""
        if self.weather == "":
            return "No weather info yet..."
        return self.weather

    def _track_status_report(self, status):
        """Return the three-icon report for a track/session status, or None."""
        icon_keys = {
            "ALLCLEAR": 'GREEN_FLAG',
            "YELLOW": 'YELLOW_FLAG',
            "RED": 'RED_FLAG',
            "FINISHED": 'RACE_FINISHED',
        }
        icon_key = icon_keys.get(status)
        if icon_key is None:
            return None
        return self.flag_dict[icon_key] * 3

    async def process_race_events(self, session_data, status_data):
        """Merge two event lists by their "Utc" value and report each event.

        Both lists are consumed from the front. Session events may produce a
        lap report; status events may produce a flag report. Processing stops
        early when self.is_reporting becomes false.
        """
        while (session_data or status_data) and self.is_reporting:
            # If only one of them has data, use that one; if they both have
            # data, use the one with the oldest timestamp.
            if not session_data:
                take_session = False
            elif not status_data:
                take_session = True
            else:
                take_session = session_data[0]["Utc"] < status_data[0]["Utc"]

            if take_session:
                event = session_data.pop(0)
                cur_lap = event["Lap"]

                # Make sure we haven't already reported on this lap
                if self.current_lap < cur_lap:
                    self.current_lap = cur_lap

                    # Laps remaining is looked up as a driver number.
                    laps_key = str(self.total_laps - cur_lap)
                    if laps_key in self.driver_dict:
                        await self.send_lap_report(self.driver_dict[laps_key])
                continue

            # Must be a status event
            event = status_data.pop(0)
            if "TrackStatus" in event:
                key = "TrackStatus"
            elif "SessionStatus" in event:
                key = "SessionStatus"
            else:
                continue

            report = self._track_status_report(event[key].upper())
            if report is not None:
                await self.send_status_report(report)

    @staticmethod
    def _format_weather(data):
        """Render every weather field except the "_kf" entry as text."""
        lines = ["Track Weather Report\n"]
        lines.extend(f"{k}: {v}\n" for k, v in data.items() if k != "_kf")
        return "".join(lines)

    def load_weather_data(self, data):
        """Store a formatted weather report built from data."""
        self.weather = self._format_weather(data)

    def load_timing_stats_data(self, data):
        """Record the driver whose personal best lap is ranked first."""
        lines = data.get("Lines") if isinstance(data, dict) else None
        if not isinstance(lines, dict):
            return

        for driver_num, line in lines.items():
            if not isinstance(line, dict):
                continue
            best = line.get("PersonalBestLapTime")
            if isinstance(best, dict) and best.get("Position") == 1:
                # Fall back to the raw number for drivers without an icon.
                self.fastest_lap = self.driver_dict.get(driver_num, driver_num)
                return

    def _set_driver_position(self, position, driver):
        """Store driver at 1-based position when it fits in driver_list."""
        try:
            index = int(position) - 1
        except (TypeError, ValueError):
            return
        if 0 <= index < len(self.driver_list):
            self.driver_list[index] = driver

    def load_driver_data(self, data):
        """Update driver_list from a DriverList update keyed by car number."""
        for driver, info in data.items():
            # Skip bookkeeping entries and updates that carry no position.
            if not isinstance(info, dict) or "Line" not in info:
                continue
            self._set_driver_position(info["Line"],
                                      self.driver_dict.get(driver, driver))

    async def print_driver_range(self, ctx, start, stop):
        """Send positions start+1..stop of the driver list to ctx."""
        rows = []
        for index in range(start, stop):
            driver = self.driver_list[index]
            rows.append(f"P{index + 1}: {driver}"
                        + self._fastest_lap_suffix(driver)
                        + "\n")
        await ctx.send("".join(rows))

    async def print_driver_list(self, ctx):
        """Send the driver list as two messages of ten positions each."""
        # Send 1-10, then 11-20
        await self.print_driver_range(ctx, 0, 10)
        await self.print_driver_range(ctx, 10, 20)

    def load_initial(self, message):
        """Seed podium, driver list, laps, weather and fastest lap.

        message is the initial response whose "R" entry holds one snapshot
        per topic; topics that are absent are left untouched.
        """
        snapshot = message.get('R') if isinstance(message, dict) else None
        if not isinstance(snapshot, dict):
            return

        # Load podium data
        top_three = snapshot.get('TopThree')
        if isinstance(top_three, dict) and 'Lines' in top_three and \
                len(top_three['Lines']) == 3:
            for i in range(3):
                self.podium[i] = top_three['Lines'][i]['RacingNumber']

        # Load driver list
        for driver_num, info in snapshot.get('DriverList', {}).items():
            if driver_num == '_kf':
                continue
            if not isinstance(info, dict) or 'Line' not in info:
                continue
            self._set_driver_position(info['Line'],
                                      self.driver_dict.get(driver_num, '???'))

        # Load lap data
        lap_count = snapshot.get('LapCount')
        if isinstance(lap_count, dict) and 'TotalLaps' in lap_count:
            self.total_laps = int(lap_count['TotalLaps'])

        # Load weather data
        if 'WeatherData' in snapshot:
            self.weather = self._format_weather(snapshot['WeatherData'])

        # Load fastest lap data
        if 'TimingStats' in snapshot:
            self.load_timing_stats_data(snapshot['TimingStats'])

    async def process_message(self, message):
        """Dispatch one decoded message to the matching loader.

        Topic messages are sequences of the form [topic, payload, ...]; the
        initial response is a mapping with an "R" entry. Malformed messages
        are logged and skipped so one bad record cannot stop reporting.
        """
        try:
            if isinstance(message, dict):
                # Check to see if this is the initial "R" record
                if "R" in message:
                    self.load_initial(message)
                return

            if not isinstance(message, collections.abc.Sequence) \
                    or not message:
                return

            topic = message[0]
            if topic == 'Heartbeat':
                return
            elif topic == 'DriverList':
                self.load_driver_data(message[1])
            elif topic == 'TopThree':
                self.load_podium_data(message[1])
            elif topic == 'RaceControlMessages':
                await self.load_rcm_messages(message[1])
            elif topic == 'LapCount':
                await self.load_lap_data(message[1])
            elif topic == 'WeatherData':
                self.load_weather_data(message[1])
            elif topic == 'TimingStats':
                self.load_timing_stats_data(message[1])
        except Exception:
            logger.warning("Skipping message that could not be processed",
                           exc_info=True)

    def get_messages_from_db(self):
        """Return all pending messages from the db file and delete them.

        Rows are read in id order, decoded with convert_message and removed
        in the same transaction. Returns [] when the database cannot be read.
        """
        try:
            con = sqlite3.connect(self.dbfile)
            try:
                # Read everything first so rows are not deleted while the
                # select cursor is still being iterated.
                rows = con.execute(
                    'select id, message from messages order by id asc'
                ).fetchall()
                messages = [
                    self.convert_message(raw) if isinstance(raw, str) else ""
                    for _, raw in rows
                ]
                con.executemany('delete from messages where id = ?',
                                [(row_id,) for row_id, _ in rows])
                con.commit()
            finally:
                con.close()
        except sqlite3.Error:
            logger.warning("Could not read messages from %s", self.dbfile,
                           exc_info=True)
            return []
        return messages

    async def _race_report(self, ctx):
        """Start reporting in race mode (deleted laps are not reported)."""
        self.report_deleted_lap = False
        self.session_type = 'RACE'
        await self._report(ctx)

    async def _quali_report(self, ctx):
        """Start reporting in qualifying mode."""
        self.report_deleted_lap = True
        self.session_type = 'QUALI'
        await self._report(ctx)

    async def _practice_report(self, ctx):
        """Start reporting in practice mode."""
        self.report_deleted_lap = True
        self.session_type = 'PRACTICE'
        await self._report(ctx)

    async def _report(self, ctx):
        """Run the reporting loop in ctx's channel until reporting stops.

        Each pass processes new database messages, flushes due delayed
        messages and restarts the collector if it has exited.
        """
        self.is_reporting = True
        self.channel = ctx.channel
        await self.start_collect()

        while self.is_reporting:
            try:
                # process any new messages in the db
                for message in self.get_messages_from_db():
                    await self.process_message(message)

                # process any messages in the delay queue
                await self.process_delay_messages()

                # If collecting, make sure the collection process is running
                if self.is_collecting and (
                        self.collector_proc is None
                        or self.collector_proc.poll() is not None):
                    await self.start_collect()
            except Exception:
                # Task cancellation is a BaseException and is deliberately
                # not caught here, so the loop can still be cancelled.
                logger.exception("Error in reporting loop")

            # Always pause between polls, even after a failure.
            await asyncio.sleep(3)

    @staticmethod
    def get_token(token_file):
        """Return the first line of token_file with whitespace stripped."""
        with open(token_file) as tok:
            return tok.readline().strip()

    async def start_collect(self):
        """(Re)start the collector process located next to this script."""
        await self.stop_collect()
        self.is_collecting = True
        self.is_slippery_reported = False
        dir_path = os.path.dirname(os.path.realpath(__file__))
        # Keep the script path as one argument so directories containing
        # spaces do not get split into several arguments.
        command = [os.path.join(dir_path, self.collector_command)]
        command += self.collector_params.split()
        self.collector_proc = Popen(command)

    async def stop_collect(self):
        """Stop collecting and kill the collector process if there is one."""
        self.is_collecting = False
        if self.collector_proc is None:
            return
        try:
            self.collector_proc.kill()
        except OSError:
            logger.debug("Collector process could not be killed",
                         exc_info=True)

    def decode_watched(self, w):
        """Translate the watched flag of a race row into display text."""
        if w == 0:
            return 'Not Watched Yet'
        return 'Watched Already'

    @staticmethod
    def _fetch_one(db_file, sql, params=()):
        """Run sql against db_file and return the first row (or None)."""
        con = sqlite3.connect(db_file)
        try:
            return con.execute(sql, params).fetchone()
        finally:
            con.close()

    async def _report_next_session(self, ctx, race_only):
        """Send the next scheduled session (or race) and the time until it.

        Nothing is sent when the schedule has no future entry.
        """
        label = "race" if race_only else "event"
        sql = 'select * from schedule where date_start > ? '
        now_secs = datetime.datetime.now().timestamp()
        params = [now_secs]
        if race_only:
            sql += 'and session_type = ? '
            params.append('Race')
        sql += 'order by date_start asc limit 1'

        try:
            row = self._fetch_one('schedule.db', sql, params)
            if row is None:
                return
            # NOTE(review): column 0 is used as the name and column 2 as the
            # start time in epoch seconds - confirm against the schema.
            next_secs = row[2]
            td = datetime.timedelta(seconds=next_secs - now_secs)
            next_date = datetime.datetime.fromtimestamp(next_secs)
            message = f"Next {label} is {row[0]} at {next_date}. " + \
                f"Time remaining {td}"
        except Exception:
            logger.exception("Could not look up the next %s", label)
            message = "Sorry, hit the wall trying to find the answer..."
        await ctx.send(message)

    async def report_next_event(self, ctx):
        """Send the next scheduled session of any type to ctx."""
        await self._report_next_session(ctx, race_only=False)

    async def report_next_race(self, ctx):
        """Send the next scheduled race to ctx."""
        await self._report_next_session(ctx, race_only=True)

    @staticmethod
    def _is_authorized(ctx):
        """Return True for the bot owner or a guild administrator."""
        if str(ctx.author) == "tamservo#0":
            return True
        # Authors of direct messages have no guild permissions at all.
        perms = getattr(ctx.author, "guild_permissions", None)
        return perms is not None and bool(perms.administrator)

    def _register_commands(self):
        """Register every chat command; must run after Bot.__init__."""

        @self.command()
        async def rbhelp(ctx):
            await ctx.send("commands: \n" +
                           "!rbhelp - Print this help message " +
                           "(but you knew that already)\n" +
                           "!rbname - I will tell you my name.\n" +
                           "!rbroot - I will tell you who I root for.\n" +
                           "!rbreport - I will start race reporting in this channel. " +
                           "I will try to tell you about current flags, laps left,\n" +
                           "and safety cars.\n" +
                           "!rbstop - Stop reporting.\n" +
                           "!podium - Display podium positions.\n" +
                           "!q1cut - Show drivers in Q1 cut positions.\n" +
                           "!q2cut - Show drivers in Q2 cut positions.\n" +
                           "!rbdelay - Set the race messaging delay in seconds.\n" +
                           "The following display race control messages:\n" +
                           " !calm\n" +
                           " !forecast\n" +
                           " !undercut\n"
                           )

        @self.command()
        async def rbroot(ctx):
            await ctx.send("Rooting for :ferry::peach: of course!\n" +
                           self.name_dict["BOT"] +
                           ":smiling_face_with_3_hearts:")

        @self.command()
        async def rbname(ctx):
            await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:")

        @self.command()
        async def rbdelay(ctx, delay):
            try:
                secs = int(delay)
            except ValueError:
                await ctx.send("Invalid delay value passed to rbdelay.")
                return

            if secs < 10 or secs > 300:
                await ctx.send("Delay must be between 10 and 300.")
            else:
                self.delay = secs
                await ctx.send(f"Delay set to {secs} seconds.")

        @self.command()
        async def rbstop(ctx):
            self.is_reporting = False
            self.report_id = None
            await self.stop_collect()
            await ctx.send(":robot::peach: powering down")

        @self.command()
        async def podium(ctx):
            await ctx.send(self.get_podium())

        @self.command()
        async def driverlist(ctx):
            await self.print_driver_list(ctx)

        @self.command()
        async def q1cut(ctx):
            await ctx.send(self.get_q1_cut())

        @self.command()
        async def q2cut(ctx):
            await ctx.send(self.get_q2_cut())

        @self.command()
        async def weather(ctx):
            await ctx.send(self.get_weather())

        @self.command()
        async def race(ctx):
            if self._is_authorized(ctx):
                await ctx.send(":robot::peach: Ready to report for the race!")
                await self._race_report(ctx)

        @self.command()
        async def quali(ctx):
            if self._is_authorized(ctx):
                await ctx.send(":robot::peach: Ready to report on quali!")
                await self._quali_report(ctx)

        @self.command()
        async def practice(ctx):
            if self._is_authorized(ctx):
                await ctx.send(":robot::peach: Ready to report on practice!")
                await self._practice_report(ctx)

        @self.command()
        async def flap(ctx):
            if self.fastest_lap != '':
                await ctx.send(self.flag_dict['FLAP'] + self.fastest_lap)
            else:
                await ctx.send("No " + self.flag_dict['FLAP'] + " yet.")

        @self.command()
        async def start_collect(ctx):
            if self._is_authorized(ctx):
                # if an authorized user, start the collection script that
                # puts records into the database
                await self.start_collect()

        @self.command()
        async def danger(ctx):
            await ctx.send("That's some dangerous driving! " + self.name_dict["HAM"])

        @self.command()
        async def dotd(ctx):
            await ctx.send("I don't know, probably " + self.name_dict["ALB"])

        @self.command()
        async def flip(ctx):
            await ctx.send(random.choice(["Heads", "Tails"]))

        @self.command()
        async def gp2(ctx):
            await ctx.send("GP2 engine! GP2! ARGHHH! " + self.name_dict["ALO"])

        @self.command()
        async def quote(ctx):
            try:
                row = self._fetch_one(
                    'races.db',
                    'select * from quotes order by Random() limit 1')
                if row is None:
                    return
                message = f"{row[0]} -- {row[1]} {row[2]}"
            except Exception:
                logger.exception("Could not load a quote")
                message = "Can't think of a quote for some reason..."
            await ctx.send(message)

        @self.command()
        @commands.has_permissions(administrator=True)
        async def race_watched(ctx, loc, yr):
            # Require the whole argument to match, not just a prefix.
            if not (self.loc_pattern.fullmatch(loc)
                    and self.yr_pattern.fullmatch(yr)):
                return

            try:
                con = sqlite3.connect('races.db')
                try:
                    con.execute('update races ' +
                                'set watched = 1 ' +
                                'where location = ? ' +
                                'and year = ?', (loc, yr))
                    con.commit()
                finally:
                    con.close()
                message = f"{loc} {yr} marked as watched."
            except sqlite3.Error:
                logger.exception("Could not mark %s %s as watched", loc, yr)
                message = f"Couldn't mark {loc} {yr} as watched for some reason."
            await ctx.send(message)

        @self.command()
        async def rand_race(ctx):
            try:
                row = self._fetch_one(
                    'races.db',
                    'select * from races ' +
                    'order by Random() ' +
                    'limit 1')
                if row is None:
                    return
                watched = self.decode_watched(row[2])
                message = f"Location: {row[0]} Year: {row[1]} {watched}"
            except Exception:
                logger.exception("Could not pick a random race")
                message = "Can't find a random race for some reason."
            await ctx.send(message)

        @self.command()
        async def rand_race_new(ctx):
            try:
                row = self._fetch_one(
                    'races.db',
                    'select * from races ' +
                    'where watched = 0 ' +
                    'order by Random() ' +
                    'limit 1')
                if row is None:
                    return
                message = f"Location: {row[0]} Year: {row[1]}"
            except Exception:
                logger.exception("Could not pick an unwatched race")
                message = "Can't pick a race for some reason."
            await ctx.send(message)

        @self.command()
        async def tyres(ctx):
            await ctx.send("Bono, my tyres are gone " + self.name_dict["HAM"])

        # Commands that send images
        @self.command()
        async def calm(ctx):
            await self.send_image(ctx, "images/calm.png")

        @self.command()
        async def forecast(ctx):
            await self.send_image(ctx, "images/forecast.png")

        @self.command()
        async def penalty(ctx):
            await self.send_image(ctx, "images/penalty.png")

        @self.command()
        async def undercut(ctx):
            await self.send_image(ctx, "images/undercut.png")

        # Calendar Commands

        # Give days, hours, minutes until the next event
        @self.command()
        async def next_event(ctx):
            await self.report_next_event(ctx)

        # Give days, hours, minutes until the next race
        @self.command()
        async def next_race(ctx):
            await self.report_next_race(ctx)


if __name__ == '__main__':
    rb = Robottas()
    rb.run_robottas()