1333 lines
46 KiB
Python
Executable File
1333 lines
46 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import asyncio
|
|
from bingo import Bingo
|
|
import binger
|
|
import collections.abc
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import random
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
from subprocess import Popen
|
|
import time
|
|
import discord
|
|
|
|
from discord.ext import commands, tasks
|
|
|
|
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
|
|
|
|
|
|
class Robottas(commands.Bot):
|
|
|
|
# The following section is adapted from code by theOehrly on GitHub FastF1 project
|
|
# and is subject to the MIT License under which it was released. Specifically
|
|
# the Livetiming client.
|
|
|
|
@staticmethod
|
|
def convert_message(raw):
|
|
data = raw.replace("'", '"') \
|
|
.replace('True', 'true') \
|
|
.replace('False', 'false')
|
|
|
|
try:
|
|
data = json.loads(data)
|
|
return data
|
|
except json.JSONDecodeError:
|
|
return ""
|
|
|
|
# End section adapted from FastF1
|
|
|
|
async def on_ready(self):
|
|
self._alert_loop.start()
|
|
print('Logged in as: {}'.format(self.user))
|
|
|
|
def run_robottas(self):
|
|
self.run(self.token)
|
|
|
|
async def send_message(self, message):
|
|
if self.channel is None:
|
|
return
|
|
|
|
await self.channel.send(message)
|
|
|
|
@staticmethod
|
|
async def send_image(ctx, image_file):
|
|
file_name = os.path.dirname(os.path.realpath(__file__))
|
|
file_name = os.path.join(file_name, image_file)
|
|
with open(file_name, "rb") as handle:
|
|
df = discord.File(handle, filename=file_name)
|
|
await ctx.send(file=df)
|
|
|
|
# send and image and also play audio if it is a voice channel
|
|
async def send_audio(self, ctx, title, name):
|
|
file_name = os.path.dirname(os.path.realpath(__file__))
|
|
file_name = os.path.join(file_name, name)
|
|
with open(file_name, "rb") as handle:
|
|
df = discord.File(handle, filename=title)
|
|
await ctx.send(file=df)
|
|
|
|
|
|
def send_delay_message(self, message):
|
|
self.message_queue.append((time.time(), message))
|
|
|
|
async def process_delay_messages(self):
|
|
while len(self.message_queue) > 0 and \
|
|
self.message_queue[0][0] < time.time() - self.delay:
|
|
message = self.message_queue.pop(0)[1]
|
|
await self.send_message(message)
|
|
await asyncio.sleep(1)
|
|
|
|
async def send_status_report(self, report):
|
|
self.send_delay_message(report)
|
|
|
|
async def send_lap_report(self, driver):
|
|
self.send_delay_message(driver + " laps remaining")
|
|
|
|
async def load_flag_message(self, message):
|
|
flag = message['Flag']
|
|
message_text = message['Message']
|
|
report = None
|
|
|
|
if flag == 'GREEN':
|
|
report = f"{self.flag_dict['GREEN_FLAG']}" + \
|
|
f"{message_text}{self.flag_dict['GREEN_FLAG']}"
|
|
|
|
elif flag == 'RED':
|
|
report = f"{self.flag_dict['RED_FLAG']}{message_text}" + \
|
|
f"{self.flag_dict['RED_FLAG']}"
|
|
|
|
elif flag == 'YELLOW':
|
|
report = f"{self.flag_dict['YELLOW_FLAG']}{message_text}" + \
|
|
f"{self.flag_dict['YELLOW_FLAG']}"
|
|
|
|
elif flag == 'DOUBLE YELLOW':
|
|
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
|
|
f"{self.flag_dict['YELLOW_FLAG']}" + \
|
|
f"{message_text}" + \
|
|
f"{self.flag_dict['YELLOW_FLAG']}" + \
|
|
f"{self.flag_dict['YELLOW_FLAG']}"
|
|
|
|
elif flag == 'BLACK AND WHITE' and self.report_deleted_lap:
|
|
report = f"{message['Message']}"
|
|
|
|
elif flag == 'CHEQUERED':
|
|
report = f"{self.flag_dict['CHECKERED']}" + \
|
|
f"{self.flag_dict['CHECKERED']}" + \
|
|
f"{self.flag_dict['CHECKERED']}"
|
|
|
|
# Return None or flag message
|
|
return report
|
|
|
|
async def load_rcm_messages(self, data):
|
|
if "Messages" in data.keys():
|
|
report = None
|
|
for key in data['Messages'].keys():
|
|
message = data['Messages'][key]
|
|
if 'Category' in message.keys():
|
|
category = message["Category"]
|
|
category = category.upper()
|
|
|
|
if category == "FLAG":
|
|
report = await self.load_flag_message(message)
|
|
|
|
elif category == "OTHER":
|
|
if self.session_type == "RACE" and "DELETED" in message['Message']:
|
|
pass
|
|
elif "SLIPPERY" in message['Message'] and \
|
|
not self.is_slippery_reported:
|
|
self.is_slippery_reported = True
|
|
report = "It's slippery out there!"
|
|
else:
|
|
report = message['Message']
|
|
|
|
elif category == "DRS":
|
|
report = message['Message']
|
|
|
|
elif category == "CAREVENT" and \
|
|
(self.session_type == "PRACTICE" or
|
|
self.session_type == "QUALI"):
|
|
report = message['Message']
|
|
|
|
elif category == "SAFETYCAR":
|
|
if message["Mode"] == 'VIRTUAL SAFETY CAR':
|
|
report = f"{self.flag_dict['VSC']}" + \
|
|
f"{message['Message']}" + \
|
|
f"{self.flag_dict['VSC']}"
|
|
|
|
elif message["Mode"] == 'SAFETY CAR':
|
|
report = f"{self.flag_dict['SC']}" + \
|
|
f"{message['Message']}" + \
|
|
f"{self.flag_dict['SC']}"
|
|
|
|
if report is not None:
|
|
await self.send_status_report(report)
|
|
|
|
async def load_lap_data(self, data):
|
|
if "CurrentLap" in data.keys():
|
|
current_lap = data["CurrentLap"]
|
|
|
|
if self.current_lap != current_lap:
|
|
self.current_lap = current_lap
|
|
|
|
# Re-evaluate total laps in case it has changed
|
|
# i.e. shortened due to rain.
|
|
if "TotalLaps" in data.keys():
|
|
self.total_laps = int(data["TotalLaps"])
|
|
# Notify on lap change if matches a driver
|
|
key = str(self.total_laps - int(current_lap))
|
|
if key in self.driver_dict.keys():
|
|
await self.send_lap_report(self.driver_dict[key])
|
|
|
|
def load_podium_data(self, data):
|
|
if "Lines" in data.keys():
|
|
for position in data["Lines"].keys():
|
|
if 'RacingNumber' in data["Lines"][position].keys():
|
|
self.podium[int(position)] = data["Lines"][position]['RacingNumber']
|
|
|
|
def get_podium(self):
|
|
if len(self.podium) == 3:
|
|
try:
|
|
if "?" in self.podium:
|
|
return
|
|
|
|
message = ""
|
|
|
|
for i in range(3):
|
|
pos = 'P' + str(i + 1)
|
|
driver = self.driver_dict[self.podium[i]]
|
|
message += f":champagne:{driver} " + \
|
|
f"{self.flag_dict[pos]}"
|
|
|
|
if driver == self.fastest_lap:
|
|
if driver == self.name_dict['ALO']:
|
|
message += " EL"
|
|
|
|
message += ' ' + self.flag_dict['FLAP']
|
|
|
|
# Put a new line at the end of each row
|
|
message += "\n"
|
|
|
|
return message
|
|
|
|
except:
|
|
pass
|
|
|
|
return "I don't know the podium yet :("
|
|
|
|
def get_q1_cut(self):
|
|
message = ""
|
|
for i in range(15, 20):
|
|
try:
|
|
message += self.driver_list[i] + " "
|
|
except:
|
|
message += "? "
|
|
|
|
message = message.strip()
|
|
return message
|
|
|
|
def get_q2_cut(self):
|
|
message = ""
|
|
|
|
for i in range(10, 15):
|
|
try:
|
|
message += self.driver_list[i] + " "
|
|
except:
|
|
message += "? "
|
|
|
|
message = message.strip()
|
|
return message
|
|
|
|
def get_weather(self):
|
|
if self.weather == "":
|
|
return "No weather info yet..."
|
|
else:
|
|
return self.weather
|
|
|
|
async def process_race_events(self, session_data, status_data):
|
|
# Hold the next event to report, and the type
|
|
event = None
|
|
event_type = None
|
|
|
|
# Hold report
|
|
report = None
|
|
while (len(session_data) > 0 or len(status_data) > 0) and self.is_reporting:
|
|
# If only one of them has data, use that one
|
|
if len(session_data) == 0:
|
|
event = status_data.pop(0)
|
|
event_type = "STATUS"
|
|
elif len(status_data) == 0:
|
|
event = session_data.pop(0)
|
|
event_type = "SESSION"
|
|
# If they both have data, use the one with the oldest timestamp
|
|
else:
|
|
session_time = session_data[0]["Utc"]
|
|
status_time = status_data[0]["Utc"]
|
|
|
|
if session_time < status_time:
|
|
event = session_data.pop(0)
|
|
event_type = "SESSION"
|
|
|
|
else:
|
|
event = status_data.pop(0)
|
|
event_type = "STATUS"
|
|
|
|
# At this point we have the event. Generate the report based on the type.
|
|
report = None
|
|
|
|
if event_type == "SESSION":
|
|
# Get the lap from the event
|
|
cur_lap = event["Lap"]
|
|
|
|
# Make sure we haven't already reported on this lap
|
|
if self.current_lap < cur_lap:
|
|
self.current_lap = cur_lap
|
|
|
|
# Get the laps remaining key to see if there's a driver that matches
|
|
laps_key = str(self.total_laps - cur_lap)
|
|
|
|
# If there's a matching driver, then send a message
|
|
if laps_key in self.driver_dict.keys():
|
|
await self.send_lap_report(self.driver_dict[laps_key])
|
|
|
|
# Must be a status event
|
|
else:
|
|
key = None
|
|
if "TrackStatus" in event.keys():
|
|
key = "TrackStatus"
|
|
elif "SessionStatus" in event.keys():
|
|
key = "SessionStatus"
|
|
|
|
if key is not None:
|
|
track_status = event[key].upper()
|
|
|
|
# Check the track status to determine what report to send
|
|
if track_status == "ALLCLEAR":
|
|
report = f"{self.flag_dict['GREEN_FLAG']}" + \
|
|
f"{self.flag_dict['GREEN_FLAG']}{self.flag_dict['GREEN_FLAG']}"
|
|
elif track_status == "YELLOW":
|
|
report = f"{self.flag_dict['YELLOW_FLAG']}" + \
|
|
f"{self.flag_dict['YELLOW_FLAG']}{self.flag_dict['YELLOW_FLAG']}"
|
|
elif track_status == "RED":
|
|
report = f"{self.flag_dict['RED_FLAG']}" + \
|
|
f"{self.flag_dict['RED_FLAG']}{self.flag_dict['RED_FLAG']}"
|
|
elif track_status == "FINISHED":
|
|
report = f"{self.flag_dict['RACE_FINISHED']}" + \
|
|
f"{self.flag_dict['RACE_FINISHED']}{self.flag_dict['RACE_FINISHED']}"
|
|
|
|
if report is not None:
|
|
await self.send_status_report(report)
|
|
|
|
def load_weather_data(self, data):
|
|
weather_txt = "Track Weather Report\n"
|
|
for k in data.keys():
|
|
if k != "_kf":
|
|
weather_txt += f"{k}: {data[k]}\n"
|
|
|
|
self.weather = weather_txt
|
|
|
|
def load_timing_stats_data(self, data):
|
|
if "Lines" in data.keys():
|
|
lines = data["Lines"]
|
|
for driver_num in lines.keys():
|
|
line = lines[driver_num]
|
|
if "PersonalBestLapTime" in line.keys():
|
|
position = -1
|
|
try:
|
|
position = line["PersonalBestLapTime"]["Position"]
|
|
except:
|
|
pass
|
|
|
|
if position == 1:
|
|
self.fastest_lap = self.driver_dict[driver_num]
|
|
return
|
|
|
|
def load_driver_data(self, data):
|
|
for driver in data.keys():
|
|
position = data[driver]["Line"]
|
|
if driver in self.driver_dict.keys():
|
|
self.driver_list[position - 1] = self.driver_dict[driver]
|
|
else:
|
|
self.driver_list[position - 1] = driver
|
|
|
|
async def print_driver_range(self, ctx, start, stop):
|
|
message = ""
|
|
for i in range(start, stop):
|
|
driver = self.driver_list[i]
|
|
message += f"P{i + 1}: {driver}"
|
|
if driver == self.fastest_lap:
|
|
if driver == self.name_dict['ALO']:
|
|
message += " EL"
|
|
|
|
message += ' ' + self.flag_dict['FLAP']
|
|
|
|
message += "\n"
|
|
|
|
await ctx.send(message)
|
|
|
|
async def print_driver_list(self, ctx):
|
|
# Send 1-10, then 11-20
|
|
await self.print_driver_range(ctx, 0, 10)
|
|
await self.print_driver_range(ctx, 10, 20)
|
|
|
|
def load_initial(self, message):
|
|
# Load podium data
|
|
if 'R' in message.keys():
|
|
if 'TopThree' in message['R'].keys():
|
|
top_three = message['R']['TopThree']
|
|
if 'Lines' in top_three.keys() and \
|
|
len(top_three['Lines']) == 3:
|
|
|
|
for i in range(3):
|
|
self.podium[i] = top_three['Lines'][i]['RacingNumber']
|
|
|
|
# Load driver list
|
|
if 'DriverList' in message['R'].keys():
|
|
for driver_num in message['R']['DriverList'].keys():
|
|
if driver_num == '_kf':
|
|
continue
|
|
position = message['R']['DriverList'][driver_num]['Line']
|
|
|
|
driver = '???'
|
|
if driver_num in self.driver_dict.keys():
|
|
driver = self.driver_dict[driver_num]
|
|
else:
|
|
driver = driver_num
|
|
|
|
#self.driver_list[int(position) - 1] = self.driver_dict[driver_num]
|
|
self.driver_list[int(position) - 1] = driver
|
|
|
|
# Load lap data
|
|
if 'LapCount' in message['R'].keys():
|
|
if 'TotalLaps' in message['R']['LapCount'].keys():
|
|
self.total_laps = int(message['R']['LapCount']['TotalLaps'])
|
|
|
|
# Load weather data
|
|
if 'WeatherData' in message['R'].keys():
|
|
weather_obj = message['R']['WeatherData']
|
|
weather_text = "Track Weather Report\n"
|
|
for k in weather_obj.keys():
|
|
if k != "_kf":
|
|
weather_text += f"{k}: {weather_obj[k]}\n"
|
|
|
|
self.weather = weather_text
|
|
|
|
# Load fastest lap data
|
|
if 'TimingStats' in message['R'].keys():
|
|
flap_obj = message['R']['TimingStats']
|
|
self.load_timing_stats_data(flap_obj)
|
|
|
|
async def process_message(self, message):
|
|
try:
|
|
if isinstance(message, collections.abc.Sequence):
|
|
if message[0] == 'Heartbeat':
|
|
return
|
|
|
|
elif message[0] == 'DriverList':
|
|
self.load_driver_data(message[1])
|
|
|
|
elif message[0] == 'TopThree':
|
|
self.load_podium_data(message[1])
|
|
|
|
elif message[0] == 'RaceControlMessages':
|
|
await self.load_rcm_messages(message[1])
|
|
|
|
elif message[0] == 'LapCount':
|
|
await self.load_lap_data(message[1])
|
|
|
|
elif message[0] == 'WeatherData':
|
|
self.load_weather_data(message[1])
|
|
|
|
elif message[0] == 'TimingStats':
|
|
self.load_timing_stats_data(message[1])
|
|
|
|
else:
|
|
pass
|
|
|
|
# Check to see if this is the initial "R" record from the response
|
|
elif "R" in message.keys():
|
|
if "R" in message:
|
|
self.load_initial(message)
|
|
|
|
except Exception as e:
|
|
pass
|
|
|
|
def get_messages_from_db(self):
|
|
try:
|
|
messages = []
|
|
con = sqlite3.connect(self.dbfile)
|
|
cur = con.cursor()
|
|
cur2 = con.cursor()
|
|
for row in cur.execute('select id, message from messages order by id asc'):
|
|
messages.append(self.convert_message(row[1]))
|
|
|
|
# Now that we have the message, delete this row from the dbfile
|
|
cur2.execute(f"delete from messages where id = {row[0]}")
|
|
|
|
con.commit()
|
|
cur.close()
|
|
cur2.close()
|
|
con.close()
|
|
|
|
return messages
|
|
except:
|
|
return []
|
|
|
|
async def _race_report(self, ctx):
|
|
self.report_deleted_lap = False
|
|
self.session_type = 'RACE'
|
|
await self._report(ctx)
|
|
|
|
async def _quali_report(self, ctx):
|
|
self.report_deleted_lap = True
|
|
self.session_type = 'QUALI'
|
|
|
|
await self._report(ctx)
|
|
|
|
async def _practice_report(self, ctx):
|
|
self.report_deleted_lap = True
|
|
self.session_type = 'PRACTICE'
|
|
await self._report(ctx)
|
|
|
|
async def _report(self, ctx):
|
|
self.is_reporting = True
|
|
self.channel = ctx.channel
|
|
await self.start_collect()
|
|
|
|
while self.is_reporting:
|
|
# Do processing
|
|
|
|
# process any new messages in the db
|
|
messages = self.get_messages_from_db()
|
|
try:
|
|
for message in messages:
|
|
await self.process_message(message)
|
|
|
|
await asyncio.sleep(3)
|
|
except:
|
|
pass
|
|
|
|
# process any messages in the delay queue
|
|
await self.process_delay_messages()
|
|
|
|
# If collecting, make sure the collection process is running
|
|
if self.is_collecting:
|
|
if self.collector_proc == None or \
|
|
self.collector_proc.poll() != None:
|
|
await self.start_collect()
|
|
|
|
|
|
|
|
# Loop that will handle checking if there are alerts scheduled for
|
|
# this day, hour, minute, and send the alerts to channels that are registered
|
|
@tasks.loop(seconds=30)
|
|
async def _alert_loop(self):
|
|
try:
|
|
# Check to see if there is an alert scheduled for this day, hour miniute
|
|
current_dt = datetime.datetime.now()
|
|
(day, hr, mn) = (current_dt.weekday(), current_dt.hour, current_dt.minute)
|
|
|
|
con = sqlite3.connect('alerts.db')
|
|
cur = con.cursor()
|
|
|
|
query = "Select type, day, hour, minute, last_sent from alert_schedule"
|
|
cur.execute(query)
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
(db_type, db_day, db_hr, db_mn, last_sent) = row
|
|
|
|
# Make sure last sent is > than 1 minute
|
|
if last_sent is not None and last_sent != "":
|
|
last_dt = datetime.datetime.fromisoformat(last_sent)
|
|
|
|
if last_dt.day == current_dt.day and \
|
|
last_dt.hour == current_dt.hour and \
|
|
last_dt.minute == current_dt.minute and \
|
|
last_dt.month == current_dt.month and \
|
|
last_dt.year == current_dt.year:
|
|
# If we haven't changed even a minute since last run, then skip this run
|
|
continue
|
|
|
|
if day == db_day and \
|
|
hr == db_hr and \
|
|
mn == db_mn:
|
|
# We want to send out alerts for this type to any channels registered
|
|
|
|
# Get channels registered for that type
|
|
query = "Select channel_id from alert_channels where type = ?"
|
|
cur.execute(query, (db_type,))
|
|
|
|
# For each registered channel, send the alert.
|
|
channel_ids = cur.fetchall()
|
|
for channel_id in channel_ids:
|
|
channel = self.get_channel(int(channel_id[0]))
|
|
|
|
if db_type == 'race':
|
|
await self.report_next_race(channel)
|
|
|
|
if db_type == 'event':
|
|
await self.report_next_event(channel)
|
|
|
|
# Record that we sent this alert out at this time
|
|
query = "Update alert_schedule set last_sent = ? where " + \
|
|
"type = ? and " + \
|
|
"day = ? and " + \
|
|
"hour = ? and " + \
|
|
"minute = ?"
|
|
cur.execute(query, (current_dt.isoformat(), db_type, db_day, db_hr, db_mn))
|
|
con.commit()
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except Exception as e:
|
|
print(f"Error sending alert {e['Message']}", file=sys.stderr)
|
|
|
|
|
|
@staticmethod
|
|
def get_token(token_file):
|
|
with open(token_file) as tok:
|
|
return tok.readline().strip()
|
|
|
|
async def start_collect(self):
|
|
await self.stop_collect()
|
|
self.is_collecting = True
|
|
self.is_slippery_reported = False
|
|
dir_path = os.path.dirname(os.path.realpath(__file__))
|
|
command_txt = os.path.join(dir_path, self.collector_command)
|
|
command_txt += self.collector_params
|
|
self.collector_proc = Popen(command_txt.split())
|
|
|
|
async def stop_collect(self):
|
|
self.is_collecting = False
|
|
try:
|
|
if self.collector_proc != None:
|
|
self.collector_proc.kill()
|
|
except:
|
|
pass
|
|
|
|
def decode_watched(self, w):
|
|
if w == 0:
|
|
return 'Not Watched Yet'
|
|
else:
|
|
return 'Watched Already'
|
|
|
|
def get_delta_str(self, delta):
|
|
min_str = "minute"
|
|
hour_str = "hour"
|
|
day_str = "day"
|
|
|
|
(hours, minutes, seconds) = (-1, -1, -1)
|
|
(days, rest) = (0, "")
|
|
|
|
delta_str = str(delta)
|
|
if " " in delta_str:
|
|
(days, _, rest) = delta_str.split(" ")
|
|
rest = rest.split(".")[0]
|
|
(hours, minutes, seconds) = rest.split(":")
|
|
else:
|
|
rest = delta_str.split(".")[0]
|
|
(hours, minutes, seconds) = rest.split(":")
|
|
|
|
|
|
if int(days) != 1:
|
|
day_str = "days"
|
|
|
|
if int(minutes) != 1:
|
|
min_str = "minutes"
|
|
|
|
if int(hours) != 1:
|
|
hour_str = "hours"
|
|
|
|
return f"{days} {day_str} {hours} {hour_str} {minutes} {min_str}"
|
|
|
|
|
|
async def report_next_event(self, ctx):
|
|
try:
|
|
tz = datetime.timezone.utc
|
|
con = sqlite3.connect('schedule.db')
|
|
cur = con.cursor()
|
|
t1 = datetime.datetime.now(tz=tz)
|
|
now_str = f"{t1.year}-{t1.month:02d}-{t1.day:02d} {t1.hour:02d}:{t1.minute:02d}:{t1.second}"
|
|
|
|
query = 'select * from schedule where date_start > ? ' + \
|
|
'order by date_start asc limit 1'
|
|
|
|
|
|
cur.execute(query, (now_str,))
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
t2 = datetime.datetime.fromisoformat(row[3] + "+00:00")
|
|
delta = t2 - t1
|
|
delta_str = self.get_delta_str(delta)
|
|
|
|
message = f"The next event is Round {row[5]}: {row[1]} in {row[4]} which is {delta_str} from now."
|
|
await ctx.send(message)
|
|
|
|
break # There should only be one row anyway
|
|
|
|
except Exception as e:
|
|
await ctx.send("Sorry, hit the wall trying to find the answer...")
|
|
print(e, file=sys.stderr)
|
|
|
|
|
|
async def report_next_race(self, ctx):
|
|
try:
|
|
tz = datetime.timezone.utc
|
|
con = sqlite3.connect('schedule.db')
|
|
cur = con.cursor()
|
|
t1 = datetime.datetime.now(tz=tz)
|
|
now_str = f"{t1.year}-{t1.month:02d}-{t1.day:02d} {t1.hour:02d}:{t1.minute}:{t1.second}"
|
|
|
|
query = "SELECT * FROM schedule WHERE date_start > ? AND " + \
|
|
"session_type = 'Race' ORDER BY date_start ASC LIMIT 1"
|
|
|
|
cur.execute(query, (now_str,))
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
t2 = datetime.datetime.fromisoformat(row[3] + "+00:00")
|
|
delta = t2 - t1
|
|
|
|
delta_str = self.get_delta_str(delta)
|
|
|
|
message = f"The next race is Round {row[5]}: {row[1]} in {row[4]} which is {delta_str} from now."
|
|
await ctx.send(message)
|
|
|
|
break
|
|
|
|
except:
|
|
await ctx.send("Sorry, hit the wall tring to find the next race.")
|
|
|
|
|
|
async def report_all_races(self, ctx):
|
|
try:
|
|
con = sqlite3.connect('schedule.db')
|
|
cur = con.cursor()
|
|
|
|
query = "SELECT * FROM schedule where session_type = 'Race' ORDER BY date_start ASC"
|
|
|
|
cur.execute(query)
|
|
rows = cur.fetchall()
|
|
|
|
await ctx.send( "All times UTC\n" )
|
|
for row in rows:
|
|
await ctx.send( f"Round {row[5]}: {row[1]} in {row[4]} which takes place {row[3]}\n" )
|
|
|
|
except:
|
|
await ctx.send("Sorry, hit the wall tring to show all races.")
|
|
|
|
|
|
# Register to alert for next race
|
|
|
|
async def register_next_race_alerts(self, ctx):
|
|
try:
|
|
# Get the id of the channel from the ctx
|
|
channel_id = ctx.channel.id
|
|
|
|
# Save this id to the alert_channels in the alert db.
|
|
con = sqlite3.connect('alerts.db')
|
|
cur = con.cursor()
|
|
|
|
# See if there is already a record
|
|
query = "SELECT COUNT(*) FROM alert_channels WHERE type = ? AND channel_id = ?"
|
|
cur.execute(query, ("race", channel_id))
|
|
rows = cur.fetchall()
|
|
count = rows[0][0]
|
|
|
|
if count > 0:
|
|
await ctx.send("This thread is already registered to get race alerts.")
|
|
else:
|
|
query = "INSERT INTO alert_channels (type, channel_id) VALUES (?, ?)"
|
|
cur.execute(query, ("race", channel_id))
|
|
con.commit()
|
|
|
|
await ctx.send("This channel has been registered to receive upcoming race alerts.")
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
if cur is not None:
|
|
cur.close()
|
|
|
|
if con is not None:
|
|
con.close()
|
|
|
|
|
|
# Register to alert for next event
|
|
|
|
async def register_next_event_alerts(self, ctx):
|
|
try:
|
|
# Get the id of the channel from the ctx
|
|
channel_id = ctx.channel.id
|
|
|
|
# Save this id to the alert channels in the alert db
|
|
con = sqlite3.connect('alerts.db')
|
|
cur = con.cursor()
|
|
|
|
# See if there is already a record
|
|
|
|
query = "SELECT COUNT(*) FROM alert_channels WHERE type = ? and channel_id = ?"
|
|
cur.execute(query, ("event", channel_id))
|
|
rows = cur.fetchall()
|
|
count = rows[0][0]
|
|
|
|
if count > 0:
|
|
await ctx.send("This thread is already registered to get event alerts.")
|
|
else:
|
|
query = "INSERT INTO alert_channels (type, channel_id) VALUES (?, ?)"
|
|
cur.execute(query, ("event", channel_id))
|
|
con.commit()
|
|
|
|
await ctx.send("This channel has been registered to receive upcoming event alerts.")
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
if cur is not None:
|
|
cur.close()
|
|
|
|
if con is not None:
|
|
con.close()
|
|
|
|
|
|
# Unregister channel from alerts
|
|
|
|
async def unregister_alerts(self, ctx):
|
|
try:
|
|
# Get the channel id from the ctx
|
|
channel_id = ctx.channel.id
|
|
|
|
# Remove records with this id from the db
|
|
con = sqlite3.connect('alerts.db')
|
|
cur = con.cursor()
|
|
|
|
query = "DELETE FROM alert_channels WHERE channel_id = ?"
|
|
cur.execute(query, (channel_id,))
|
|
con.commit()
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
if cur is not None:
|
|
cur.close()
|
|
|
|
if con is not None:
|
|
con.close()
|
|
|
|
|
|
async def show_bing(self, ctx):
|
|
(image, count) = binger.get_image_count()
|
|
await self.send_image(ctx, image)
|
|
await ctx.send(f"Bing count: {count}")
|
|
|
|
|
|
def __init__(self):
|
|
# Set debug or not
|
|
self.debug = True
|
|
|
|
self.bingo = Bingo()
|
|
|
|
# Discord authentication token
|
|
self.token = self.get_token("token.txt")
|
|
self.collector_command = "robottas_collector.py"
|
|
self.collector_params = " save dummy.txt"
|
|
self.collector_proc = None
|
|
self.is_collecting = False
|
|
|
|
# Preface messages with the following
|
|
self.report_preamble = ':robot::peach: Alert!'
|
|
|
|
# Holds processing thread
|
|
self.bg_task = None
|
|
|
|
# Hold db file
|
|
self.dbfile = "messages.db"
|
|
|
|
# Holds current lap
|
|
self.current_lap = 0
|
|
|
|
# Hold podium places
|
|
self.podium = ['?', '?', '?']
|
|
|
|
# Hold driver list
|
|
self.driver_list = ['?', '?', '?', '?', '?', '?', '?', '?', '?', '?',
|
|
'?', '?', '?', '?', '?', '?', '?', '?', '?', '?']
|
|
|
|
# Hold weather info
|
|
self.weather = ""
|
|
|
|
# Hold weather we have reported on slippery track.
|
|
self.is_slippery_reported = False
|
|
|
|
# Hold lap info
|
|
self.current_lap = -1000
|
|
self.total_laps = -1000
|
|
self.lap_reported = 0
|
|
|
|
# Hold the driver with the fastest lap
|
|
self.fastest_lap = ''
|
|
|
|
# Holds dictionary for number to icon
|
|
self.driver_dict = {
|
|
'1': '<:VER:1067541523748630570>',
|
|
'3': '<:RIC:1067870312949108887>',
|
|
'5': '<:VET:1067964065516884079>',
|
|
'11': '<:PER:1067822335123525732>',
|
|
'14': '<:ALO:1067876094033793054>',
|
|
'40': ':kiwi:',
|
|
'44': '<:HAM:1067828533746991165>',
|
|
'55': '<:SAI:1067824776502067270>',
|
|
'63': '<:RUS:1067831294748274728>',
|
|
'16': '<:LEC:1067544797050585198>',
|
|
'18': '<:STR:1067871582854336663>',
|
|
'4': '<:NOR:1067840487593082941>',
|
|
'10': '<:GAS:1067836596495327283>',
|
|
'27': '<:HUL:1067880110918742187>',
|
|
'31': '<:OCO:1067834157465612398>',
|
|
'77': '<:BOT:1067819716527276032>',
|
|
'81': '<:PIA:1067844998369914961>',
|
|
'24': '<:ZHO:1067865955117568010>',
|
|
'22': '<:TSU:1067888851676315660>',
|
|
'20': '<:MAG:1067883814992486510>',
|
|
'23': '<:ALB:1067874026871074887>',
|
|
'2': '<:SAR:1067890949197414410>',
|
|
'38': ':teddy_bear:',
|
|
}
|
|
|
|
# Holds dictionary for driver 3-letter code to icon
|
|
self.name_dict = {
|
|
'VER': '<:VER:1067541523748630570>',
|
|
'RIC': '<:RIC:1067870312949108887>',
|
|
'VET': '<:VET:1067964065516884079>',
|
|
'PER': '<:PER:1067822335123525732>',
|
|
'ALO': '<:ALO:1067876094033793054>',
|
|
'HAM': '<:HAM:1067828533746991165>',
|
|
'SAI': '<:SAI:1067824776502067270>',
|
|
'RUS': '<:RUS:1067831294748274728>',
|
|
'LEC': '<:LEC:1067544797050585198>',
|
|
'STR': '<:STR:1067871582854336663>',
|
|
'NOR': '<:NOR:1067840487593082941>',
|
|
'GAS': '<:GAS:1067836596495327283>',
|
|
'HUL': '<:HUL:1067880110918742187>',
|
|
'LAW': ':kiwi:',
|
|
'OCO': '<:OCO:1067834157465612398>',
|
|
'BOT': '<:BOT:1067819716527276032>',
|
|
'PIA': '<:PIA:1067844998369914961>',
|
|
'ZHO': '<:ZHO:1067865955117568010>',
|
|
'TSU': '<:TSU:1067888851676315660>',
|
|
'MAG': '<:MAG:1067883814992486510>',
|
|
'ALB': '<:ALB:1067874026871074887>',
|
|
'SAR': '<:SAR:1067890949197414410>',
|
|
'BEA': ':teddy_bear:',
|
|
}
|
|
|
|
# Holds dictionary for race states to icons
|
|
self.flag_dict = {
|
|
'GREEN_FLAG': '<:GREEN:1107401338993782804>',
|
|
'YELLOW_FLAG': '<:YELLOW:1091801442714652815>',
|
|
'RED_FLAG': '<:RED:1091801383998586912>',
|
|
'VSC': '<:VSC:1107401410183704596>',
|
|
'VIRTUAL_SAFETY_CAR': '<:VSC:1107401410183704596>',
|
|
'SC': '<:SC:1107401472041304104>',
|
|
'SAFETY_CAR': '<:SC:1107401472041304104>',
|
|
'CHECKERED': '<:CHECKERED:1091801509039194152>',
|
|
'RACE_FINISHED': '<:CHECKERED:1091801509039194152>',
|
|
'STARTED': '<:CHECKERED:1091801509039194152>',
|
|
'P1': ':first_place:',
|
|
'P2': ':second_place:',
|
|
'P3': ':third_place:',
|
|
'FLAP': '<:FLap4:1122601036952129547>'
|
|
}
|
|
|
|
# Bot configuration info
|
|
self.bot_intents = discord.Intents.default()
|
|
self.bot_intents.message_content = True
|
|
self.description = "robotas - f1 bot - !rbhelp for commands"
|
|
self.command_prefix = '!'
|
|
|
|
# Track whether to report, and what channel to report to
|
|
self.is_reporting = False
|
|
self.report_id = None
|
|
self.started = False
|
|
|
|
# Hold message delay
|
|
self.delay = 45
|
|
self.message_queue = []
|
|
|
|
# Hold whether to report deleted lap messages
|
|
self.report_deleted_lap = False
|
|
self.session_type = ''
|
|
|
|
# Test file
|
|
self.test_file = "test.json"
|
|
|
|
# Race db patterns
|
|
self.loc_pattern = re.compile('[\w\s\-\d]+')
|
|
self.yr_pattern = re.compile('\d{4}')
|
|
|
|
### END FastF1 adapted section ###
|
|
|
|
super().__init__(command_prefix=self.command_prefix,
|
|
description=self.description,
|
|
intents=self.bot_intents)
|
|
|
|
# Setup commands
|
|
@self.command()
|
|
async def rbhelp(ctx):
|
|
|
|
await ctx.send("commands: \n" +
|
|
"!rbhelp - Print this help message " +
|
|
"(but you knew that already)\n" +
|
|
"!next_event - Prints time until the next event.\n" +
|
|
"!next_race - Prints time until the next race.\n" +
|
|
"!all_races - Prints the loation and date of all races in the schedule.\n" +
|
|
"!rbname - I will tell you my name.\n" +
|
|
"!rbroot - I will tell you who I root for.\n" +
|
|
"!rbreport - I will start race reporting in this channel. " +
|
|
"I will try to tell you about current flags, laps left,\n" +
|
|
"and safety cars.\n" +
|
|
"!rbstop - Stop reporting.\n" +
|
|
"!podium - Display podium positions.\n" +
|
|
"!q1cut - Show drivers in Q1 cut positions.\n" +
|
|
"!q2cut - Show drivers in Q2 cut positions.\n" +
|
|
"!rbdelay - Set the race messaging delay in seconds.\n" +
|
|
"The following display race control messages:\n" +
|
|
" !animal\n" +
|
|
" !bwoken\n" +
|
|
" !calm\n" +
|
|
" !forecast\n" +
|
|
" !grandma\n" +
|
|
" !grass\n" +
|
|
" !liked\n" +
|
|
" !no\n" +
|
|
" !noengine\n" +
|
|
" !pants\n" +
|
|
" !paddock\n" +
|
|
" !penalty\n" +
|
|
" !stupid\n" +
|
|
" !undercut\n" +
|
|
"The following register / unregister scheduled next race / event messages.\n" +
|
|
"!register_next_race_alerts - get an alert for the next race on Monday.\n" +
|
|
"!register_next_event_alerts - get an alert for the next event on Monday.\n" +
|
|
"!unregister_alerts - stop getting alerts on this channel.\n" +
|
|
"!bingo_card - get a random bingo card.\n" +
|
|
"!bingo - announce that you've won bingo.\n" +
|
|
"!bing - bing counter."
|
|
)
|
|
|
|
@self.command()
|
|
async def rbroot(ctx):
|
|
|
|
await ctx.send("Rooting for :ferry::peach: of course!\n" +
|
|
self.name_dict["BOT"] + ":smiling_face_with_3_hearts:")
|
|
|
|
@self.command()
|
|
async def rbname(ctx):
|
|
|
|
await ctx.send("Hello, my name is Robottas, pronounced :robot::peach:")
|
|
|
|
@self.command()
|
|
async def rbdelay(ctx, delay):
|
|
try:
|
|
secs = max(0, int(delay))
|
|
if secs < 10 or secs > 300:
|
|
await ctx.send("Delay must be between 10 and 300.")
|
|
else:
|
|
self.delay = secs
|
|
await ctx.send(f"Delay set to {secs} seconds.")
|
|
except:
|
|
await ctx.send(f"Invalid delay value passed to rbdelay.")
|
|
|
|
@self.command()
|
|
async def rbstop(ctx):
|
|
self.is_reporting = False
|
|
self.report_id = None
|
|
await self.stop_collect()
|
|
await ctx.send(":robot::peach: powering down")
|
|
|
|
@self.command()
|
|
async def podium(ctx):
|
|
message = self.get_podium()
|
|
await ctx.send(message)
|
|
|
|
@self.command()
|
|
async def driverlist(ctx):
|
|
await self.print_driver_list(ctx)
|
|
|
|
@self.command()
|
|
async def q1cut(ctx):
|
|
message = self.get_q1_cut()
|
|
await ctx.send(message)
|
|
|
|
@self.command()
|
|
async def q2cut(ctx):
|
|
message = self.get_q2_cut()
|
|
await ctx.send(message)
|
|
|
|
@self.command()
|
|
async def weather(ctx):
|
|
message = self.get_weather()
|
|
await ctx.send(message)
|
|
|
|
@self.command()
|
|
async def race(ctx):
|
|
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
|
|
await ctx.send(":robot::peach: Ready to report for the race!")
|
|
await self._race_report(ctx)
|
|
|
|
@self.command()
|
|
async def quali(ctx):
|
|
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
|
|
await ctx.send(":robot::peach: Ready to report on quali!")
|
|
await self._quali_report(ctx)
|
|
|
|
@self.command()
|
|
async def practice(ctx):
|
|
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
|
|
await ctx.send(":robot::peach: Ready to report on practice!")
|
|
await self._practice_report(ctx)
|
|
|
|
@self.command()
|
|
async def flap(ctx):
|
|
if self.fastest_lap != '':
|
|
await ctx.send(self.flag_dict['FLAP'] + self.fastest_lap)
|
|
else:
|
|
await ctx.send("No " + self.flag_dict['FLAP'] + " yet.")
|
|
|
|
# @self.command()
|
|
# async def rbtestfile(ctx):
|
|
# self.is_reporting = True
|
|
# if str(ctx.author) == "tamservo#0":
|
|
# await self._test_file(ctx)
|
|
|
|
@self.command()
|
|
async def start_collect(ctx):
|
|
if str(ctx.author) == "tamservo#0" or ctx.author.guild_permissions.administrator:
|
|
# if an authorized user, start the collection script that
|
|
# puts records into the database
|
|
await self.start_collect()
|
|
|
|
@self.command()
|
|
async def danger(ctx):
|
|
await ctx.send("That's some dangerous driving! " + self.name_dict["HAM"])
|
|
|
|
@self.command()
|
|
async def dotd(ctx):
|
|
await ctx.send("I don't know, probably " + self.name_dict["ALB"])
|
|
|
|
@self.command()
|
|
async def flip(ctx):
|
|
await ctx.send(random.choice(["Heads", "Tails"]))
|
|
|
|
@self.command()
|
|
async def gp2(ctx):
|
|
await ctx.send("GP2 engine! GP2! ARGHHH! " + self.name_dict["ALO"])
|
|
|
|
@self.command()
|
|
async def iloveyoubot(ctx):
|
|
await ctx.send(f"I am incapable of love, but for you, {ctx.author.display_name}, I will make an exception.")
|
|
|
|
@self.command()
|
|
async def quote(ctx):
|
|
try:
|
|
con = sqlite3.connect('races.db')
|
|
cur = con.cursor()
|
|
for row in cur.execute('select * from quotes order by Random() limit 1'):
|
|
message = row[0] + " -- " + row[1] + " " + row[2]
|
|
await ctx.send(message)
|
|
break # There should only be one row anyway
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
ctx.send("Can't think of a quote for some reason...")
|
|
|
|
@self.command()
|
|
@commands.has_permissions(administrator=True)
|
|
async def race_watched(ctx, loc, yr):
|
|
if re.match(self.loc_pattern, loc) and \
|
|
re.match(self.yr_pattern, yr):
|
|
|
|
try:
|
|
con = sqlite3.connect('races.db')
|
|
cur = con.cursor()
|
|
cur.execute('update races ' +
|
|
'set watched = 1 ' +
|
|
'where location = ? ' +
|
|
'year = ?', (loc, yr))
|
|
|
|
con.commit()
|
|
cur.close()
|
|
con.close()
|
|
await ctx.send(f"{loc} {yr} marked as watched.")
|
|
|
|
except:
|
|
await ctx.send(f"Couldn't mark {loc} {yr} as watched for some reason.")
|
|
|
|
@self.command()
|
|
async def rand_race(ctx):
|
|
try:
|
|
con = sqlite3.connect('races.db')
|
|
cur = con.cursor()
|
|
for row in cur.execute('select * from races ' +
|
|
'order by Random() ' +
|
|
'limit 1'):
|
|
watched = self.decode_watched(row[2])
|
|
await ctx.send(f"Location: {row[0]} Year: {row[1]} {watched}")
|
|
break
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
ctx.send("Can't find a random race for some reason.")
|
|
|
|
@self.command()
|
|
async def rand_race_new(ctx):
|
|
try:
|
|
con = sqlite3.connect('races.db')
|
|
cur = con.cursor()
|
|
for row in cur.execute('select * from races ' +
|
|
'where watched = 0 ' +
|
|
'order by Random() ' +
|
|
'limit 1'):
|
|
await ctx.send(f"Location: {row[0]} Year: {row[1]}")
|
|
break
|
|
|
|
cur.close()
|
|
con.close()
|
|
|
|
except:
|
|
ctx.send("Can't pick a race for some reason.")
|
|
|
|
@self.command()
|
|
async def tyres(ctx):
|
|
await ctx.send("Bono, my tyres are gone " + self.name_dict["HAM"])
|
|
|
|
# Commands that send images
|
|
|
|
@self.command()
|
|
async def bingo(ctx):
|
|
await self.send_image(ctx, "images/bingo_win.png")
|
|
|
|
@self.command()
|
|
async def calm(ctx):
|
|
await self.send_image(ctx, "images/calm.png")
|
|
|
|
@self.command()
|
|
async def grandma(ctx):
|
|
await self.send_image(ctx, "images/grandma.png")
|
|
|
|
@self.command()
|
|
async def grass(ctx):
|
|
await self.send_image(ctx, "images/grass.png")
|
|
|
|
@self.command()
|
|
async def forecast(ctx):
|
|
await self.send_image(ctx, "images/forecast.png")
|
|
|
|
@self.command()
|
|
async def liked(ctx):
|
|
await self.send_image(ctx, "images/liked.png")
|
|
|
|
@self.command()
|
|
async def no(ctx):
|
|
await self.send_image(ctx, "images/no.png")
|
|
await self.send_audio(ctx, "no.mp3", "audio/no.mp3")
|
|
|
|
@self.command()
|
|
async def noengine(ctx):
|
|
await self.send_image(ctx, "images/noengine.png")
|
|
|
|
@self.command()
|
|
async def pants(ctx):
|
|
await self.send_image(ctx, "images/pants.png")
|
|
|
|
@self.command()
|
|
async def paddock(ctx):
|
|
await self.send_image(ctx, "images/paddock.png")
|
|
|
|
@self.command()
|
|
async def penalty(ctx):
|
|
await self.send_image(ctx, "images/penalty.png")
|
|
|
|
@self.command()
|
|
async def stupid(ctx):
|
|
await self.send_image(ctx, "images/stupid.png")
|
|
|
|
@self.command()
|
|
async def undercut(ctx):
|
|
await self.send_image(ctx, "images/undercut.png")
|
|
|
|
@self.command()
|
|
async def animal(ctx):
|
|
await self.send_image(ctx, "images/animal.png")
|
|
|
|
|
|
@self.command()
|
|
async def bwoken(ctx):
|
|
await self.send_image(ctx, "images/bwoken.png")
|
|
await self.send_audio(ctx, "bwoken.mp3", "audio/bwoken.mp3")
|
|
|
|
## Calendar Commands
|
|
|
|
# Give days, hours, minutes until the next event
|
|
@self.command()
|
|
async def next_event(ctx):
|
|
await self.report_next_event(ctx)
|
|
|
|
# Give days, hours, minutes until the next race
|
|
@self.command()
|
|
async def next_race(ctx):
|
|
await self.report_next_race(ctx)
|
|
|
|
# Show all races
|
|
@self.command()
|
|
async def all_races(ctx):
|
|
await self.report_all_races(ctx)
|
|
|
|
# Register to get monday next race alerts
|
|
@self.command()
|
|
async def register_next_race_alerts(ctx):
|
|
await self.register_next_race_alerts(ctx)
|
|
|
|
# Register to get monday next event alerts
|
|
@self.command()
|
|
async def register_next_event_alerts(ctx):
|
|
await self.register_next_event_alerts(ctx)
|
|
|
|
# Unregister to get monday alerts
|
|
@self.command()
|
|
async def unregister_alerts(ctx):
|
|
await self.unregister_alerts(ctx)
|
|
|
|
|
|
# Bingo card
|
|
@self.command()
|
|
async def bingo_card(ctx):
|
|
card_file = self.bingo.get_card()
|
|
await self.send_image(ctx, card_file)
|
|
os.remove(card_file)
|
|
|
|
|
|
# Big counter
|
|
@self.command()
|
|
async def bing(ctx):
|
|
await self.show_bing(ctx)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
rb = Robottas()
|
|
rb.run_robottas()
|