From f34501046e99a31614db94ec2504183f1675a3ed Mon Sep 17 00:00:00 2001 From: tamservo Date: Sat, 15 Jul 2023 15:46:39 +0100 Subject: [PATCH] Adding initial code. --- RobottasSignalr.py | 198 ++++++++++++ robottas.py | 784 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 982 insertions(+) create mode 100644 RobottasSignalr.py create mode 100755 robottas.py diff --git a/RobottasSignalr.py b/RobottasSignalr.py new file mode 100644 index 0000000..aebf555 --- /dev/null +++ b/RobottasSignalr.py @@ -0,0 +1,198 @@ +import asyncio +import concurrent.futures +import json +import logging +import requests +import time +import sqlite3 + +from fastf1.signalr_aio import Connection + +import fastf1 + +""" ADOPTED FROM FASTF1 client """ + +def messages_from_raw(r): + """Extract data messages from raw recorded SignalR data. + This function can be used to extract message data from raw SignalR data + which was saved using :class:`SignalRClient` in debug mode. + Args: + r (iterable) : Iterable containing raw SignalR responses. + + """ + ret = list() + errorcount = 0 + for data in r: + # fix F1's not json compliant data + data = data.replace("'", '"') \ + .replace('True', 'true') \ + .replace('False', 'false') + try: + data = json.loads(data) + except json.JSONDecodeError: + errorcount += 1 + continue + messages = data['M'] if 'M' in data and len(data['M']) > 0 else {} + for inner_data in messages: + hub = inner_data['H'] if 'H' in inner_data else '' + if hub.lower() == 'streaming': + # method = inner_data['M'] + message = inner_data['A'] + ret.append(message) + + return ret, errorcount + + +class SignalRClient: + """A client for receiving and saving F1 timing data which is streamed + live over the SignalR protocol. + During an F1 session, timing data and telemetry data are streamed live + using the SignalR protocol. This class can be used to connect to the + stream and save the received data into a file. + The data will be saved in a raw text format without any postprocessing. + It is **not** possible to use this data during a session. Instead, the + data can be processed after the session using the :mod:`fastf1.api` and + :mod:`fastf1.core` + Args: + filename (str) : filename (opt. with path) for the output file + filemode (str, optional) : one of 'w' or 'a'; append to or overwrite + file content it the file already exists. Append-mode may be useful + if the client is restarted during a session. + debug (bool, optional) : When set to true, the complete SignalR + message is saved. By default, only the actual data from a + message is saved. + timeout (int, optional) : Number of seconds after which the client + will automatically exit when no message data is received. + Set to zero to disable. + logger (Logger or None) : By default, errors are logged to the + console. If you wish to customize logging, you can pass an + instance of :class:`logging.Logger` (see: :mod:`logging`). + """ + _connection_url = 'https://livetiming.formula1.com/signalr' + + def __init__(self, filename, filemode='w', debug=False, + timeout=60, logger=None): + + self.headers = {'User-agent': 'BestHTTP', + 'Accept-Encoding': 'gzip, identity', + 'Connection': 'keep-alive, Upgrade'} + + self.topics = ["Heartbeat", "TopThree", "RcmSeries", + "TimingStats", "WeatherData", + "TrackStatus", "DriverList", + "RaceControlMessages", "SessionInfo", + "SessionData", "LapCount"] + + self.debug = debug + self.filename = filename + self.filemode = filemode + self.timeout = timeout + self._connection = None + + if not logger: + logging.basicConfig( + format="%(asctime)s - %(levelname)s: %(message)s" + ) + self.logger = logging.getLogger('SignalR') + else: + self.logger = logger + + self._output_file = None + self._t_last_message = None + + def _to_file(self, msg): + """ + self._output_file.write(msg + '\n') + self._output_file.flush() + """ + print(msg) + con = sqlite3.connect('messages.db') + cur = con.cursor() + cur.execute("insert into messages (message) values(?)", (msg,)) + con.commit() + cur.close() + con.close() + + async def _on_do_nothing(self, msg): + # just do nothing with the message; intended for debug mode where some + # callback method still needs to be provided + pass + + async def _on_message(self, msg): + self._t_last_message = time.time() + loop = asyncio.get_running_loop() + try: + with concurrent.futures.ThreadPoolExecutor() as pool: + await loop.run_in_executor( + pool, self._to_file, str(msg) + ) + except Exception: + self.logger.exception("Exception while writing message to file") + + async def _on_debug(self, **data): + if 'M' in data and len(data['M']) > 0: + self._t_last_message = time.time() + + loop = asyncio.get_running_loop() + try: + with concurrent.futures.ThreadPoolExecutor() as pool: + await loop.run_in_executor( + pool, self._to_file, str(data) + ) + except Exception: + self.logger.exception("Exception while writing message to file") + + async def _run(self): + self._output_file = open(self.filename, self.filemode) + # Create connection + session = requests.Session() + session.headers = self.headers + self._connection = Connection(self._connection_url, session=session) + + # Register hub + hub = self._connection.register_hub('Streaming') + + if self.debug: + # Assign error handler + self._connection.error += self._on_debug + # Assign debug message handler to save raw responses + self._connection.received += self._on_debug + hub.client.on('feed', self._on_do_nothing) # need to connect an async method + else: + # Assign hub message handler + self._connection.received += self._on_debug + hub.client.on('feed', self._on_message) + + hub.server.invoke("Subscribe", self.topics) + + # Start the client + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as pool: + await loop.run_in_executor(pool, self._connection.start) + + async def _supervise(self): + self._t_last_message = time.time() + while True: + if (self.timeout != 0 + and time.time() - self._t_last_message > self.timeout): + self.logger.warning(f"Timeout - received no data for more " + f"than {self.timeout} seconds!") + self._connection.close() + return + await asyncio.sleep(1) + + async def _async_start(self): + self.logger.info(f"Starting FastF1 live timing client " + f"[v{fastf1.__version__}]") + await asyncio.gather(asyncio.ensure_future(self._supervise()), + asyncio.ensure_future(self._run())) + self._output_file.close() + self.logger.warning("Exiting...") + + def start(self): + """Connect to the data stream and start writing the data to a file.""" + try: + asyncio.run(self._async_start()) + except KeyboardInterrupt: + self.logger.warning("Keyboard interrupt - exiting...") + return diff --git a/robottas.py b/robottas.py new file mode 100755 index 0000000..4149fb1 --- /dev/null +++ b/robottas.py @@ -0,0 +1,784 @@ +#!/usr/bin/python3 + +import asyncio +import collections.abc +import json +import sqlite3 +import time +import discord + +from discord.ext import commands + + +class Robottas(commands.Bot): + + # The following section is adapted from code by theOehrly on GitHub FastF1 project + # and is subject to the MIT License under which it was released. Specifically + # the Livetiming client. + + def convert_message(self, raw): + data = raw.replace("'", '"') \ + .replace('True', 'true') \ + .replace('False', 'false') + + try: + data = json.loads(data) + return data + except json.JSONDecodeError: + return "" + + # End section adapted from FastF1 + + + async def on_ready(self): + print('Logged in as: {}'.format(self.user)) + + + def run_robottas(self): + self.run(self.token) + + + async def send_message(self, message): + print(f"in send_message {message} {self.channel}") + if self.channel is None: + return + + await self.channel.send(message) + + + def send_delay_message(self, message): + print("in send_delay_message") + self.message_queue.append((time.time(), message)) + print(f"adding message: {message} at {time.time()}") + + + async def process_delay_messages(self): + print("in process_delay_queue") + while len(self.message_queue) > 0 and \ + self.message_queue[0][0] < time.time() - self.delay: + + print(f"removing at {time.time()}") + message = self.message_queue.pop(0)[1] + await self.send_message(message) + await asyncio.sleep(1) + + + async def send_status_report(self, report): + self.send_delay_message(report) + + + async def send_lap_report(self, driver): + self.send_delay_message(driver + " laps remaining") + + + async def load_flag_message(self, message): + flag = message['Flag'] + message_text = message['Message'] + report = None + + if flag == 'GREEN': + report = f"{self.flag_dict['GREEN_FLAG']}" + \ + f"{message_text}{self.flag_dict['GREEN_FLAG']}" + + elif flag == 'RED': + report = f"{self.flag_dict['RED_FLAG']}{message_text}" + \ + f"{self.flag_dict['RED_FLAG']}" + + elif flag == 'YELLOW': + report = f"{self.flag_dict['YELLOW_FLAG']}{message_text}" + \ + f"{self.flag_dict['YELLOW_FLAG']}" + + elif flag == 'DOUBLE YELLOW': + report = f"{self.flag_dict['YELLOW_FLAG']}" + \ + f"{self.flag_dict['YELLOW_FLAG']}" + \ + f"{message_text}" + \ + f"{self.flag_dict['YELLOW_FLAG']}" + \ + f"{self.flag_dict['YELLOW_FLAG']}" + + elif flag == 'BLACK AND WHITE' and self.report_deleted_lap: + report = f"{message['Message']}" + + elif flag == 'CHEQUERED': + report = f"{self.flag_dict['CHECKERED']}" + \ + f"{self.flag_dict['CHECKERED']}" + \ + f"{self.flag_dict['CHECKERED']}" + + # Return None or flag message + return report + + + async def load_rcm_messages(self, data): + if "Messages" in data.keys(): + report = None + for key in data['Messages'].keys(): + message = data['Messages'][key] + if 'Category' in message.keys(): + category = message["Category"] + category = category.upper() + + if category == "FLAG": + report = await self.load_flag_message( message ) + + elif category == "OTHER": + if self.session_type == "RACE" and "DELETED" in message['Message']: + pass + else: + report = message['Message'] + + elif category == "DRS": + report = message['Message'] + + elif category == "CAREVENT" and \ + (self.session_type == "PRACTICE" or \ + self.session_type == "QUALI"): + report = message['Message'] + + elif category == "SAFETYCAR": + if message["Mode"] == 'VIRTUAL SAFETY CAR': + report = f"{self.flag_dict['VSC']}" + \ + f"{message['Message']}" + \ + f"{self.flag_dict['VSC']}" + + elif message["Mode"] == 'SAFETY CAR': + report = f"{self.flag_dict['SC']}" + \ + f"{message['Message']}" + \ + f"{self.flag_dict['SC']}" + + if report is not None: + await self.send_status_report(report) + + + async def load_lap_data(self, data): + print(f"load_lap_data: {str(data)}") + if "CurrentLap" in data.keys(): + current_lap = data["CurrentLap"] + + if self.current_lap != current_lap: + self.current_lap = current_lap + #Notify on lap change if matches a driver + key = str(self.total_laps - int(current_lap)) + if key in self.driver_dict.keys(): + await self.send_lap_report(self.driver_dict[key]) + + + def load_podium_data(self, data): + print("in load_podium_data") + if "Lines" in data.keys(): + for position in data["Lines"].keys(): + if 'RacingNumber' in data["Lines"][position].keys(): + self.podium[int(position)] = data["Lines"][position]['RacingNumber'] + + + def get_podium(self): + if len(self.podium) == 3: + try: + if "?" in self.podium: + return + + message = "" + + for i in range(3): + pos = 'P' + str(i + 1) + driver = self.driver_dict[self.podium[i]] + message += f":champagne:{driver} " + \ + f"{self.flag_dict[pos]}" + + if driver == self.fastest_lap: + if driver == self.name_dict['ALO']: + message += " EL" + + message += ' ' + self.flag_dict['FLAP'] + + # Put a new line at the end of each row + message += "\n" + + return message + + except: + print("Error in sending podium message.") + + return "I don't know the podium yet :(" + + + def get_q1_cut(self): + message = "" + for i in range(15,20): + try: + message += self.driver_list[i] + " " + except: + message += "? " + + message = message.strip() + return message + + + def get_q2_cut(self): + message = "" + + for i in range(10,15): + try: + message += self.driver_list[i] + " " + except: + message += "? " + + message = message.strip() + return message + + + def get_weather(self): + if self.weather == "": + return "No weather info yet..." + else: + return self.weather + + + async def process_race_events(self, session_data, status_data): + # Hold the next event to report, and the type + event = None + event_type = None + + # Hold report + report = None + while (len(session_data) > 0 or len(status_data) > 0) and self.is_reporting: + # If only one of them has data, use that one + print("starting loop") + if len(session_data) == 0: + event = status_data.pop(0) + event_type = "STATUS" + elif len(status_data) == 0: + event = session_data.pop(0) + event_type = "SESSION" + # If they both have data, use the one with the oldest timestamp + else: + session_time = session_data[0]["Utc"] + status_time = status_data[0]["Utc"] + + if session_time < status_time: + event = session_data.pop(0) + event_type = "SESSION" + + else: + event = status_data.pop(0) + event_type = "STATUS" + + # At this point we have the event. Generate the report based on the type. + report = None + + if event_type == "SESSION": + print("Session event") + # Get the lap from the event + cur_lap = event["Lap"] + + # Make sure we haven't already reported on this lap + if self.current_lap < cur_lap: + self.current_lap = cur_lap + + # Get the laps remaining key to see if there's a driver that matches + laps_key = str( self.total_laps - cur_lap ) + + # If there's a matching driver, then send a message + if laps_key in self.driver_dict.keys(): + await self.send_lap_report(self.driver_dict[laps_key]) + + # Must be a status event + else: + print("Status event") + key = None + if "TrackStatus" in event.keys(): + key = "TrackStatus" + elif "SessionStatus" in event.keys(): + key = "SessionStatus" + + if key is not None: + track_status = event[key].upper() + + # Check the track status to determine what report to send + if track_status == "ALLCLEAR": + report = f"{self.flag_dict['GREEN_FLAG']}" + \ + f"{self.flag_dict['GREEN_FLAG']}{self.flag_dict['GREEN_FLAG']}" + elif track_status == "YELLOW": + report = f"{self.flag_dict['YELLOW_FLAG']}" + \ + f"{self.flag_dict['YELLOW_FLAG']}{self.flag_dict['YELLOW_FLAG']}" + elif track_status == "RED": + report = f"{self.flag_dict['RED_FLAG']}" + \ + f"{self.flag_dict['RED_FLAG']}{self.flag_dict['RED_FLAG']}" + elif track_status == "FINISHED": + report = f"{self.flag_dict['RACE_FINISHED']}" + \ + f"{self.flag_dict['RACE_FINISHED']}{self.flag_dict['RACE_FINISHED']}" + + if report is not None: + await self.send_status_report(report) + + + def load_weather_data(self, data): + weather_txt = "Track Weather Report\n" + for k in data.keys(): + if k != "_kf": + weather_txt += f"{k}: {data[k]}\n" + + self.weather = weather_txt + + + def load_timing_stats_data(self, data): + print("in timing stats") + if "Lines" in data.keys(): + lines = data["Lines"] + for driver_num in lines.keys(): + line = lines[driver_num] + print(f"driver_num {driver_num}") + if "PersonalBestLapTime" in line.keys(): + position = -1 + try: + position = line["PersonalBestLapTime"]["Position"] + print(f"got position {position}") + except: + pass + + if position == 1: + print(f"setting fastest_lap {driver_num}") + self.fastest_lap = self.driver_dict[driver_num] + print(f"flap {driver_num} {self.fastest_lap}") + return + + + def load_driver_data(self, data): + for driver in data.keys(): + position = data[driver]["Line"] + self.driver_list[position - 1] = self.driver_dict[driver] + + + async def print_driver_range(self, ctx, start, stop): + message = "" + for i in range(start, stop): + driver = self.driver_list[i] + message += f"P{i + 1}: {driver}" + if driver == self.fastest_lap: + if driver == self.name_dict['ALO']: + message += " EL" + + message += ' ' + self.flag_dict['FLAP'] + + message += "\n" + + await ctx.send(message) + + + async def print_driver_list(self, ctx): + #Send 1-10, then 11-20 + await self.print_driver_range(ctx, 0, 10) + await self.print_driver_range(ctx, 10, 20) + + + def load_initial(self, message): + print( f"in load_initial: {message['R'].keys()}" ) + # Load podium data + if 'R' in message.keys(): + if 'TopThree' in message['R'].keys(): + top_three = message['R']['TopThree'] + if 'Lines' in top_three.keys() and \ + len(top_three['Lines']) == 3: + + for i in range(3): + self.podium[i] = top_three['Lines'][i]['RacingNumber'] + + # Load driver list + if 'DriverList' in message['R'].keys(): + for driver_num in message['R']['DriverList'].keys(): + if driver_num == '_kf': + continue + position = message['R']['DriverList'][driver_num]['Line'] + self.driver_list[int(position) - 1] = self.driver_dict[driver_num] + + # Load lap data + if 'LapCount' in message['R'].keys(): + if 'TotalLaps' in message['R']['LapCount'].keys(): + self.total_laps = int(message['R']['LapCount']['TotalLaps']) + print(f"self.total_laps: {self.total_laps}") + + + # Load weather data + if 'WeatherData' in message['R'].keys(): + print( 'WeatherData in keys' ) + weather_obj = message['R']['WeatherData'] + weather_text = "Track Weather Report\n" + for k in weather_obj.keys(): + if k != "_kf": + weather_text += f"{k}: {weather_obj[k]}\n" + + self.weather = weather_text + + # Load fastest lap data + if 'TimingStats' in message['R'].keys(): + print( 'Flap in keys' ) + flap_obj = message['R']['TimingStats'] + self.load_timing_stats_data(flap_obj) + + + async def process_message(self, message): + try: + if isinstance(message, collections.abc.Sequence): + print("process_message - in isinstance") + + if message[0] == 'Heartbeat': + return + + elif message[0] == 'DriverList': + self.load_driver_data(message[1]) + + elif message[0] == 'TopThree': + self.load_podium_data(message[1]) + + elif message[0] == 'RaceControlMessages': + await self.load_rcm_messages(message[1]) + + elif message[0] == 'LapCount': + await self.load_lap_data(message[1]) + + elif message[0] == 'WeatherData': + self.load_weather_data(message[1]) + + elif message[0] == 'TimingStats': + self.load_timing_stats_data(message[1]) + + else: + print(f"Not sure how to handle message:{message[0]}") + + # Check to see if this is the initial "R" record from the response + elif "R" in message.keys(): + if "R" in message: + self.load_initial(message) + + except Exception as e: + print(f"process_message error {e}\n\n") + + + def get_messages_from_db(self): + try: + messages = [] + con = sqlite3.connect(self.dbfile) + cur = con.cursor() + cur2 = con.cursor() + for row in cur.execute('select id, message from messages order by id asc'): + messages.append(self.convert_message(row[1])) + + # Now that we have the message, delete this row from the dbfile + cur2.execute(f"delete from messages where id = {row[0]}") + + con.commit() + cur.close() + cur2.close() + con.close() + + return messages + except: + print("db error... continuing") + return [] + + + async def _race_report(self,ctx): + self.report_deleted_lap = False + self.session_type = 'RACE' + await self._report(ctx) + + + async def _quali_report(self, ctx): + self.report_deleted_lap = True + self.session_type = 'QUALI' + + await self._report(ctx) + + + async def _practice_report(self,ctx): + self.report_deleted_lap = True + self.session_type = 'PRACTICE' + await self._report(ctx) + + + async def _report(self, ctx): + self.is_reporting = True + self.channel = ctx.channel + + while self.is_reporting: + # Do processing + print("reporting loop") + + # process any new messages in the db + messages = self.get_messages_from_db() + try: + for message in messages: + await self.process_message(message) + + await asyncio.sleep(3) + except: + print(f"problem with messages") + + # process any messages in the delay queue + await self.process_delay_messages() + + + def get_token(self, token_file): + with open(token_file) as tok: + return tok.readline().strip() + + def __init__(self): + # Set debug or not + self.debug = True + + # Discord authentication token + self.token = self.get_token("token.txt") + + # Preface messages with the following + self.report_preamble = ':robot::peach: Alert!' + + # Holds processing thread + self.bg_task = None + + # Hold db file + self.dbfile = "messages.db" + + # Holds current lap + self.current_lap = 0 + + # Hold podium places + self.podium = ['?', '?', '?'] + + # Hold driver list + self.driver_list = ['?','?','?','?','?','?','?','?','?','?', \ + '?','?','?','?','?','?','?','?','?','?'] + + # Hold weather info + self.weather = "" + + # Hold lap info + self.current_lap = -1000 + self.total_laps = -1000 + self.lap_reported = 0 + + # Hold the driver with the fastest lap + self.fastest_lap = '' + + # Holds dictionary for number to icon + self.driver_dict = { + '1': '<:VER:1067541523748630570>', + '3': '<:RIC:1067870312949108887>', + '5': '<:VET:1067964065516884079>', + '11': '<:PER:1067822335123525732>', + '14': '<:ALO:1067876094033793054>', + '44': '<:HAM:1067828533746991165>', + '55': '<:SAI:1067824776502067270>', + '63': '<:RUS:1067831294748274728>', + '16': '<:LEC:1067544797050585198>', + '18': '<:STR:1067871582854336663>', + '4': '<:NOR:1067840487593082941>', + '10': '<:GAS:1067836596495327283>', + '27': '<:HUL:1067880110918742187>', + '31': '<:OCO:1067834157465612398>', + '77': '<:BOT:1067819716527276032>', + '81': '<:PIA:1067844998369914961>', + '24': '<:ZHO:1067865955117568010>', + '22': '<:TSU:1067888851676315660>', + '20': '<:MAG:1067883814992486510>', + '23': '<:ALB:1067874026871074887>', + '2': '<:SAR:1067890949197414410>', + '21': '<:DEV:1067891622727131248>' + } + + # Holds dictionary for driver 3 letter code to icon + self.name_dict = { + 'VER': '<:VER:1067541523748630570>', + 'RIC': '<:RIC:1067870312949108887>', + 'VET': '<:VET:1067964065516884079>', + 'PER': '<:PER:1067822335123525732>', + 'ALO': '<:ALO:1067876094033793054>', + 'HAM': '<:HAM:1067828533746991165>', + 'SAI': '<:SAI:1067824776502067270>', + 'RUS': '<:RUS:1067831294748274728>', + 'LEC': '<:LEC:1067544797050585198>', + 'STR': '<:STR:1067871582854336663>', + 'NOR': '<:NOR:1067840487593082941>', + 'GAS': '<:GAS:1067836596495327283>', + 'HUL': '<:HUL:1067880110918742187>', + 'OCO': '<:OCO:1067834157465612398>', + 'BOT': '<:BOT:1067819716527276032>', + 'PIA': '<:PIA:1067844998369914961>', + 'ZHO': '<:ZHO:1067865955117568010>', + 'TSU': '<:TSU:1067888851676315660>', + 'MAG': '<:MAG:1067883814992486510>', + 'ALB': '<:ALB:1067874026871074887>', + 'SAR': '<:SAR:1067890949197414410>', + 'DEV': '<:DEV:1067891622727131248>' + } + + # Holds dictionary for race states to icons + self.flag_dict = { + 'GREEN_FLAG': '<:GREEN:1107401338993782804>', + 'YELLOW_FLAG': '<:YELLOW:1091801442714652815>', + 'RED_FLAG': '<:RED:1091801383998586912>', + 'VSC': '<:VSC:1107401410183704596>', + 'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>', + 'SC': '<:SC:1107401472041304104>', + 'SAFETY_CAR': '<:SC:1107401472041304104>', + 'CHECKERED': '<:CHECKERED:1091801509039194152>', + 'RACE_FINISHED': '<:CHECKERED:1091801509039194152>', + 'STARTED': '<:CHECKERED:1091801509039194152>', + 'P1': ':first_place:', + 'P2': ':second_place:', + 'P3': ':third_place:', + 'FLAP': '<:FLap4:1122601036952129547>' + } + + # Bot configuration info + self.bot_intents = discord.Intents.default() + self.bot_intents.message_content = True + self.description = "robotas - f1 bot - !rbhelp for commands" + self.command_prefix = '!' + + # Track whether to report, and what channel to report to + self.is_reporting = False + self.report_id = None + self.started = False + + # Hold message delay + self.delay = 45 + self.message_queue = [] + + # Hold whether to report deleted lap messages + self.report_deleted_lap = False + self.session_type = '' + + # Test file + self.test_file = "test.json" + + ### END FastF1 adapted section ### + + super().__init__(command_prefix=self.command_prefix, + description=self.description, + intents=self.bot_intents) + + # Setup commands + @self.command() + async def rbhelp(ctx): + if not self.passed_filter(ctx): + return + + await ctx.send("commands: \n" + + "!rbhelp - Print this help message " + + "(but you knew that already)\n" + + "!rbname - I will tell you my name.\n" + + "!rbroot - I will tell you who I root for.\n" + + "!rbreport - I will start race reporting in this channel. " + + "I will try to tell you about current flags, laps left,\n" + + "and safety cars.\n" + + "!rbstop - Stop reporting.\n" + + "!podium - Display podium positions.\n" + + "!q1cut - Show drivers in Q1 cut positions.\n" + + "!q2cut - Show drivers in Q2 cut positions.\n" + + "!rbdelay - Set the race messaging delay in seconds." + ) + + @self.command() + async def rbroot(ctx): + if not self.passed_filter(ctx): + return + + await ctx.send("Rooting for :ferry::peach: of course!\n" + + ":BOT::smiling_face_with_3_hearts:") + + + @self.command() + async def rbname(ctx): + if not self.passed_filter(ctx): + return + + await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:") + + + @self.command() + async def rbdelay(ctx, delay): + try: + secs = int(delay) + if secs < 10 or secs > 300: + await ctx.send("delay must be between 10 and 300") + else: + self.delay = secs + await ctx.send(f"delay set to {secs}") + except: + await ctx.send("invalid delay value") + + + @self.command() + async def rbstop(ctx): + self.is_reporting = False + self.report_id = None + await ctx.send(":robot::peach: powering down") + + + @self.command() + async def podium(ctx): + message = self.get_podium() + await ctx.send(message) + + + @self.command() + async def driverlist(ctx): + await self.print_driver_list(ctx) + + + @self.command() + async def q1cut(ctx): + message = self.get_q1_cut() + await ctx.send(message) + + + @self.command() + async def q2cut(ctx): + message = self.get_q2_cut() + await ctx.send(message) + + + @self.command() + async def weather(ctx): + message = self.get_weather() + await ctx.send(message) + + + @self.command() + async def race(ctx): + if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator: + await ctx.send( ":robot::peach: Ready to report for the race!" ) + await self._race_report(ctx) + + + @self.command() + async def quali(ctx): + if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator: + await ctx.send( ":robot::peach: Ready to report on quali!" ) + await self._quali_report(ctx) + + + @self.command() + async def practice(ctx): + if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator: + await ctx.send( ":robot::peach: Ready to report on practice!" ) + await self._practice_report(ctx) + + + @self.command() + async def flap(ctx): + if self.fastest_lap != '': + await ctx.send( self.fastest_lap + self.flag_dict['FLAP'] ) + else: + await ctx.send( "No " + self.flag_dict['FLAP'] + " yet." ) + + + @self.command() + async def rbtestfile(ctx): + self.is_reporting = True + if str(ctx.author) == "tamservo#0": + await self._test_file(ctx) + + +if __name__ == '__main__': + rb = Robottas() + rb.run_robottas() +