robottas/robottas.py

1304 lines
45 KiB
Python
Executable File

#!/usr/bin/python3
import asyncio
from bingo import Bingo
import collections.abc
import datetime
import json
import logging
import os
import pathlib
import random
import re
import sqlite3
import sys
from subprocess import Popen
import time
import discord
from discord.ext import commands, tasks
class Robottas(commands.Bot):
# The following section is adapted from code by theOehrly on GitHub FastF1 project
# and is subject to the MIT License under which it was released. Specifically
# the Livetiming client.
@staticmethod
def convert_message(raw):
data = raw.replace("'", '"') \
.replace('True', 'true') \
.replace('False', 'false')
try:
data = json.loads(data)
return data
except json.JSONDecodeError:
return ""
# End section adapted from FastF1
async def on_ready(self):
    """Start the scheduled-alert loop once the bot is logged in.

    ``on_ready`` may be dispatched more than once (for example after a
    reconnect), and starting a task loop that is already running raises, so
    the loop is only started when it is not running yet.
    """
    if not self._alert_loop.is_running():
        self._alert_loop.start()
    print('Logged in as: {}'.format(self.user))
def run_robottas(self):
    """Run the bot using the token stored on the instance."""
    token = self.token
    self.run(token)
async def send_message(self, message):
    """Send ``message`` to the reporting channel; do nothing if none is set."""
    target = self.channel
    if target is not None:
        await target.send(message)
@staticmethod
async def send_image(ctx, image_file):
    """Upload an image to the channel of ``ctx``.

    ``image_file`` is resolved relative to the directory containing this
    source file.  Only the base name is used as the attachment name, so the
    absolute path on the host is not exposed as the displayed file name
    (``send_audio`` likewise passes a bare name).
    """
    base_dir = os.path.dirname(os.path.realpath(__file__))
    full_path = os.path.join(base_dir, image_file)
    with open(full_path, "rb") as handle:
        attachment = discord.File(handle,
                                  filename=os.path.basename(full_path))
        # The handle must stay open until the upload has finished.
        await ctx.send(file=attachment)
# Send an audio file as an attachment
async def send_audio(self, ctx, title, name):
    """Upload the file ``name`` to ``ctx`` as an attachment called ``title``.

    ``name`` is resolved relative to the directory containing this source
    file.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    audio_path = os.path.join(here, name)
    with open(audio_path, "rb") as audio_handle:
        attachment = discord.File(audio_handle, filename=title)
        await ctx.send(file=attachment)
def send_delay_message(self, message):
    """Queue ``message`` together with the time at which it was queued."""
    entry = (time.time(), message)
    self.message_queue.append(entry)
# Send every queued message that has waited longer than self.delay seconds.
# Entries of self.message_queue are (enqueue_time, message) tuples, as appended
# by send_delay_message; they are consumed from the front of the list.
# NOTE(review): indentation was lost in this copy, so it is unclear whether the
# one-second sleep runs after each sent message or once after the loop - confirm.
async def process_delay_messages(self):
# Only the head of the queue is inspected on each pass.
while len(self.message_queue) > 0 and \
self.message_queue[0][0] < time.time() - self.delay:
# Drop the timestamp and keep the message part of the entry.
message = self.message_queue.pop(0)[1]
await self.send_message(message)
await asyncio.sleep(1)
async def send_status_report(self, report):
    """Put a status ``report`` on the delayed message queue."""
    queue_message = self.send_delay_message
    queue_message(report)
async def send_lap_report(self, driver):
    """Queue the "<driver> laps remaining" message."""
    text = driver + " laps remaining"
    self.send_delay_message(text)
async def load_flag_message(self, message):
    """Build the report text for a flag race-control message.

    ``message`` must contain ``'Flag'`` and ``'Message'``.  Green, red and
    yellow flags wrap the message text in the matching icon (twice on each
    side for a double yellow), a black and white flag returns the bare text
    when ``self.report_deleted_lap`` is set, and the chequered flag returns
    three chequered icons without the text.

    Returns the report string, or ``None`` for any other flag.
    """
    flag = message['Flag']
    text = message['Message']
    icons = self.flag_dict

    if flag == 'GREEN':
        return f"{icons['GREEN_FLAG']}{text}{icons['GREEN_FLAG']}"
    if flag == 'RED':
        return f"{icons['RED_FLAG']}{text}{icons['RED_FLAG']}"
    if flag == 'YELLOW':
        return f"{icons['YELLOW_FLAG']}{text}{icons['YELLOW_FLAG']}"
    if flag == 'DOUBLE YELLOW':
        pair = f"{icons['YELLOW_FLAG']}{icons['YELLOW_FLAG']}"
        return f"{pair}{text}{pair}"
    if flag == 'BLACK AND WHITE' and self.report_deleted_lap:
        return f"{text}"
    if flag == 'CHEQUERED':
        return f"{icons['CHECKERED']}" * 3
    # Any other flag produces no report
    return None
async def load_rcm_messages(self, data):
    """Turn race-control messages into queued status reports.

    ``data['Messages']`` may be a mapping (its values are the messages) or a
    plain list of messages.  Each message is evaluated on its own: the
    report is reset for every message, so one report can neither be sent
    again for a later message nor be overwritten before it was sent.

    Categories handled:
      * FLAG - delegated to ``load_flag_message``.
      * OTHER - "DELETED" messages are skipped during a race, the first
        "SLIPPERY" message is replaced by a fixed text, everything else is
        passed through.
      * DRS - passed through.
      * CAREVENT - passed through in practice and qualifying only.
      * SAFETYCAR - wrapped in the (virtual) safety car icon.
    """
    if "Messages" not in data:
        return

    messages = data['Messages']
    entries = messages.values() if isinstance(messages, dict) else messages

    for message in entries:
        if not isinstance(message, dict) or 'Category' not in message:
            continue

        # Reset per message so nothing leaks from the previous one.
        report = None
        category = message["Category"].upper()
        text = message.get('Message', '')

        if category == "FLAG":
            report = await self.load_flag_message(message)
        elif category == "OTHER":
            if self.session_type == "RACE" and "DELETED" in text:
                pass
            elif "SLIPPERY" in text and not self.is_slippery_reported:
                self.is_slippery_reported = True
                report = "It's slippery out there!"
            else:
                report = text
        elif category == "DRS":
            report = text
        elif category == "CAREVENT" and \
                self.session_type in ("PRACTICE", "QUALI"):
            report = text
        elif category == "SAFETYCAR":
            mode = message.get("Mode")
            if mode == 'VIRTUAL SAFETY CAR':
                icon = self.flag_dict['VSC']
                report = f"{icon}{text}{icon}"
            elif mode == 'SAFETY CAR':
                icon = self.flag_dict['SC']
                report = f"{icon}{text}{icon}"

        # Empty reports are skipped as well; they cannot be posted.
        if report:
            await self.send_status_report(report)
async def load_lap_data(self, data):
    """Record a lap change and announce the driver matching the laps left.

    Nothing happens unless ``data`` carries a ``CurrentLap`` that differs
    from the stored one.  On a change the total lap count is refreshed when
    present, and if the number of remaining laps equals a key of
    ``self.driver_dict`` the lap report for that entry is queued.
    """
    if "CurrentLap" not in data.keys():
        return

    new_lap = data["CurrentLap"]
    if self.current_lap == new_lap:
        return
    self.current_lap = new_lap

    # The total can change mid-session, i.e. shortened due to rain.
    if "TotalLaps" in data.keys():
        self.total_laps = int(data["TotalLaps"])

    # Remaining laps double as the lookup key into the driver table.
    laps_left = str(self.total_laps - int(new_lap))
    if laps_left in self.driver_dict.keys():
        await self.send_lap_report(self.driver_dict[laps_left])
def load_podium_data(self, data):
    """Copy racing numbers from ``data['Lines']`` into ``self.podium``.

    The keys of ``Lines`` are converted with ``int`` and used as indexes
    into the podium list; entries without ``RacingNumber`` are ignored.
    """
    if "Lines" not in data.keys():
        return
    for position, line in data["Lines"].items():
        if 'RacingNumber' in line.keys():
            self.podium[int(position)] = line['RacingNumber']
def get_podium(self):
    """Return the podium as a three-line message.

    Each line shows the driver icon and the place icon; the driver holding
    the fastest lap also gets the fastest-lap icon (preceded by " EL" for
    the ``ALO`` entry of ``self.name_dict``).

    Returns the fallback text when the podium is incomplete, still contains
    a ``'?'`` placeholder (previously this case returned ``None``, which
    cannot be sent as a message), or names an unknown racing number.
    """
    unknown = "I don't know the podium yet :("
    if len(self.podium) != 3 or "?" in self.podium:
        return unknown

    rows = []
    try:
        for place, racing_number in enumerate(self.podium, start=1):
            driver = self.driver_dict[racing_number]
            row = f":champagne:{driver} {self.flag_dict['P' + str(place)]}"
            if driver == self.fastest_lap:
                if driver == self.name_dict['ALO']:
                    row += " EL"
                row += ' ' + self.flag_dict['FLAP']
            # Put a new line at the end of each row
            rows.append(row + "\n")
    except (KeyError, TypeError):
        # Unknown racing number or missing icon: report "not known yet".
        return unknown
    return "".join(rows)
def get_q1_cut(self):
    """Return the entries at indexes 15-19 of the driver list.

    The entries are joined with single spaces.  A missing or non-string
    entry is shown as ``"?"``; only those two cases are tolerated instead
    of swallowing every exception.
    """
    cells = []
    for index in range(15, 20):
        try:
            entry = self.driver_list[index]
        except IndexError:
            entry = "?"
        cells.append(entry if isinstance(entry, str) else "?")
    return " ".join(cells).strip()
def get_q2_cut(self):
    """Return the entries at indexes 10-14 of the driver list.

    The entries are joined with single spaces.  A missing or non-string
    entry is shown as ``"?"``; only those two cases are tolerated instead
    of swallowing every exception.
    """
    cells = []
    for index in range(10, 15):
        try:
            entry = self.driver_list[index]
        except IndexError:
            entry = "?"
        cells.append(entry if isinstance(entry, str) else "?")
    return " ".join(cells).strip()
def get_weather(self):
    """Return the stored weather text, or a placeholder if it is empty."""
    return "No weather info yet..." if self.weather == "" else self.weather
async def process_race_events(self, session_data, status_data):
    """Replay session and status events in timestamp order.

    Events are consumed from the front of both lists while reporting is
    enabled.  When both lists hold events, the one with the smaller
    ``"Utc"`` value goes first (the status event wins a tie).

    A session event advances ``self.current_lap`` and queues a lap report
    when the remaining laps match a key of ``self.driver_dict``.  A status
    event carrying ``TrackStatus`` or ``SessionStatus`` queues three copies
    of the matching icon for ALLCLEAR, YELLOW, RED and FINISHED.
    """
    # Upper-cased status value -> key of the icon in self.flag_dict
    status_icons = {
        "ALLCLEAR": 'GREEN_FLAG',
        "YELLOW": 'YELLOW_FLAG',
        "RED": 'RED_FLAG',
        "FINISHED": 'RACE_FINISHED',
    }

    while (len(session_data) > 0 or len(status_data) > 0) and self.is_reporting:
        # Session goes first when it is the only source, or strictly older.
        take_session = len(status_data) == 0 or (
            len(session_data) > 0 and
            session_data[0]["Utc"] < status_data[0]["Utc"])

        if take_session:
            lap = session_data.pop(0)["Lap"]
            # Only laps that were not reported before are of interest
            if self.current_lap < lap:
                self.current_lap = lap
                remaining = str(self.total_laps - lap)
                if remaining in self.driver_dict.keys():
                    await self.send_lap_report(self.driver_dict[remaining])
            continue

        event = status_data.pop(0)
        if "TrackStatus" in event.keys():
            status_key = "TrackStatus"
        elif "SessionStatus" in event.keys():
            status_key = "SessionStatus"
        else:
            continue

        icon_name = status_icons.get(event[status_key].upper())
        if icon_name is None:
            continue
        icon = self.flag_dict[icon_name]
        await self.send_status_report(f"{icon}{icon}{icon}")
def load_weather_data(self, data):
    """Store a text report listing every ``key: value`` pair of ``data``.

    The ``_kf`` key is left out.
    """
    rows = [f"{name}: {data[name]}\n" for name in data.keys() if name != "_kf"]
    self.weather = "Track Weather Report\n" + "".join(rows)
def load_timing_stats_data(self, data):
    """Remember the driver whose personal best lap is ranked first.

    Scans ``data['Lines']`` for the first entry whose
    ``PersonalBestLapTime`` has ``Position`` equal to 1 and stores that
    driver in ``self.fastest_lap``.  A racing number missing from
    ``self.driver_dict`` is stored as-is, the same fallback the driver
    list loaders use, instead of raising ``KeyError``.
    """
    lines = data.get("Lines") if isinstance(data, dict) else None
    if not isinstance(lines, dict):
        return

    for driver_num, line in lines.items():
        best = line.get("PersonalBestLapTime") if isinstance(line, dict) else None
        if isinstance(best, dict) and best.get("Position") == 1:
            self.fastest_lap = self.driver_dict.get(driver_num, driver_num)
            return
def load_driver_data(self, data):
    """Update the running order from a driver list update.

    For every entry carrying a ``Line`` position, the driver icon (or the
    raw racing number when it is not in ``self.driver_dict``) is written to
    that position of ``self.driver_list``.

    The ``_kf`` entry, entries without ``Line``, and positions that are not
    numeric or fall outside the list are skipped, so one unusable entry no
    longer aborts the whole update.
    """
    for driver, details in data.items():
        if driver == '_kf' or not isinstance(details, dict) \
                or 'Line' not in details:
            continue
        try:
            position = int(details['Line'])
        except (TypeError, ValueError):
            continue
        if not 1 <= position <= len(self.driver_list):
            continue
        self.driver_list[position - 1] = self.driver_dict.get(driver, driver)
async def print_driver_range(self, ctx, start, stop):
    """Send the positions ``start`` (inclusive) to ``stop`` (exclusive).

    Indexes are zero based while the printed positions start at P1.  The
    driver holding the fastest lap gets the fastest-lap icon appended
    (preceded by " EL" for the ``ALO`` entry of ``self.name_dict``).  All
    rows go out as one message.
    """
    rows = []
    for index in range(start, stop):
        driver = self.driver_list[index]
        row = f"P{index + 1}: {driver}"
        if driver == self.fastest_lap:
            if driver == self.name_dict['ALO']:
                row += " EL"
            row += ' ' + self.flag_dict['FLAP']
        rows.append(row + "\n")
    await ctx.send("".join(rows))
async def print_driver_list(self, ctx):
    """Send the running order as two messages: P1-P10, then P11-P20."""
    for first, last in ((0, 10), (10, 20)):
        await self.print_driver_range(ctx, first, last)
def load_initial(self, message):
    """Seed the bot state from the initial ``"R"`` record.

    Reads, when present: the podium (``TopThree``), the running order
    (``DriverList``), the total lap count (``LapCount``), the weather
    (``WeatherData``) and the fastest lap (``TimingStats``).

    A driver list entry without a usable ``Line`` position, or with one
    outside the driver list, is skipped so that it cannot abort the rest of
    the load.  Weather is handled by ``load_weather_data``, which produces
    the same text the inline formatting produced before.
    """
    if 'R' not in message:
        return
    initial = message['R']

    # Load podium data
    if 'TopThree' in initial:
        top_three = initial['TopThree']
        if 'Lines' in top_three and len(top_three['Lines']) == 3:
            for place in range(3):
                self.podium[place] = top_three['Lines'][place]['RacingNumber']

    # Load driver list
    if 'DriverList' in initial:
        for driver_num, details in initial['DriverList'].items():
            if driver_num == '_kf':
                continue
            try:
                position = int(details['Line'])
            except (KeyError, TypeError, ValueError):
                continue
            if not 1 <= position <= len(self.driver_list):
                continue
            # Unknown racing numbers are listed by their number
            self.driver_list[position - 1] = \
                self.driver_dict.get(driver_num, driver_num)

    # Load lap data
    if 'LapCount' in initial and 'TotalLaps' in initial['LapCount']:
        self.total_laps = int(initial['LapCount']['TotalLaps'])

    # Load weather data
    if 'WeatherData' in initial:
        self.load_weather_data(initial['WeatherData'])

    # Load fastest lap data
    if 'TimingStats' in initial:
        self.load_timing_stats_data(initial['TimingStats'])
async def process_message(self, message):
    """Dispatch one decoded message to the matching loader.

    A mapping containing ``"R"`` is the initial record and goes to
    ``load_initial``.  Otherwise ``message`` is expected to be a sequence
    whose first item is the topic and whose second item is the payload;
    unknown topics (and ``Heartbeat``) are ignored, as are strings and
    sequences that are too short.

    Processing stays best-effort: a failing loader must not stop the
    reporting loop, so the exception is logged at DEBUG level, not raised.
    """
    try:
        if isinstance(message, dict):
            # Initial "R" record from the response
            if "R" in message:
                self.load_initial(message)
            return

        # A str is a Sequence too, but never a (topic, payload) pair.
        if isinstance(message, str) \
                or not isinstance(message, collections.abc.Sequence) \
                or len(message) < 2:
            return

        topic, payload = message[0], message[1]
        if topic == 'DriverList':
            self.load_driver_data(payload)
        elif topic == 'TopThree':
            self.load_podium_data(payload)
        elif topic == 'RaceControlMessages':
            await self.load_rcm_messages(payload)
        elif topic == 'LapCount':
            await self.load_lap_data(payload)
        elif topic == 'WeatherData':
            self.load_weather_data(payload)
        elif topic == 'TimingStats':
            self.load_timing_stats_data(payload)
    except Exception:
        logging.getLogger(__name__).debug(
            "Could not process message %r", message, exc_info=True)
def get_messages_from_db(self):
    """Fetch, decode and remove all pending rows of the ``messages`` table.

    Rows are read in ``id`` order and each ``message`` column is decoded
    with ``convert_message``.  The rows that were read are then deleted by
    id using bound parameters, and the deletion is committed once.

    Returns the list of decoded messages, or an empty list when anything
    fails (nothing is deleted in that case).  The connection is always
    closed.
    """
    con = None
    try:
        con = sqlite3.connect(self.dbfile)
        # Read everything first so rows are not deleted from a table that
        # is still being iterated.
        rows = con.execute(
            'select id, message from messages order by id asc').fetchall()
        messages = [self.convert_message(row[1]) for row in rows]
        con.executemany('delete from messages where id = ?',
                        [(row[0],) for row in rows])
        con.commit()
        return messages
    except Exception:
        # Best-effort polling: report the problem quietly and retry later.
        logging.getLogger(__name__).debug(
            "Could not read messages from %r", self.dbfile, exc_info=True)
        return []
    finally:
        if con is not None:
            con.close()
async def _race_report(self, ctx):
    """Start reporting in race mode (deleted lap messages are not reported)."""
    self.session_type = 'RACE'
    self.report_deleted_lap = False
    await self._report(ctx)
async def _quali_report(self, ctx):
    """Start reporting in qualifying mode (deleted lap messages are reported)."""
    self.session_type = 'QUALI'
    self.report_deleted_lap = True
    await self._report(ctx)
async def _practice_report(self, ctx):
    """Start reporting in practice mode (deleted lap messages are reported)."""
    self.session_type = 'PRACTICE'
    self.report_deleted_lap = True
    await self._report(ctx)
# Main reporting loop: marks the bot as reporting in the channel of ctx,
# (re)starts the collector, then keeps polling the message database and the
# delay queue until self.is_reporting is cleared.
# NOTE(review): indentation was lost in this copy, so it is unclear whether the
# three-second sleep runs per message or once per polling pass, and whether the
# trailing delay-queue / collector checks are inside the while loop - confirm.
async def _report(self, ctx):
self.is_reporting = True
# Delayed messages are sent to the channel the command came from.
self.channel = ctx.channel
await self.start_collect()
while self.is_reporting:
# Do processing
# process any new messages in the db
messages = self.get_messages_from_db()
try:
for message in messages:
await self.process_message(message)
await asyncio.sleep(3)
except:
pass
# process any messages in the delay queue
await self.process_delay_messages()
# If collecting, make sure the collection process is running
if self.is_collecting:
# poll() returns a value other than None once the process has exited.
if self.collector_proc == None or \
self.collector_proc.poll() != None:
await self.start_collect()
# Loop that will handle checking if there are alerts scheduled for
# this day, hour, minute, and send the alerts to channels that are registered
@tasks.loop(seconds=30)
async def _alert_loop(self):
    """Send the alerts scheduled for the current weekday, hour and minute.

    Every row of ``alert_schedule`` in ``alerts.db`` is compared with the
    current local time.  A row already sent during the current minute is
    skipped; a matching row triggers the race or event announcement in each
    channel registered for that alert type, after which ``last_sent`` is
    updated.

    Errors are reported on stderr and never escape the task loop, and the
    database connection is closed on every path.
    """
    con = None
    try:
        # Check to see if there is an alert scheduled for this day, hour, minute
        current_dt = datetime.datetime.now()
        now_key = (current_dt.weekday(), current_dt.hour, current_dt.minute)
        this_minute = (current_dt.year, current_dt.month, current_dt.day,
                       current_dt.hour, current_dt.minute)

        con = sqlite3.connect('alerts.db')
        cur = con.cursor()
        cur.execute(
            "Select type, day, hour, minute, last_sent from alert_schedule")

        for db_type, db_day, db_hr, db_mn, last_sent in cur.fetchall():
            # The loop runs twice a minute; never send the same alert twice
            # within one minute.
            if last_sent is not None and last_sent != "":
                last_dt = datetime.datetime.fromisoformat(last_sent)
                last_minute = (last_dt.year, last_dt.month, last_dt.day,
                               last_dt.hour, last_dt.minute)
                if last_minute == this_minute:
                    continue

            if now_key != (db_day, db_hr, db_mn):
                continue

            # Get channels registered for that type and alert each of them
            cur.execute(
                "Select channel_id from alert_channels where type = ?",
                (db_type,))
            for (channel_id,) in cur.fetchall():
                channel = self.get_channel(int(channel_id))
                if channel is None:
                    # Channel is not available to the bot; nothing to send to.
                    continue
                if db_type == 'race':
                    await self.report_next_race(channel)
                elif db_type == 'event':
                    await self.report_next_event(channel)

            # Record that we sent this alert out at this time
            cur.execute(
                "Update alert_schedule set last_sent = ? where "
                "type = ? and day = ? and hour = ? and minute = ?",
                (current_dt.isoformat(), db_type, db_day, db_hr, db_mn))
            con.commit()
    except Exception as e:
        # An exception is not subscriptable, so print the exception itself.
        print(f"Error sending alert {e}", file=sys.stderr)
    finally:
        if con is not None:
            con.close()
@staticmethod
def get_token(token_file):
with open(token_file) as tok:
return tok.readline().strip()
async def start_collect(self):
    """(Re)start the collector script as a child process.

    Any running collector is stopped first and the "slippery" flag is
    reset.  The script path is passed as one argument of its own, so an
    installation directory containing whitespace no longer breaks the
    command line; only ``self.collector_params`` is split into arguments.
    """
    await self.stop_collect()
    self.is_collecting = True
    self.is_slippery_reported = False

    dir_path = os.path.dirname(os.path.realpath(__file__))
    script_path = os.path.join(dir_path, self.collector_command)
    argv = [script_path] + self.collector_params.split()
    self.collector_proc = Popen(argv)
async def stop_collect(self):
    """Mark collection as stopped and kill the collector process, if any.

    Failing to signal the process (for example because it has already
    exited) is ignored.
    """
    self.is_collecting = False
    if self.collector_proc is None:
        return
    try:
        self.collector_proc.kill()
    except OSError:
        pass
def decode_watched(self, w):
    """Translate the ``watched`` flag (0 means not watched) into text."""
    return 'Not Watched Yet' if w == 0 else 'Watched Already'
def get_delta_str(self, delta):
    """Format a ``datetime.timedelta`` as "D days H hours M minutes".

    The values are taken from the timedelta itself, not by parsing
    ``str(delta)``, which has no day part for spans under one day and made
    the old unpacking fail.  Units are singular only when the value is 1;
    seconds are dropped.
    """
    days = delta.days
    hours, leftover = divmod(delta.seconds, 3600)
    minutes = leftover // 60

    def unit(value, singular):
        return singular if abs(value) == 1 else singular + "s"

    return (f"{days} {unit(days, 'day')} "
            f"{hours} {unit(hours, 'hour')} "
            f"{minutes} {unit(minutes, 'minute')}")
async def report_next_event(self, ctx):
    """Announce the next scheduled event and the time left until it starts.

    Looks up the earliest row of ``schedule`` in ``schedule.db`` whose
    ``date_start`` is later than the current UTC time.  The timestamp used
    in the comparison is fully zero-padded, because the database compares
    it as text.  Nothing is sent when no such row exists; on any failure an
    apology is sent instead.
    """
    try:
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        now_str = now.strftime("%Y-%m-%d %H:%M:%S")
        query = 'select * from schedule where date_start > ? ' + \
                'order by date_start asc limit 1'

        con = sqlite3.connect('schedule.db')
        try:
            row = con.execute(query, (now_str,)).fetchone()
        finally:
            con.close()
        if row is None:
            return

        # Stored start times carry no offset; they are read as UTC here.
        starts_at = datetime.datetime.fromisoformat(row[3] + "+00:00")
        delta_str = self.get_delta_str(starts_at - now)
        message = f"The next event is Round {row[5]}: {row[1]} in {row[4]} which is {delta_str} from now."
        await ctx.send(message)
    except Exception:
        logging.getLogger(__name__).exception("Could not report the next event")
        await ctx.send("Sorry, hit the wall trying to find the answer...")
async def report_next_race(self, ctx):
    """Announce the next race and the time left until it starts.

    Looks up the earliest row of ``schedule`` in ``schedule.db`` with
    ``session_type = 'Race'`` whose ``date_start`` is later than the current
    UTC time.  The timestamp used in the comparison is fully zero-padded,
    because the database compares it as text (minutes and seconds were not
    padded before).  Nothing is sent when no such row exists; on any
    failure an apology is sent instead.
    """
    try:
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        now_str = now.strftime("%Y-%m-%d %H:%M:%S")
        query = "SELECT * FROM schedule WHERE date_start > ? AND " + \
                "session_type = 'Race' ORDER BY date_start ASC LIMIT 1"

        con = sqlite3.connect('schedule.db')
        try:
            row = con.execute(query, (now_str,)).fetchone()
        finally:
            con.close()
        if row is None:
            return

        # Stored start times carry no offset; they are read as UTC here.
        starts_at = datetime.datetime.fromisoformat(row[3] + "+00:00")
        delta_str = self.get_delta_str(starts_at - now)
        message = f"The next race is Round {row[5]}: {row[1]} in {row[4]} which is {delta_str} from now."
        await ctx.send(message)
    except Exception:
        logging.getLogger(__name__).exception("Could not report the next race")
        await ctx.send("Sorry, hit the wall trying to find the next race.")
async def report_all_races(self, ctx):
    """Send the round, name, location and start time of every race.

    Reads all rows of ``schedule`` in ``schedule.db`` with
    ``session_type = 'Race'`` ordered by start time, closes the connection,
    and then sends a header followed by one message per race.  On any
    failure an apology is sent instead.
    """
    try:
        query = "SELECT * FROM schedule where session_type = 'Race' ORDER BY date_start ASC"
        con = sqlite3.connect('schedule.db')
        try:
            rows = con.execute(query).fetchall()
        finally:
            con.close()

        await ctx.send("All times UTC\n")
        for row in rows:
            await ctx.send(f"Round {row[5]}: {row[1]} in {row[4]} which takes place {row[3]}\n")
    except Exception:
        logging.getLogger(__name__).exception("Could not report all races")
        await ctx.send("Sorry, hit the wall trying to show all races.")
# Register to alert for next race
async def register_next_race_alerts(self, ctx):
    """Register the channel of ``ctx`` for scheduled next-race alerts.

    Adds a ``("race", channel_id)`` row to ``alert_channels`` in
    ``alerts.db`` unless one exists already, and tells the channel which of
    the two happened.  Failures are logged instead of raised, and the
    connection is closed on every path (the old cleanup code could itself
    fail on names that had never been assigned).
    """
    con = None
    try:
        channel_id = ctx.channel.id
        con = sqlite3.connect('alerts.db')
        cur = con.cursor()

        # See if there is already a record
        cur.execute(
            "SELECT COUNT(*) FROM alert_channels WHERE type = ? AND channel_id = ?",
            ("race", channel_id))
        if cur.fetchone()[0] > 0:
            await ctx.send("This thread is already registered to get race alerts.")
        else:
            cur.execute(
                "INSERT INTO alert_channels (type, channel_id) VALUES (?, ?)",
                ("race", channel_id))
            con.commit()
            await ctx.send("This channel has been registered to receive upcoming race alerts.")
    except Exception:
        logging.getLogger(__name__).exception("Could not register race alerts")
    finally:
        if con is not None:
            con.close()
# Register to alert for next event
async def register_next_event_alerts(self, ctx):
    """Register the channel of ``ctx`` for scheduled next-event alerts.

    Adds an ``("event", channel_id)`` row to ``alert_channels`` in
    ``alerts.db`` unless one exists already, and tells the channel which of
    the two happened.  Failures are logged instead of raised, and the
    connection is closed on every path (the old cleanup code could itself
    fail on names that had never been assigned).
    """
    con = None
    try:
        channel_id = ctx.channel.id
        con = sqlite3.connect('alerts.db')
        cur = con.cursor()

        # See if there is already a record
        cur.execute(
            "SELECT COUNT(*) FROM alert_channels WHERE type = ? and channel_id = ?",
            ("event", channel_id))
        if cur.fetchone()[0] > 0:
            await ctx.send("This thread is already registered to get event alerts.")
        else:
            cur.execute(
                "INSERT INTO alert_channels (type, channel_id) VALUES (?, ?)",
                ("event", channel_id))
            con.commit()
            await ctx.send("This channel has been registered to receive upcoming event alerts.")
    except Exception:
        logging.getLogger(__name__).exception("Could not register event alerts")
    finally:
        if con is not None:
            con.close()
# Unregister channel from alerts
async def unregister_alerts(self, ctx):
    """Remove every alert registration of the channel of ``ctx``.

    Deletes all rows of ``alert_channels`` in ``alerts.db`` with this
    channel id, whatever their type.  Failures are logged instead of
    raised, and the connection is closed on every path (the old cleanup
    code could itself fail on names that had never been assigned).
    """
    con = None
    try:
        channel_id = ctx.channel.id
        con = sqlite3.connect('alerts.db')
        con.execute("DELETE FROM alert_channels WHERE channel_id = ?",
                    (channel_id,))
        con.commit()
    except Exception:
        logging.getLogger(__name__).exception("Could not unregister alerts")
    finally:
        if con is not None:
            con.close()
def __init__(self):
    """Set up bot state, initialise the Discord client and register commands.

    Reads the Discord token from ``token.txt``, prepares the session state
    (podium, running order, weather, lap counters, delayed message queue)
    and the icon lookup tables, calls the ``commands.Bot`` initialiser and
    finally registers every ``!command`` as a closure over ``self``.
    """
    log = logging.getLogger(__name__)

    # Set debug or not
    self.debug = True
    self.bingo = Bingo()
    # Discord authentication token
    self.token = self.get_token("token.txt")
    self.collector_command = "robottas_collector.py"
    self.collector_params = " save dummy.txt"
    self.collector_proc = None
    self.is_collecting = False
    # Preface messages with the following
    self.report_preamble = ':robot::peach: Alert!'
    # Holds processing thread
    self.bg_task = None
    # Hold db file
    self.dbfile = "messages.db"
    # Hold podium places
    self.podium = ['?', '?', '?']
    # Hold driver list
    self.driver_list = ['?'] * 20
    # Hold weather info
    self.weather = ""
    # Hold whether we have reported on slippery track.
    self.is_slippery_reported = False
    # Hold lap info
    self.current_lap = -1000
    self.total_laps = -1000
    self.lap_reported = 0
    # Hold the driver with the fastest lap
    self.fastest_lap = ''
    # Holds dictionary for number to icon
    self.driver_dict = {
        '1': '<:VER:1067541523748630570>',
        '3': '<:RIC:1067870312949108887>',
        '5': '<:VET:1067964065516884079>',
        '11': '<:PER:1067822335123525732>',
        '14': '<:ALO:1067876094033793054>',
        '40': ':kiwi:',
        '44': '<:HAM:1067828533746991165>',
        '55': '<:SAI:1067824776502067270>',
        '63': '<:RUS:1067831294748274728>',
        '16': '<:LEC:1067544797050585198>',
        '18': '<:STR:1067871582854336663>',
        '4': '<:NOR:1067840487593082941>',
        '10': '<:GAS:1067836596495327283>',
        '27': '<:HUL:1067880110918742187>',
        '31': '<:OCO:1067834157465612398>',
        '77': '<:BOT:1067819716527276032>',
        '81': '<:PIA:1067844998369914961>',
        '24': '<:ZHO:1067865955117568010>',
        '22': '<:TSU:1067888851676315660>',
        '20': '<:MAG:1067883814992486510>',
        '23': '<:ALB:1067874026871074887>',
        '2': '<:SAR:1067890949197414410>',
    }
    # Holds dictionary for driver 3-letter code to icon
    self.name_dict = {
        'VER': '<:VER:1067541523748630570>',
        'RIC': '<:RIC:1067870312949108887>',
        'VET': '<:VET:1067964065516884079>',
        'PER': '<:PER:1067822335123525732>',
        'ALO': '<:ALO:1067876094033793054>',
        'HAM': '<:HAM:1067828533746991165>',
        'SAI': '<:SAI:1067824776502067270>',
        'RUS': '<:RUS:1067831294748274728>',
        'LEC': '<:LEC:1067544797050585198>',
        'STR': '<:STR:1067871582854336663>',
        'NOR': '<:NOR:1067840487593082941>',
        'GAS': '<:GAS:1067836596495327283>',
        'HUL': '<:HUL:1067880110918742187>',
        'LAW': ':kiwi:',
        'OCO': '<:OCO:1067834157465612398>',
        'BOT': '<:BOT:1067819716527276032>',
        'PIA': '<:PIA:1067844998369914961>',
        'ZHO': '<:ZHO:1067865955117568010>',
        'TSU': '<:TSU:1067888851676315660>',
        'MAG': '<:MAG:1067883814992486510>',
        'ALB': '<:ALB:1067874026871074887>',
        'SAR': '<:SAR:1067890949197414410>',
    }
    # Holds dictionary for race states to icons
    self.flag_dict = {
        'GREEN_FLAG': '<:GREEN:1107401338993782804>',
        'YELLOW_FLAG': '<:YELLOW:1091801442714652815>',
        'RED_FLAG': '<:RED:1091801383998586912>',
        'VSC': '<:VSC:1107401410183704596>',
        'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>',
        'SC': '<:SC:1107401472041304104>',
        'SAFETY_CAR': '<:SC:1107401472041304104>',
        'CHECKERED': '<:CHECKERED:1091801509039194152>',
        'RACE_FINISHED': '<:CHECKERED:1091801509039194152>',
        'STARTED': '<:CHECKERED:1091801509039194152>',
        'P1': ':first_place:',
        'P2': ':second_place:',
        'P3': ':third_place:',
        'FLAP': '<:FLap4:1122601036952129547>'
    }
    # Bot configuration info
    self.bot_intents = discord.Intents.default()
    self.bot_intents.message_content = True
    self.description = "robotas - f1 bot - !rbhelp for commands"
    self.command_prefix = '!'
    # Track whether to report, and what channel to report to
    self.is_reporting = False
    self.report_id = None
    self.started = False
    # Hold message delay
    self.delay = 45
    self.message_queue = []
    # Hold whether to report deleted lap messages
    self.report_deleted_lap = False
    self.session_type = ''
    # Test file
    self.test_file = "test.json"
    # Race db patterns (raw strings: "\w", "\d" are regex escapes)
    self.loc_pattern = re.compile(r'[\w\s\-\d]+')
    self.yr_pattern = re.compile(r'\d{4}')

    super().__init__(command_prefix=self.command_prefix,
                     description=self.description,
                     intents=self.bot_intents)

    def is_authorized(ctx):
        """Return True for the bot owner account or a guild administrator."""
        if str(ctx.author) == "tamservo#0":
            return True
        # Outside a guild the author has no guild permissions at all.
        perms = getattr(ctx.author, "guild_permissions", None)
        return perms is not None and perms.administrator

    def fetch_races_db_row(query):
        """Run ``query`` against races.db and return its first row or None."""
        con = sqlite3.connect('races.db')
        try:
            return con.execute(query).fetchone()
        finally:
            con.close()

    # Setup commands
    @self.command()
    async def rbhelp(ctx):
        await ctx.send(
            "commands: \n"
            "!rbhelp - Print this help message "
            "(but you knew that already)\n"
            "!next_event - Prints time until the next event.\n"
            "!next_race - Prints time until the next race.\n"
            "!all_races - Prints the location and date of all races in the schedule.\n"
            "!rbname - I will tell you my name.\n"
            "!rbroot - I will tell you who I root for.\n"
            "!rbreport - I will start race reporting in this channel. "
            "I will try to tell you about current flags, laps left,\n"
            "and safety cars.\n"
            "!rbstop - Stop reporting.\n"
            "!podium - Display podium positions.\n"
            "!q1cut - Show drivers in Q1 cut positions.\n"
            "!q2cut - Show drivers in Q2 cut positions.\n"
            "!rbdelay - Set the race messaging delay in seconds.\n"
            "The following display race control messages:\n"
            " !animal\n"
            " !bwoken\n"
            " !calm\n"
            " !forecast\n"
            " !grandma\n"
            " !grass\n"
            " !liked\n"
            " !no\n"
            " !noengine\n"
            " !pants\n"
            " !paddock\n"
            " !penalty\n"
            " !stupid\n"
            " !undercut\n"
            "The following register / unregister scheduled next race / event messages.\n"
            "!register_next_race_alerts - get an alert for the next race on Monday.\n"
            "!register_next_event_alerts - get an alert for the next event on Monday.\n"
            "!unregister_alerts - stop getting alerts on this channel.\n"
        )

    @self.command()
    async def rbroot(ctx):
        await ctx.send("Rooting for :ferry::peach: of course!\n" +
                       self.name_dict["BOT"] + ":smiling_face_with_3_hearts:")

    @self.command()
    async def rbname(ctx):
        await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:")

    @self.command()
    async def rbdelay(ctx, delay):
        try:
            secs = int(delay)
        except (TypeError, ValueError):
            await ctx.send(f"Invalid delay value passed to rbdelay.")
            return
        if secs < 10 or secs > 300:
            await ctx.send("Delay must be between 10 and 300.")
        else:
            self.delay = secs
            await ctx.send(f"Delay set to {secs} seconds.")

    @self.command()
    async def rbstop(ctx):
        self.is_reporting = False
        self.report_id = None
        await self.stop_collect()
        await ctx.send(":robot::peach: powering down")

    @self.command()
    async def podium(ctx):
        await ctx.send(self.get_podium())

    @self.command()
    async def driverlist(ctx):
        await self.print_driver_list(ctx)

    @self.command()
    async def q1cut(ctx):
        await ctx.send(self.get_q1_cut())

    @self.command()
    async def q2cut(ctx):
        await ctx.send(self.get_q2_cut())

    @self.command()
    async def weather(ctx):
        await ctx.send(self.get_weather())

    @self.command()
    async def race(ctx):
        if is_authorized(ctx):
            await ctx.send(":robot::peach: Ready to report for the race!")
            await self._race_report(ctx)

    @self.command()
    async def quali(ctx):
        if is_authorized(ctx):
            await ctx.send(":robot::peach: Ready to report on quali!")
            await self._quali_report(ctx)

    @self.command()
    async def practice(ctx):
        if is_authorized(ctx):
            await ctx.send(":robot::peach: Ready to report on practice!")
            await self._practice_report(ctx)

    @self.command()
    async def flap(ctx):
        if self.fastest_lap != '':
            await ctx.send(self.flag_dict['FLAP'] + self.fastest_lap)
        else:
            await ctx.send("No " + self.flag_dict['FLAP'] + " yet.")

    @self.command()
    async def start_collect(ctx):
        if is_authorized(ctx):
            # if an authorized user, start the collection script that
            # puts records into the database
            await self.start_collect()

    @self.command()
    async def danger(ctx):
        await ctx.send("That's some dangerous driving! " + self.name_dict["HAM"])

    @self.command()
    async def dotd(ctx):
        await ctx.send("I don't know, probably " + self.name_dict["ALB"])

    @self.command()
    async def flip(ctx):
        await ctx.send(random.choice(["Heads", "Tails"]))

    @self.command()
    async def gp2(ctx):
        await ctx.send("GP2 engine! GP2! ARGHHH! " + self.name_dict["ALO"])

    @self.command()
    async def iloveyoubot(ctx):
        await ctx.send(f"I am incapable of love, but for you, {ctx.author.display_name}, I will make an exception.")

    @self.command()
    async def quote(ctx):
        try:
            row = fetch_races_db_row(
                'select * from quotes order by Random() limit 1')
            if row is not None:
                await ctx.send(row[0] + " -- " + row[1] + " " + row[2])
        except Exception:
            log.exception("quote command failed")
            await ctx.send("Can't think of a quote for some reason...")

    @self.command()
    @commands.has_permissions(administrator=True)
    async def race_watched(ctx, loc, yr):
        if not (self.loc_pattern.match(loc) and self.yr_pattern.match(yr)):
            return
        try:
            con = sqlite3.connect('races.db')
            try:
                # Both conditions must be joined with "and"; without it the
                # statement is not valid SQL.
                con.execute('update races '
                            'set watched = 1 '
                            'where location = ? '
                            'and year = ?', (loc, yr))
                con.commit()
            finally:
                con.close()
            await ctx.send(f"{loc} {yr} marked as watched.")
        except Exception:
            log.exception("race_watched command failed")
            await ctx.send(f"Couldn't mark {loc} {yr} as watched for some reason.")

    @self.command()
    async def rand_race(ctx):
        try:
            row = fetch_races_db_row('select * from races '
                                     'order by Random() '
                                     'limit 1')
            if row is not None:
                watched = self.decode_watched(row[2])
                await ctx.send(f"Location: {row[0]} Year: {row[1]} {watched}")
        except Exception:
            log.exception("rand_race command failed")
            await ctx.send("Can't find a random race for some reason.")

    @self.command()
    async def rand_race_new(ctx):
        try:
            row = fetch_races_db_row('select * from races '
                                     'where watched = 0 '
                                     'order by Random() '
                                     'limit 1')
            if row is not None:
                await ctx.send(f"Location: {row[0]} Year: {row[1]}")
        except Exception:
            log.exception("rand_race_new command failed")
            await ctx.send("Can't pick a race for some reason.")

    @self.command()
    async def tyres(ctx):
        await ctx.send("Bono, my tyres are gone " + self.name_dict["HAM"])

    # Commands that send images
    @self.command()
    async def bingo(ctx):
        await self.send_image(ctx, "images/bingo_win.png")

    @self.command()
    async def calm(ctx):
        await self.send_image(ctx, "images/calm.png")

    @self.command()
    async def grandma(ctx):
        await self.send_image(ctx, "images/grandma.png")

    @self.command()
    async def grass(ctx):
        await self.send_image(ctx, "images/grass.png")

    @self.command()
    async def forecast(ctx):
        await self.send_image(ctx, "images/forecast.png")

    @self.command()
    async def liked(ctx):
        await self.send_image(ctx, "images/liked.png")

    @self.command()
    async def no(ctx):
        await self.send_image(ctx, "images/no.png")
        await self.send_audio(ctx, "no.mp3", "audio/no.mp3")

    @self.command()
    async def noengine(ctx):
        await self.send_image(ctx, "images/noengine.png")

    @self.command()
    async def pants(ctx):
        await self.send_image(ctx, "images/pants.png")

    @self.command()
    async def paddock(ctx):
        await self.send_image(ctx, "images/paddock.png")

    @self.command()
    async def penalty(ctx):
        await self.send_image(ctx, "images/penalty.png")

    @self.command()
    async def stupid(ctx):
        await self.send_image(ctx, "images/stupid.png")

    @self.command()
    async def undercut(ctx):
        await self.send_image(ctx, "images/undercut.png")

    @self.command()
    async def animal(ctx):
        await self.send_image(ctx, "images/animal.png")

    @self.command()
    async def bwoken(ctx):
        await self.send_image(ctx, "images/bwoken.png")
        await self.send_audio(ctx, "bwoken.mp3", "audio/bwoken.mp3")

    ## Calendar Commands
    # Give days, hours, minutes until the next event
    @self.command()
    async def next_event(ctx):
        await self.report_next_event(ctx)

    # Give days, hours, minutes until the next race
    @self.command()
    async def next_race(ctx):
        await self.report_next_race(ctx)

    # Show all races
    @self.command()
    async def all_races(ctx):
        await self.report_all_races(ctx)

    # Register to get monday next race alerts
    @self.command()
    async def register_next_race_alerts(ctx):
        await self.register_next_race_alerts(ctx)

    # Register to get monday next event alerts
    @self.command()
    async def register_next_event_alerts(ctx):
        await self.register_next_event_alerts(ctx)

    # Unregister to get monday alerts
    @self.command()
    async def unregister_alerts(ctx):
        await self.unregister_alerts(ctx)

    # Bingo card
    @self.command()
    async def bingo_card(ctx):
        card_file = self.bingo.get_card()
        await self.send_image(ctx, card_file)
        # Wrap in Path so removal works whether get_card returned a str or
        # a Path.
        pathlib.Path(card_file).unlink()
if __name__ == '__main__':
    # Build the bot and start it when the file is executed as a script.
    Robottas().run_robottas()