Spaces:
Runtime error
Runtime error
import concurrent.futures
import datetime
import functools
import math
import os
from typing import Any

import fastf1
import numpy as np
import pandas as pd
import streamlit as st
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel

import available_data
# ASGI application object for this service.
app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site issue credentialed requests to this API. Kept as-is because clients may
# depend on it -- confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Directory for FastF1's on-disk cache. The environment variable wins when it
# is set; otherwise a local directory is used so that a missing variable does
# not abort the import with a bare KeyError.
FASTF1_CACHE_DIR = os.environ.get("FASTF1_CACHE_DIR", "fastf1_cache")
# The cache directory has to exist before the cache is enabled.
os.makedirs(FASTF1_CACHE_DIR, exist_ok=True)
fastf1.Cache.enable_cache(FASTF1_CACHE_DIR)
def smooth_derivative(t_in, v_in, method="smooth"):
    """Estimate dv/dt with a low-noise numerical differentiator.

    [REF: http://holoborodko.com/pavel/numerical-methods/numerical-derivative/smooth-low-noise-differentiators/]

    Args:
        t_in: 1-D independent variable. Timedelta values (a ``timedelta64``
            array/Series, or objects exposing ``total_seconds()``) are
            converted to seconds; anything else is used as a float.
        v_in: 1-D dependent variable with the same number of samples.
        method: ``"smooth"`` (default) is more conservative and suppresses
            very noisy signals; ``"centered"`` is more aggressive but noisier.

    Returns:
        A float ``numpy.ndarray`` with one derivative estimate per sample.
        The first and last samples use one-sided differences and the two
        samples next to each end use centered differences, because the 7-point
        smooth stencil does not fit there. Inputs with fewer than two samples
        yield zeros.

    Raises:
        ValueError: if ``method`` is unknown or the inputs differ in size.
    """
    if method not in ("smooth", "centered"):
        raise ValueError(f"unknown derivative method: {method!r}")

    # (0) Prepare inputs without touching the caller's data.
    t = np.asarray(t_in)
    if np.issubdtype(t.dtype, np.timedelta64):
        # Time needs to be expressed in seconds.
        t = t / np.timedelta64(1, "s")
    elif t.dtype == object:
        t = np.array(
            [x.total_seconds() if hasattr(x, "total_seconds") else x for x in t],
            dtype=float,
        )
    else:
        t = t.astype(float)
    v = np.asarray(v_in, dtype=float)

    if t.size != v.size:
        raise ValueError(
            f"t_in and v_in must have the same size ({t.size} != {v.size})"
        )

    n = t.size
    dvdt = np.zeros(n)
    if n < 2:
        return dvdt

    # (1) End points: one-sided differences.
    dvdt[0] = (v[1] - v[0]) / (t[1] - t[0])
    dvdt[-1] = (v[-1] - v[-2]) / (t[-1] - t[-2])

    # (2) Every interior point starts as a centered difference. For the
    #     "centered" method this is already the final answer.
    if n > 2:
        dvdt[1:-1] = (v[2:] - v[:-2]) / (t[2:] - t[:-2])

    # (3) Where the full stencil fits (three samples away from either end),
    #     replace the centered estimate with the smooth one.
    if method == "smooth" and n > 6:
        coefficients = (5.0 / 32.0, 4.0 / 32.0, 1.0 / 32.0)
        inner = np.arange(3, n - 3)
        estimate = np.zeros(inner.size)
        for j, coefficient in enumerate(coefficients, start=1):
            estimate += (
                2 * j * coefficient * (v[inner + j] - v[inner - j])
                / (t[inner + j] - t[inner - j])
            )
        dvdt[3:n - 3] = estimate

    return dvdt
def truncated_remainder(dividend, divisor):
    """Return the remainder of ``dividend / divisor`` with a truncated quotient.

    The quotient is rounded toward zero, so a non-zero result carries the sign
    of ``dividend`` (unlike Python's ``%``, which follows the divisor).

    Raises:
        ZeroDivisionError: if ``divisor`` is a Python number equal to zero.
    """
    # int() already truncates toward zero for both signs, so no sign-specific
    # branch is needed.
    quotient = int(dividend / divisor)
    return dividend - divisor * quotient
def transform_to_pipi(input_angle):
    """Wrap an angle in radians back around zero.

    Returns:
        A tuple ``(output_angle, revolutions)``: the wrapped angle, and the
        truncated number of full turns computed from the angle shifted by half
        a turn in its own direction.
    """
    two_pi = 2 * math.pi
    direction = np.sign(input_angle)

    # Shift by half a turn in the direction of the angle before reducing.
    shifted = input_angle + direction * math.pi
    revolutions = int(shifted / two_pi)
    wrapped = truncated_remainder(shifted, two_pi)

    # 0.0 when (input_angle + pi) is an exact multiple of a full turn, else 1.0.
    off_boundary = np.sign(
        math.fabs(truncated_remainder(input_angle + math.pi, two_pi) / two_pi)
    )
    # Half-turn offset to undo; on the boundary it is always -pi.
    half_turn = np.sign(direction + 2 * (off_boundary - 1)) * math.pi

    return wrapped - half_turn, revolutions
def remove_acceleration_outliers(acc, acc_threshold_g=7.5):
    """Replace implausible acceleration samples in place.

    A sample is an outlier when its magnitude exceeds ``acc_threshold_g`` or
    when it is not finite (NaN/inf). An outlier takes the value of the
    previous, already cleaned sample; a leading outlier becomes 0.0.

    Args:
        acc: 1-D float ``numpy.ndarray``; it is modified in place.
        acc_threshold_g: largest magnitude that is kept.

    Returns:
        The same array object, for convenient chaining. Empty input is
        returned unchanged.
    """
    previous = 0.0
    for i in range(acc.size):
        sample = acc[i]
        if not math.isfinite(sample) or math.fabs(sample) > acc_threshold_g:
            acc[i] = previous
        previous = acc[i]
    return acc
def compute_accelerations(telemetry):
    """Derive longitudinal and lateral acceleration traces from telemetry.

    Args:
        telemetry: table-like object providing the columns ``"Speed"``,
            ``"Time"``, ``"Distance"``, ``"X"`` and ``"Y"``.
            NOTE(review): Speed is divided by 3.6, so it is presumably in
            km/h -- confirm against the data source.

    Returns:
        A tuple ``(lon_acc, lat_acc)`` of arrays, both divided by 9.81
        (i.e. expressed in g), cleaned of outliers and rounded to 2 decimals.
    """
    gravity = 9.81
    v = np.array(telemetry["Speed"]) / 3.6

    # Longitudinal: time derivative of speed.
    lon_acc = smooth_derivative(telemetry["Time"], v) / gravity

    # Lateral: heading angle along the path, differentiated with respect to
    # distance to get curvature, then v^2 * curvature.
    dx = smooth_derivative(telemetry["Distance"], telemetry["X"])
    dy = smooth_derivative(telemetry["Distance"], telemetry["Y"])

    theta = np.zeros(dx.size)
    if dx.size:
        theta[0] = math.atan2(dy[0], dx[0])
    # Start at 1: each heading is built from its predecessor. Starting at 0
    # would read theta[-1] and overwrite the seed value.
    for i in range(1, dx.size):
        # Add the wrapped increment so the heading stays continuous instead of
        # jumping by a full turn where atan2 wraps around.
        theta[i] = (
            theta[i - 1]
            + transform_to_pipi(math.atan2(dy[i], dx[i]) - theta[i - 1])[0]
        )

    kappa = smooth_derivative(telemetry["Distance"], theta)
    lat_acc = v * v * kappa / gravity

    # Remove outliers
    lon_acc = remove_acceleration_outliers(lon_acc)
    lat_acc = remove_acceleration_outliers(lat_acc)

    return np.round(lon_acc, 2), np.round(lat_acc, 2)
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def driver_standings() -> dict[str, Any]:
    """Return the drivers' championship standings for the configured season.

    The first table on the formula1.com results page is read, reduced to the
    driver, points and car columns, and returned as a list of records under the
    ``"WDC"`` key with ``PTS`` renamed to ``Points`` and a ``fill`` colour
    looked up from the car name.

    NOTE(review): ``pd.read_html`` performs blocking network I/O inside an
    ``async`` function; consider running it in a worker thread.
    """
    YEAR = 2023  # datetime.datetime.now().year

    url = f"https://www.formula1.com/en/results.html/{YEAR}/drivers.html"
    # .copy() so the column assignments below write to an independent frame
    # rather than to a slice of the parsed table.
    standings = pd.read_html(url)[0][["Driver", "PTS", "Car"]].copy()

    # Lowest points first.
    standings = standings.sort_values(by="PTS", ascending=True)

    # Drop the last five characters of each driver entry.
    # NOTE(review): presumably this strips a trailing driver code from the
    # scraped cell text -- confirm against the live page.
    standings["Driver"] = standings["Driver"].str[:-5]

    # Colour per car; cars missing from the mapping end up as NaN.
    car_colors = available_data.team_colors(YEAR)
    standings["fill"] = standings["Car"].map(car_colors)

    # Drivers without points are left out.
    standings = standings[standings["PTS"] != 0]
    standings = standings.reset_index(drop=True).rename(columns={"PTS": "Points"})

    return {"WDC": standings.to_dict("records")}
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def root():
    """Return an HTML page that embeds the hosted analysis app in an iframe."""
    # `allowfullscreen` is a boolean attribute; it was previously written with
    # a colon, which is not valid attribute syntax.
    return HTMLResponse(
        content="""<iframe src="https://tracinginsights-f1-analysis.hf.space" frameborder="0" style="width:100%; height:100%;" scrolling="yes" allowfullscreen></iframe>""",
        status_code=200)
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def years_available() -> dict[str, Any]:
    """Return the selectable seasons, newest first.

    Returns:
        ``{"years": [{"label": "<year>", "value": <year>}, ...]}`` covering
        every year from the current one down to 2018.
    """
    first_year = 2018
    current_year = datetime.datetime.now().year
    # Count downwards so the latest year comes first.
    years = [
        {"label": str(year), "value": year}
        for year in range(current_year, first_year - 1, -1)
    ]
    return {"years": years}
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def events_available(year: int) -> dict[str, Any]:
    """Return the events available for ``year``, in reverse listing order.

    Returns:
        ``{"events": [{"label": <event>, "value": <event>}, ...]}``, e.g.
        ``{"events": [{"label": "Bahrain Grand Prix",
        "value": "Bahrain Grand Prix"}]}``.

    NOTE(review): an earlier comment showed numeric round numbers / "t1" as
    ``value``; the code uses the event itself for both fields -- confirm which
    one clients expect.
    """
    data = available_data.LatestData(year)
    events = [{"label": event, "value": event} for event in data.get_events()]
    events.reverse()
    return {"events": events}
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def sessions_available(year: int, event: str | int) -> dict[str, Any]:
    """Return the sessions available for an event of ``year``.

    Returns:
        ``{"sessions": [{"label": <session>, "value": <session>}, ...]}`` in
        the order reported by ``available_data``, e.g.
        ``{"sessions": [{"label": "FP1", "value": "FP1"}]}``.
    """
    data = available_data.LatestData(year)
    sessions = [
        {"label": name, "value": name} for name in data.get_sessions(event)
    ]
    return {"sessions": sessions}
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def session_drivers(year: int, event: str | int, session: str) -> dict[str, Any]:
    """Return the drivers that set laps in a session, with their team colour.

    Returns:
        ``{"drivers": [{"color": <colour>, "label": <driver>,
        "value": <driver>}, ...]}`` in order of first appearance in the laps,
        e.g. ``{"drivers": [{"color": "#0600ef", "label": "VER",
        "value": "VER"}]}``.

    NOTE(review): loading the session is blocking work inside an ``async``
    function.
    """
    f1session = fastf1.get_session(year, event, session)
    f1session.load(telemetry=False, weather=False, messages=False)
    laps = f1session.laps

    team_colors = available_data.team_colors(year)
    # Neutral colour for a team that is missing from the colour table, so one
    # unmapped team does not fail the whole request.
    fallback_color = "#808080"

    # One row per driver (their first lap) instead of re-filtering every lap
    # for each driver.
    first_laps = laps[["Driver", "Team"]].drop_duplicates(subset="Driver")
    drivers = [
        {
            "color": team_colors.get(team, fallback_color),
            "label": driver,
            "value": driver,
        }
        for driver, team in zip(first_laps["Driver"], first_laps["Team"])
    ]
    return {"drivers": drivers}
# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def laps_data(year: int, event: str | int, session: str, driver: str) -> dict[str, Any]:
    """Return one chart point per lap of ``driver`` in a session.

    Laps with a positive lap time carry the time in seconds plus the tyre
    compound and its colour; laps without a time carry only the lap number.

    Returns:
        ``{"chartData": [...]}``, e.g. for driver ``"VER"``::

            {"chartData": [
                {"lapnumber": 1},
                {"VER": 91.564, "VER_compound": "SOFT",
                 "VER_compound_color": "#FF0000", "lapnumber": 2},
            ]}

    NOTE(review): ``pick_driver`` is kept as in the original; newer FastF1
    releases offer ``pick_drivers`` -- check the pinned version.
    """
    f1session = fastf1.get_session(year, event, session)
    f1session.load(telemetry=False, weather=False, messages=False)
    driver_laps = f1session.laps.pick_driver(driver)

    # Convert into a separate Series instead of writing into the filtered laps.
    lap_seconds = driver_laps["LapTime"].dt.total_seconds()

    compound_colors = {
        "SOFT": "#FF0000",
        "MEDIUM": "#FFFF00",
        "HARD": "#FFFFFF",
        "INTERMEDIATE": "#00FF00",
        "WET": "#088cd0",
    }
    # Used for any compound that is not in the table above.
    unknown_compound_color = "#808080"

    key = str(driver)
    chart_data = []
    rows = zip(
        lap_seconds.tolist(),
        driver_laps["Compound"].tolist(),
        driver_laps["LapNumber"].tolist(),
    )
    for seconds, compound, lap_number in rows:
        # NaN compares False, so laps without a time take the short form.
        if seconds > 0:
            if pd.isna(compound):
                # NaN cannot be encoded as JSON; report a missing compound as null.
                compound = None
            chart_data.append({
                key: seconds,
                f"{key}_compound": compound,
                f"{key}_compound_color": compound_colors.get(
                    compound, unknown_compound_color
                ),
                "lapnumber": lap_number,
            })
        else:
            chart_data.append({"lapnumber": lap_number})

    return {"chartData": chart_data}
def _format_laptime(laptime):
    """Format a lap time in seconds as ``M:SS.mmm`` (``"no time"`` if missing)."""
    if pd.isna(laptime):
        return "no time"
    # Work in whole milliseconds so rounding can never produce ":60.000".
    minutes, millis = divmod(round(float(laptime) * 1000), 60_000)
    return f"{minutes}:{millis / 1000:06.3f}"


def _xy_points(xs, ys):
    """Pair two equally long sequences into ``{"x": ..., "y": ...}`` records."""
    return [{"x": x, "y": y} for x, y in zip(xs, ys)]


# NOTE(review): no route decorator is visible for this handler in this section
# of the file -- confirm where it is registered on `app`.
async def telemetry_data(year: int, event: str | int, session: str, driver: str, lap_number: int) -> dict[str, Any]:
    """Return the telemetry traces of one lap, keyed by channel.

    Every channel is a list of ``{"x": distance, "y": value}`` points, except
    ``trackMap`` whose points are the ``X``/``Y`` position columns. ``drs`` is
    reduced to 1 (raw value 10, 12 or 14) or 0. ``dataKey`` is a label built
    from the driver, lap number, year, session and lap time.

    Returns:
        ``{"telemetryData": {"brake": [...], "dataKey": "...", "drs": [...],
        "gear": [...], "rpm": [...], "speed": [...], "throttle": [...],
        "time": [...], "lon_acc": [...], "lat_acc": [...], "trackMap": [...]}}``

    Raises:
        HTTPException: 404 when the driver has no lap with ``lap_number``.
    """
    f1session = fastf1.get_session(year, event, session)
    f1session.load(telemetry=True, weather=False, messages=False)
    driver_laps = f1session.laps.pick_driver(driver)

    selected_lap = driver_laps[driver_laps.LapNumber == lap_number]
    if selected_lap.empty:
        raise HTTPException(
            status_code=404,
            detail=f"No lap {lap_number} for driver {driver} in {year} {event} {session}",
        )

    telemetry = selected_lap.get_telemetry()
    lon_acc, lat_acc = compute_accelerations(telemetry)
    telemetry["lon_acc"] = lon_acc
    telemetry["lat_acc"] = lat_acc
    telemetry["Time"] = telemetry["Time"].dt.total_seconds()
    telemetry["DRS"] = telemetry["DRS"].isin([10, 12, 14]).astype(int)

    laptime = selected_lap["LapTime"].dt.total_seconds().iloc[0]
    data_key = (
        f"{driver} - Lap {int(lap_number)} - {year} {session} "
        f"[{_format_laptime(laptime)}]"
    )

    # .tolist() yields plain Python scalars, which serialise to JSON directly.
    distance = telemetry["Distance"].tolist()

    def channel(column):
        return _xy_points(distance, telemetry[column].tolist())

    return {
        "telemetryData": {
            "brake": channel("Brake"),
            "dataKey": data_key,
            "drs": channel("DRS"),
            "gear": channel("nGear"),
            "rpm": channel("RPM"),
            "speed": channel("Speed"),
            "throttle": channel("Throttle"),
            "time": channel("Time"),
            "lon_acc": channel("lon_acc"),
            "lat_acc": channel("lat_acc"),
            "trackMap": _xy_points(
                telemetry["X"].tolist(), telemetry["Y"].tolist()
            ),
        }
    }