Adding initial code.

This commit is contained in:
tamservo 2023-07-15 15:46:39 +01:00
parent c9d069e6a3
commit f34501046e
2 changed files with 982 additions and 0 deletions

198
RobottasSignalr.py Normal file
View File

@ -0,0 +1,198 @@
import asyncio
import concurrent.futures
import json
import logging
import requests
import time
import sqlite3
from fastf1.signalr_aio import Connection
import fastf1
""" ADOPTED FROM FASTF1 client """
def messages_from_raw(r):
"""Extract data messages from raw recorded SignalR data.
This function can be used to extract message data from raw SignalR data
which was saved using :class:`SignalRClient` in debug mode.
Args:
r (iterable) : Iterable containing raw SignalR responses.
"""
ret = list()
errorcount = 0
for data in r:
# fix F1's not json compliant data
data = data.replace("'", '"') \
.replace('True', 'true') \
.replace('False', 'false')
try:
data = json.loads(data)
except json.JSONDecodeError:
errorcount += 1
continue
messages = data['M'] if 'M' in data and len(data['M']) > 0 else {}
for inner_data in messages:
hub = inner_data['H'] if 'H' in inner_data else ''
if hub.lower() == 'streaming':
# method = inner_data['M']
message = inner_data['A']
ret.append(message)
return ret, errorcount
class SignalRClient:
"""A client for receiving and saving F1 timing data which is streamed
live over the SignalR protocol.
During an F1 session, timing data and telemetry data are streamed live
using the SignalR protocol. This class can be used to connect to the
stream and save the received data into a file.
The data will be saved in a raw text format without any postprocessing.
It is **not** possible to use this data during a session. Instead, the
data can be processed after the session using the :mod:`fastf1.api` and
:mod:`fastf1.core`
Args:
filename (str) : filename (opt. with path) for the output file
filemode (str, optional) : one of 'w' or 'a'; append to or overwrite
file content it the file already exists. Append-mode may be useful
if the client is restarted during a session.
debug (bool, optional) : When set to true, the complete SignalR
message is saved. By default, only the actual data from a
message is saved.
timeout (int, optional) : Number of seconds after which the client
will automatically exit when no message data is received.
Set to zero to disable.
logger (Logger or None) : By default, errors are logged to the
console. If you wish to customize logging, you can pass an
instance of :class:`logging.Logger` (see: :mod:`logging`).
"""
_connection_url = 'https://livetiming.formula1.com/signalr'
def __init__(self, filename, filemode='w', debug=False,
timeout=60, logger=None):
self.headers = {'User-agent': 'BestHTTP',
'Accept-Encoding': 'gzip, identity',
'Connection': 'keep-alive, Upgrade'}
self.topics = ["Heartbeat", "TopThree", "RcmSeries",
"TimingStats", "WeatherData",
"TrackStatus", "DriverList",
"RaceControlMessages", "SessionInfo",
"SessionData", "LapCount"]
self.debug = debug
self.filename = filename
self.filemode = filemode
self.timeout = timeout
self._connection = None
if not logger:
logging.basicConfig(
format="%(asctime)s - %(levelname)s: %(message)s"
)
self.logger = logging.getLogger('SignalR')
else:
self.logger = logger
self._output_file = None
self._t_last_message = None
def _to_file(self, msg):
"""
self._output_file.write(msg + '\n')
self._output_file.flush()
"""
print(msg)
con = sqlite3.connect('messages.db')
cur = con.cursor()
cur.execute("insert into messages (message) values(?)", (msg,))
con.commit()
cur.close()
con.close()
async def _on_do_nothing(self, msg):
# just do nothing with the message; intended for debug mode where some
# callback method still needs to be provided
pass
async def _on_message(self, msg):
self._t_last_message = time.time()
loop = asyncio.get_running_loop()
try:
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(
pool, self._to_file, str(msg)
)
except Exception:
self.logger.exception("Exception while writing message to file")
async def _on_debug(self, **data):
if 'M' in data and len(data['M']) > 0:
self._t_last_message = time.time()
loop = asyncio.get_running_loop()
try:
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(
pool, self._to_file, str(data)
)
except Exception:
self.logger.exception("Exception while writing message to file")
async def _run(self):
self._output_file = open(self.filename, self.filemode)
# Create connection
session = requests.Session()
session.headers = self.headers
self._connection = Connection(self._connection_url, session=session)
# Register hub
hub = self._connection.register_hub('Streaming')
if self.debug:
# Assign error handler
self._connection.error += self._on_debug
# Assign debug message handler to save raw responses
self._connection.received += self._on_debug
hub.client.on('feed', self._on_do_nothing) # need to connect an async method
else:
# Assign hub message handler
self._connection.received += self._on_debug
hub.client.on('feed', self._on_message)
hub.server.invoke("Subscribe", self.topics)
# Start the client
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(pool, self._connection.start)
async def _supervise(self):
self._t_last_message = time.time()
while True:
if (self.timeout != 0
and time.time() - self._t_last_message > self.timeout):
self.logger.warning(f"Timeout - received no data for more "
f"than {self.timeout} seconds!")
self._connection.close()
return
await asyncio.sleep(1)
async def _async_start(self):
self.logger.info(f"Starting FastF1 live timing client "
f"[v{fastf1.__version__}]")
await asyncio.gather(asyncio.ensure_future(self._supervise()),
asyncio.ensure_future(self._run()))
self._output_file.close()
self.logger.warning("Exiting...")
def start(self):
"""Connect to the data stream and start writing the data to a file."""
try:
asyncio.run(self._async_start())
except KeyboardInterrupt:
self.logger.warning("Keyboard interrupt - exiting...")
return

784
robottas.py Executable file
View File

@ -0,0 +1,784 @@
#!/usr/bin/python3
import asyncio
import collections.abc
import json
import sqlite3
import time
import discord
from discord.ext import commands
class Robottas(commands.Bot):
# The following section is adapted from code by theOehrly on GitHub FastF1 project
# and is subject to the MIT License under which it was released. Specifically
# the Livetiming client.
def convert_message(self, raw):
data = raw.replace("'", '"') \
.replace('True', 'true') \
.replace('False', 'false')
try:
data = json.loads(data)
return data
except json.JSONDecodeError:
return ""
# End section adapted from FastF1
async def on_ready(self):
print('Logged in as: {}'.format(self.user))
def run_robottas(self):
self.run(self.token)
async def send_message(self, message):
print(f"in send_message {message} {self.channel}")
if self.channel is None:
return
await self.channel.send(message)
def send_delay_message(self, message):
print("in send_delay_message")
self.message_queue.append((time.time(), message))
print(f"adding message: {message} at {time.time()}")
async def process_delay_messages(self):
print("in process_delay_queue")
while len(self.message_queue) > 0 and \
self.message_queue[0][0] < time.time() - self.delay:
print(f"removing at {time.time()}")
message = self.message_queue.pop(0)[1]
await self.send_message(message)
await asyncio.sleep(1)
async def send_status_report(self, report):
self.send_delay_message(report)
async def send_lap_report(self, driver):
self.send_delay_message(driver + " laps remaining")
async def load_flag_message(self, message):
flag = message['Flag']
message_text = message['Message']
report = None
if flag == 'GREEN':
report = f"{self.flag_dict['GREEN_FLAG']}" + \
f"{message_text}{self.flag_dict['GREEN_FLAG']}"
elif flag == 'RED':
report = f"{self.flag_dict['RED_FLAG']}{message_text}" + \
f"{self.flag_dict['RED_FLAG']}"
elif flag == 'YELLOW':
report = f"{self.flag_dict['YELLOW_FLAG']}{message_text}" + \
f"{self.flag_dict['YELLOW_FLAG']}"
elif flag == 'DOUBLE YELLOW':
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{message_text}" + \
f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}"
elif flag == 'BLACK AND WHITE' and self.report_deleted_lap:
report = f"{message['Message']}"
elif flag == 'CHEQUERED':
report = f"{self.flag_dict['CHECKERED']}" + \
f"{self.flag_dict['CHECKERED']}" + \
f"{self.flag_dict['CHECKERED']}"
# Return None or flag message
return report
async def load_rcm_messages(self, data):
if "Messages" in data.keys():
report = None
for key in data['Messages'].keys():
message = data['Messages'][key]
if 'Category' in message.keys():
category = message["Category"]
category = category.upper()
if category == "FLAG":
report = await self.load_flag_message( message )
elif category == "OTHER":
if self.session_type == "RACE" and "DELETED" in message['Message']:
pass
else:
report = message['Message']
elif category == "DRS":
report = message['Message']
elif category == "CAREVENT" and \
(self.session_type == "PRACTICE" or \
self.session_type == "QUALI"):
report = message['Message']
elif category == "SAFETYCAR":
if message["Mode"] == 'VIRTUAL SAFETY CAR':
report = f"{self.flag_dict['VSC']}" + \
f"{message['Message']}" + \
f"{self.flag_dict['VSC']}"
elif message["Mode"] == 'SAFETY CAR':
report = f"{self.flag_dict['SC']}" + \
f"{message['Message']}" + \
f"{self.flag_dict['SC']}"
if report is not None:
await self.send_status_report(report)
async def load_lap_data(self, data):
print(f"load_lap_data: {str(data)}")
if "CurrentLap" in data.keys():
current_lap = data["CurrentLap"]
if self.current_lap != current_lap:
self.current_lap = current_lap
#Notify on lap change if matches a driver
key = str(self.total_laps - int(current_lap))
if key in self.driver_dict.keys():
await self.send_lap_report(self.driver_dict[key])
def load_podium_data(self, data):
print("in load_podium_data")
if "Lines" in data.keys():
for position in data["Lines"].keys():
if 'RacingNumber' in data["Lines"][position].keys():
self.podium[int(position)] = data["Lines"][position]['RacingNumber']
def get_podium(self):
if len(self.podium) == 3:
try:
if "?" in self.podium:
return
message = ""
for i in range(3):
pos = 'P' + str(i + 1)
driver = self.driver_dict[self.podium[i]]
message += f":champagne:{driver} " + \
f"{self.flag_dict[pos]}"
if driver == self.fastest_lap:
if driver == self.name_dict['ALO']:
message += " EL"
message += ' ' + self.flag_dict['FLAP']
# Put a new line at the end of each row
message += "\n"
return message
except:
print("Error in sending podium message.")
return "I don't know the podium yet :("
def get_q1_cut(self):
message = ""
for i in range(15,20):
try:
message += self.driver_list[i] + " "
except:
message += "? "
message = message.strip()
return message
def get_q2_cut(self):
message = ""
for i in range(10,15):
try:
message += self.driver_list[i] + " "
except:
message += "? "
message = message.strip()
return message
def get_weather(self):
if self.weather == "":
return "No weather info yet..."
else:
return self.weather
async def process_race_events(self, session_data, status_data):
# Hold the next event to report, and the type
event = None
event_type = None
# Hold report
report = None
while (len(session_data) > 0 or len(status_data) > 0) and self.is_reporting:
# If only one of them has data, use that one
print("starting loop")
if len(session_data) == 0:
event = status_data.pop(0)
event_type = "STATUS"
elif len(status_data) == 0:
event = session_data.pop(0)
event_type = "SESSION"
# If they both have data, use the one with the oldest timestamp
else:
session_time = session_data[0]["Utc"]
status_time = status_data[0]["Utc"]
if session_time < status_time:
event = session_data.pop(0)
event_type = "SESSION"
else:
event = status_data.pop(0)
event_type = "STATUS"
# At this point we have the event. Generate the report based on the type.
report = None
if event_type == "SESSION":
print("Session event")
# Get the lap from the event
cur_lap = event["Lap"]
# Make sure we haven't already reported on this lap
if self.current_lap < cur_lap:
self.current_lap = cur_lap
# Get the laps remaining key to see if there's a driver that matches
laps_key = str( self.total_laps - cur_lap )
# If there's a matching driver, then send a message
if laps_key in self.driver_dict.keys():
await self.send_lap_report(self.driver_dict[laps_key])
# Must be a status event
else:
print("Status event")
key = None
if "TrackStatus" in event.keys():
key = "TrackStatus"
elif "SessionStatus" in event.keys():
key = "SessionStatus"
if key is not None:
track_status = event[key].upper()
# Check the track status to determine what report to send
if track_status == "ALLCLEAR":
report = f"{self.flag_dict['GREEN_FLAG']}" + \
f"{self.flag_dict['GREEN_FLAG']}{self.flag_dict['GREEN_FLAG']}"
elif track_status == "YELLOW":
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
f"{self.flag_dict['YELLOW_FLAG']}{self.flag_dict['YELLOW_FLAG']}"
elif track_status == "RED":
report = f"{self.flag_dict['RED_FLAG']}" + \
f"{self.flag_dict['RED_FLAG']}{self.flag_dict['RED_FLAG']}"
elif track_status == "FINISHED":
report = f"{self.flag_dict['RACE_FINISHED']}" + \
f"{self.flag_dict['RACE_FINISHED']}{self.flag_dict['RACE_FINISHED']}"
if report is not None:
await self.send_status_report(report)
def load_weather_data(self, data):
weather_txt = "Track Weather Report\n"
for k in data.keys():
if k != "_kf":
weather_txt += f"{k}: {data[k]}\n"
self.weather = weather_txt
def load_timing_stats_data(self, data):
print("in timing stats")
if "Lines" in data.keys():
lines = data["Lines"]
for driver_num in lines.keys():
line = lines[driver_num]
print(f"driver_num {driver_num}")
if "PersonalBestLapTime" in line.keys():
position = -1
try:
position = line["PersonalBestLapTime"]["Position"]
print(f"got position {position}")
except:
pass
if position == 1:
print(f"setting fastest_lap {driver_num}")
self.fastest_lap = self.driver_dict[driver_num]
print(f"flap {driver_num} {self.fastest_lap}")
return
def load_driver_data(self, data):
for driver in data.keys():
position = data[driver]["Line"]
self.driver_list[position - 1] = self.driver_dict[driver]
async def print_driver_range(self, ctx, start, stop):
message = ""
for i in range(start, stop):
driver = self.driver_list[i]
message += f"P{i + 1}: {driver}"
if driver == self.fastest_lap:
if driver == self.name_dict['ALO']:
message += " EL"
message += ' ' + self.flag_dict['FLAP']
message += "\n"
await ctx.send(message)
async def print_driver_list(self, ctx):
#Send 1-10, then 11-20
await self.print_driver_range(ctx, 0, 10)
await self.print_driver_range(ctx, 10, 20)
def load_initial(self, message):
print( f"in load_initial: {message['R'].keys()}" )
# Load podium data
if 'R' in message.keys():
if 'TopThree' in message['R'].keys():
top_three = message['R']['TopThree']
if 'Lines' in top_three.keys() and \
len(top_three['Lines']) == 3:
for i in range(3):
self.podium[i] = top_three['Lines'][i]['RacingNumber']
# Load driver list
if 'DriverList' in message['R'].keys():
for driver_num in message['R']['DriverList'].keys():
if driver_num == '_kf':
continue
position = message['R']['DriverList'][driver_num]['Line']
self.driver_list[int(position) - 1] = self.driver_dict[driver_num]
# Load lap data
if 'LapCount' in message['R'].keys():
if 'TotalLaps' in message['R']['LapCount'].keys():
self.total_laps = int(message['R']['LapCount']['TotalLaps'])
print(f"self.total_laps: {self.total_laps}")
# Load weather data
if 'WeatherData' in message['R'].keys():
print( 'WeatherData in keys' )
weather_obj = message['R']['WeatherData']
weather_text = "Track Weather Report\n"
for k in weather_obj.keys():
if k != "_kf":
weather_text += f"{k}: {weather_obj[k]}\n"
self.weather = weather_text
# Load fastest lap data
if 'TimingStats' in message['R'].keys():
print( 'Flap in keys' )
flap_obj = message['R']['TimingStats']
self.load_timing_stats_data(flap_obj)
async def process_message(self, message):
try:
if isinstance(message, collections.abc.Sequence):
print("process_message - in isinstance")
if message[0] == 'Heartbeat':
return
elif message[0] == 'DriverList':
self.load_driver_data(message[1])
elif message[0] == 'TopThree':
self.load_podium_data(message[1])
elif message[0] == 'RaceControlMessages':
await self.load_rcm_messages(message[1])
elif message[0] == 'LapCount':
await self.load_lap_data(message[1])
elif message[0] == 'WeatherData':
self.load_weather_data(message[1])
elif message[0] == 'TimingStats':
self.load_timing_stats_data(message[1])
else:
print(f"Not sure how to handle message:{message[0]}")
# Check to see if this is the initial "R" record from the response
elif "R" in message.keys():
if "R" in message:
self.load_initial(message)
except Exception as e:
print(f"process_message error {e}\n\n")
def get_messages_from_db(self):
try:
messages = []
con = sqlite3.connect(self.dbfile)
cur = con.cursor()
cur2 = con.cursor()
for row in cur.execute('select id, message from messages order by id asc'):
messages.append(self.convert_message(row[1]))
# Now that we have the message, delete this row from the dbfile
cur2.execute(f"delete from messages where id = {row[0]}")
con.commit()
cur.close()
cur2.close()
con.close()
return messages
except:
print("db error... continuing")
return []
async def _race_report(self,ctx):
self.report_deleted_lap = False
self.session_type = 'RACE'
await self._report(ctx)
async def _quali_report(self, ctx):
self.report_deleted_lap = True
self.session_type = 'QUALI'
await self._report(ctx)
async def _practice_report(self,ctx):
self.report_deleted_lap = True
self.session_type = 'PRACTICE'
await self._report(ctx)
async def _report(self, ctx):
self.is_reporting = True
self.channel = ctx.channel
while self.is_reporting:
# Do processing
print("reporting loop")
# process any new messages in the db
messages = self.get_messages_from_db()
try:
for message in messages:
await self.process_message(message)
await asyncio.sleep(3)
except:
print(f"problem with messages")
# process any messages in the delay queue
await self.process_delay_messages()
def get_token(self, token_file):
with open(token_file) as tok:
return tok.readline().strip()
def __init__(self):
# Set debug or not
self.debug = True
# Discord authentication token
self.token = self.get_token("token.txt")
# Preface messages with the following
self.report_preamble = ':robot::peach: Alert!'
# Holds processing thread
self.bg_task = None
# Hold db file
self.dbfile = "messages.db"
# Holds current lap
self.current_lap = 0
# Hold podium places
self.podium = ['?', '?', '?']
# Hold driver list
self.driver_list = ['?','?','?','?','?','?','?','?','?','?', \
'?','?','?','?','?','?','?','?','?','?']
# Hold weather info
self.weather = ""
# Hold lap info
self.current_lap = -1000
self.total_laps = -1000
self.lap_reported = 0
# Hold the driver with the fastest lap
self.fastest_lap = ''
# Holds dictionary for number to icon
self.driver_dict = {
'1': '<:VER:1067541523748630570>',
'3': '<:RIC:1067870312949108887>',
'5': '<:VET:1067964065516884079>',
'11': '<:PER:1067822335123525732>',
'14': '<:ALO:1067876094033793054>',
'44': '<:HAM:1067828533746991165>',
'55': '<:SAI:1067824776502067270>',
'63': '<:RUS:1067831294748274728>',
'16': '<:LEC:1067544797050585198>',
'18': '<:STR:1067871582854336663>',
'4': '<:NOR:1067840487593082941>',
'10': '<:GAS:1067836596495327283>',
'27': '<:HUL:1067880110918742187>',
'31': '<:OCO:1067834157465612398>',
'77': '<:BOT:1067819716527276032>',
'81': '<:PIA:1067844998369914961>',
'24': '<:ZHO:1067865955117568010>',
'22': '<:TSU:1067888851676315660>',
'20': '<:MAG:1067883814992486510>',
'23': '<:ALB:1067874026871074887>',
'2': '<:SAR:1067890949197414410>',
'21': '<:DEV:1067891622727131248>'
}
# Holds dictionary for driver 3 letter code to icon
self.name_dict = {
'VER': '<:VER:1067541523748630570>',
'RIC': '<:RIC:1067870312949108887>',
'VET': '<:VET:1067964065516884079>',
'PER': '<:PER:1067822335123525732>',
'ALO': '<:ALO:1067876094033793054>',
'HAM': '<:HAM:1067828533746991165>',
'SAI': '<:SAI:1067824776502067270>',
'RUS': '<:RUS:1067831294748274728>',
'LEC': '<:LEC:1067544797050585198>',
'STR': '<:STR:1067871582854336663>',
'NOR': '<:NOR:1067840487593082941>',
'GAS': '<:GAS:1067836596495327283>',
'HUL': '<:HUL:1067880110918742187>',
'OCO': '<:OCO:1067834157465612398>',
'BOT': '<:BOT:1067819716527276032>',
'PIA': '<:PIA:1067844998369914961>',
'ZHO': '<:ZHO:1067865955117568010>',
'TSU': '<:TSU:1067888851676315660>',
'MAG': '<:MAG:1067883814992486510>',
'ALB': '<:ALB:1067874026871074887>',
'SAR': '<:SAR:1067890949197414410>',
'DEV': '<:DEV:1067891622727131248>'
}
# Holds dictionary for race states to icons
self.flag_dict = {
'GREEN_FLAG': '<:GREEN:1107401338993782804>',
'YELLOW_FLAG': '<:YELLOW:1091801442714652815>',
'RED_FLAG': '<:RED:1091801383998586912>',
'VSC': '<:VSC:1107401410183704596>',
'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>',
'SC': '<:SC:1107401472041304104>',
'SAFETY_CAR': '<:SC:1107401472041304104>',
'CHECKERED': '<:CHECKERED:1091801509039194152>',
'RACE_FINISHED': '<:CHECKERED:1091801509039194152>',
'STARTED': '<:CHECKERED:1091801509039194152>',
'P1': ':first_place:',
'P2': ':second_place:',
'P3': ':third_place:',
'FLAP': '<:FLap4:1122601036952129547>'
}
# Bot configuration info
self.bot_intents = discord.Intents.default()
self.bot_intents.message_content = True
self.description = "robotas - f1 bot - !rbhelp for commands"
self.command_prefix = '!'
# Track whether to report, and what channel to report to
self.is_reporting = False
self.report_id = None
self.started = False
# Hold message delay
self.delay = 45
self.message_queue = []
# Hold whether to report deleted lap messages
self.report_deleted_lap = False
self.session_type = ''
# Test file
self.test_file = "test.json"
### END FastF1 adapted section ###
super().__init__(command_prefix=self.command_prefix,
description=self.description,
intents=self.bot_intents)
# Setup commands
@self.command()
async def rbhelp(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("commands: \n" +
"!rbhelp - Print this help message " +
"(but you knew that already)\n" +
"!rbname - I will tell you my name.\n" +
"!rbroot - I will tell you who I root for.\n" +
"!rbreport - I will start race reporting in this channel. " +
"I will try to tell you about current flags, laps left,\n" +
"and safety cars.\n" +
"!rbstop - Stop reporting.\n" +
"!podium - Display podium positions.\n" +
"!q1cut - Show drivers in Q1 cut positions.\n" +
"!q2cut - Show drivers in Q2 cut positions.\n" +
"!rbdelay - Set the race messaging delay in seconds."
)
@self.command()
async def rbroot(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("Rooting for :ferry::peach: of course!\n" +
":BOT::smiling_face_with_3_hearts:")
@self.command()
async def rbname(ctx):
if not self.passed_filter(ctx):
return
await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:")
@self.command()
async def rbdelay(ctx, delay):
try:
secs = int(delay)
if secs < 10 or secs > 300:
await ctx.send("delay must be between 10 and 300")
else:
self.delay = secs
await ctx.send(f"delay set to {secs}")
except:
await ctx.send("invalid delay value")
@self.command()
async def rbstop(ctx):
self.is_reporting = False
self.report_id = None
await ctx.send(":robot::peach: powering down")
@self.command()
async def podium(ctx):
message = self.get_podium()
await ctx.send(message)
@self.command()
async def driverlist(ctx):
await self.print_driver_list(ctx)
@self.command()
async def q1cut(ctx):
message = self.get_q1_cut()
await ctx.send(message)
@self.command()
async def q2cut(ctx):
message = self.get_q2_cut()
await ctx.send(message)
@self.command()
async def weather(ctx):
message = self.get_weather()
await ctx.send(message)
@self.command()
async def race(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report for the race!" )
await self._race_report(ctx)
@self.command()
async def quali(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report on quali!" )
await self._quali_report(ctx)
@self.command()
async def practice(ctx):
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
await ctx.send( ":robot::peach: Ready to report on practice!" )
await self._practice_report(ctx)
@self.command()
async def flap(ctx):
if self.fastest_lap != '':
await ctx.send( self.fastest_lap + self.flag_dict['FLAP'] )
else:
await ctx.send( "No " + self.flag_dict['FLAP'] + " yet." )
@self.command()
async def rbtestfile(ctx):
self.is_reporting = True
if str(ctx.author) == "tamservo#0":
await self._test_file(ctx)
if __name__ == '__main__':
rb = Robottas()
rb.run_robottas()