In some of my previous, we have discussed the use of FastAPI, such as using it as a webhook for a chatbot, and creating a machine learning API using FastAPI for prediction. In this post, we will discuss about using FastAPI to develop a API Wrapper for F1 data.
Project Overview
1 - Create a Cloud SQL database to store client scopes for authentication
2 - Scrape F1 drivers and team results from Formula1.com
3 - Build a FastAPI app that authenticates clients using a Cloud SQL database
4 - Deploy to Cloud Run with Skaffold and use UV as the Python package manager
To follow with the post, you can check the code here
Project Structure
The core folder structure is organized as follows:
.
├── Dockerfile
├── README.md
├── app
│ ├── common
│ │ ├── f1api.py
│ │ ├── exception
│ │ ├── gcp_cloudsql.py
│ │ └── gcp_secrets.py
│ ├── config
│ │ ├── __init__.py
│ │ └── config.py
│ ├── dependencies.py
│ ├── main.py
│ ├── routes
│ │ ├── auth.py
│ │ ├── root.py
│ │ └── f1api.py
│ └── schema
│ ├── auth.py
│ └── f1api.py
├── deploy
│ └── production
│ └── service.yaml
├── pyproject.toml
├── skaffold.yaml
└── uv.lock
Preparation
We will first have to create a Cloud SQL instance and a database to store the client scopes for authentication. The schema will look something like this. And the client_secret will be hashed using bcrypt.
CREATE TABLE client_scopes_get (
client_id VARCHAR(50) NOT NULL,
client_secret VARCHAR(100) NOT NULL,
scopes VARCHAR(1000),
is_active INT
);
The project uses UV as the Python package manager, in pyproject.toml
we can have something like this:
[project]
name = "f1api-wrapper"
version = "0.1.0"
description = "F1 API Wrapper"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"loguru>=0.7.2",
"requests>=2.32.3",
"lxml>=5.3.0",
"cssselect>=1.2.0",
"pydantic>=2.4.2",
"pydantic-settings>=2.0.3",
"google-cloud-secret-manager>=2.16.3",
"google-cloud-logging>=3.8.0",
"google-cloud-storage>=2.10.0",
"protobuf==4.25.3",
"fastapi>=0.104.1",
"slowapi>=0.1.9",
"uvicorn>=0.24.0.post1",
"bcrypt>=4.0.1",
"python-jose>=3.3.0",
"passlib>=1.7.4",
"python-multipart>=0.0.17",
"SQLAlchemy>=2.0.36",
"pg8000>=1.31.2",
"psycopg2-binary>=2.9.10",
]
Then we can run,
uv venv --python 3.11
source .venv/bin/activate
uv pip install -r pyproject.toml
uv lock
Developing API Wrapper
Firstly, we will need to extract data from the official F1 website. We can use the requests
and lxml
libraries to scrape the data.
In common/f1api.py
, we can have something like this:
from lxml import html
import requests
import logging
logger = logging.getLogger(__name__)
class F1Scraper:
def __init__(self):
self._driver_stat_url = (
"https://www.formula1.com/en/results.html/2025/drivers.html"
)
self._team_stat_url = "https://www.formula1.com/en/results/2025/team"
@staticmethod
def _scrape_f1(url: str, key: str) -> dict:
page = requests.get(url)
tree = html.fromstring(page.content)
tables = tree.cssselect("table")
for table in tables:
headers = [
header.text_content().strip().lower()
for header in table.cssselect("th")
]
if key in headers and "pos" in headers and "pts" in headers:
table_rows = table.cssselect("tr")
break
else:
logger.error(
"No matching table found. The structure of the website might have changed."
)
return {"data": []}
column_headers = [
column.text_content().strip() for column in table_rows[0].cssselect("th")
]
data_rows = []
for row in table_rows[1:]:
data = [column.text_content().strip() for column in row.cssselect("td")]
data_dict = dict(zip(column_headers, data))
if key.capitalize() in data_dict:
data_dict[key.capitalize()] = data_dict[key.capitalize()].replace(
"\xa0", " "
)
data_rows.append(data_dict)
return {"data": data_rows}
def get_driver_data(self) -> dict:
return self._scrape_f1(self._driver_stat_url, "driver")
def get_team_data(self) -> dict:
return self._scrape_f1(self._team_stat_url, "team")
In common/gcp_cloudsql.py
, we can have something like this:
from typing import Any, Dict, Optional, Union
from app.config import config
from app.common.exception.exceptions import CustomError
from loguru import logger
from sqlalchemy.sql import text
def pgsql_query(sql: str, params: dict = None) -> list[dict]:
engine = config.postgres.engine
conn = None
try:
conn = engine.connect()
logger.info("Connected to the PostgreSQL database.")
result = conn.execute(text(sql), params or {})
logger.info("Query executed successfully.")
rows = result.fetchall()
logger.info("Fetched all rows.")
print("Fetched all rows.")
columns = result.keys()
result_list = [dict(zip(columns, row)) for row in rows]
logger.info("Converted rows to a list of dictionaries.")
print(f"Converted rows to a list of dictionaries. {result_list}")
except Exception as e:
logger.error(f"Error executing query: {e}")
raise CustomError(f"Error executing query: {e}")
finally:
if conn is not None:
conn.close()
logger.info("Database connection closed.")
return result_list
def get_pgql_data(
query: str,
params: Dict[str, Any],
result_key: Optional[str] = None,
log_message: str = "",
) -> Union[dict, str]:
result = pgsql_query(query, params)
if result:
logger.info(f"{log_message} return: {result[0]}")
return result[0][result_key] if result_key else result[0]
return "" if result_key else {}
def get_client_by_id(client_id: str) -> dict:
query = "SELECT * FROM client_scopes_get WHERE client_id = :client_id"
return get_pgql_data(
query=query, params={"client_id": client_id}, log_message="get_client_by_id"
)
def get_client_scopes(client_id: str) -> str:
query = "SELECT scopes FROM client_scopes_get WHERE client_id = :client_id"
return get_pgql_data(
query=query,
params={"client_id": client_id},
result_key="scopes",
log_message="get_client_scopes",
)
In config/config.py
, we can have something like this:
from pydantic import SecretStr, PrivateAttr, BaseModel
from pydantic_settings import BaseSettings
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
class PostgresConfig(BaseModel):
_engine: Engine | None = PrivateAttr(default=None)
connection_string: str
@property
def engine(self) -> Engine:
if not self._engine:
self._engine = create_engine(self.connection_string)
return self._engine
class Config(BaseSettings):
postgres: PostgresConfig
SECRET_KEY: SecretStr
ALGORITHM: str
ACCESS_TOKEN_EXPIRE_MINUTES: int
API_ENDPOINT_BASE_URL: str
RATE_LIMIT_WINDOW: int
Then we will have to define schemas for the API, in fastAPI, schemas are used to define the request and response models.
In schema/auth.py
, we can have something like this:
from pydantic import BaseModel
from typing import Optional
from fastapi import Form
class TokenResponse(BaseModel):
access_token: str
token_type: str
expires_in: int
class TokenData(BaseModel):
client_id: str | None = None
class TokenRequestForm:
def __init__(
self,
grant_type: Optional[str] = Form(..., regex="^(password|client_credentials)$"),
client_id: Optional[str] = Form(...),
client_secret: Optional[str] = Form(...),
):
self.grant_type = grant_type
self.client_id = client_id
self.client_secret = client_secret
In schema/f1api.py
, we can have something like this:
from app.schema.base import F1BaseModel
class Drivers(F1BaseModel):
pos: str | None = None
driver: str | None = None
nationality: str | None = None
car: str | None = None
pts: float | None = None
class Teams(F1BaseModel):
pos: str | None = None
team: str | None = None
pts: str | None = None
Then, to ensure that the API is secure, we will have to implement the OAuth2 authentication and Bearer token authorization.
In dependencies.py
, we can have something like this:
from typing import Annotated
from fastapi import Depends, Request
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from jose import JWTError, jwt
from loguru import logger
from app.config import config
from app.schema.auth import TokenData
from app.common.exception.exceptions import UnauthorizedException
from app.common.gcp_cloudsql import get_client_scopes
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)], request: Request
):
try:
payload = jwt.decode(
token, config.SECRET_KEY.get_secret_value(), algorithms=[config.ALGORITHM]
)
client_id: str = payload.get("sub")
if client_id is None:
logger.error("Could not validate credentials, missing client id")
raise UnauthorizedException("Could not validate credentials")
scopes = get_client_scopes(client_id)
logger.info(f"Client scopes: {scopes}")
endpoint = request.url.path[1:]
logger.info(f"Endpoint: {request.url.path}")
if endpoint not in [scope.strip() for scope in scopes.split(",")]:
logger.error(f"Client does not have access to endpoint: {endpoint}")
raise UnauthorizedException("Client does not have access to this endpoint")
token_data = TokenData(client_id=client_id)
except JWTError as e:
logger.error(f"JWTError, Could not validate credentials: {e}")
raise UnauthorizedException("Could not validate credentials")
return token_data
Which this code will ensure that the client has the correct scopes to access the API endpoint.
In routes/auth.py
, we can have something like this:
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, Request
from jose import jwt
from loguru import logger
from app.config import config
from app.schema.auth import TokenResponse, TokenRequestForm
from app.common.gcp_cloudsql import get_client_by_id
from app.common.exception.exceptions import UnauthorizedException
from app.routes.root import limiter
router = APIRouter(prefix="/auth", tags=["auth"])
def authenticate_client(client_id: str, client_secret: str):
client = get_client_by_id(client_id)
if client and client["client_secret"] == client_secret and client["is_active"] == 1:
return client
return None
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode, config.SECRET_KEY.get_secret_value(), algorithm=config.ALGORITHM
)
return encoded_jwt
@router.post("/token", response_model=TokenResponse)
@limiter.limit("5/minute")
async def login_for_access_token(
request: Request, form_data: TokenRequestForm = Depends()
):
client = authenticate_client(form_data.client_id, form_data.client_secret)
if not client:
logger.error("Invalid client ID or secret")
raise UnauthorizedException("Invalid client ID or secret")
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": client["client_id"]}, expires_delta=access_token_expires
)
logger.info("Access token created successfully")
return TokenResponse(
access_token=access_token,
token_type="bearer",
expires_in=60 * config.ACCESS_TOKEN_EXPIRE_MINUTES,
)
And in routes/f1api.py
, we can have something like this:
from typing import List, Annotated
from fastapi import APIRouter, HTTPException, Depends, Request
from app.schema.f1api import Drivers, Teams
from app.common.f1api import F1Scraper
from app.dependencies import get_current_user
from app.routes.root import limiter
f1api = F1Scraper()
router = APIRouter(prefix="/v1", tags=["f1"])
@router.get("/f1drivers")
@limiter.limit("5/minute")
async def f1drivers(
request: Request,
current_user: Annotated[str, Depends(get_current_user)] = None,
) -> List[Drivers]:
try:
data = f1api.get_driver_data()
data_items = []
if data:
driver_data = data["data"]
data_items = [Drivers(**item) for item in driver_data]
return data_items
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/f1teams")
async def f1teams(
request: Request,
current_user: Annotated[str, Depends(get_current_user)] = None,
) -> List[Teams]:
try:
data_items = []
data = f1api.get_team_data()
if data:
team_data = data["data"]
data_items = [Teams(**item) for item in team_data]
return data_items
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Optionally, we can also add a rate limiter to the API, to prevent abuse of the API.
In routes/root.py
, we can have something like this:
from http import HTTPStatus
from fastapi import APIRouter, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.common.gcp_cloudsql import pgsql_query
router = APIRouter()
limiter = Limiter(key_func=get_remote_address)
HEALTH_DESCRIPTION = "Check to make sure the service is up and running"
@router.get(
"/health",
summary="Health check",
status_code=HTTPStatus.OK,
description=HEALTH_DESCRIPTION,
)
@limiter.limit("5/minute")
async def health(request: Request) -> None:
"""Simply returns with positive code if service is running as expected"""
pass
And finally, in main.py
, we can put everything together like this:
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError
from slowapi.errors import RateLimitExceeded
from app.common.exception.exceptions import (
CustomError,
handle_custom_error,
handle_validation_error,
handle_rate_limit_exceeded_error,
)
from app.routes import auth, f1api, root
from app.routes.root import limiter
import app.common.logging as custom_logging
from os import getenv
import google.cloud.logging
if (env := getenv("ENV")) and env == "prod":
client = google.cloud.logging.Client()
client.setup_logging()
app = FastAPI()
app.exception_handler(RateLimitExceeded)(handle_rate_limit_exceeded_error)
app.exception_handler(CustomError)(handle_custom_error)
app.exception_handler(RequestValidationError)(handle_validation_error)
app.include_router(auth.router)
app.include_router(root.router, prefix="", tags=["root"])
app.include_router(f1api.router)
@app.on_event("startup")
async def connect():
custom_logging.init()
app.state.limiter = limiter
Deploying to Cloud Run
In the deploy/production/service.yaml
, we can have something like this:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: fastapi-wrapper-app
labels:
cloud.googleapis.com/location: us-central1
annotations:
run.googleapis.com/ingress: all
run.googleapis.com/ingress-status: all
spec:
template:
metadata:
annotations:
run.googleapis.com/startup-cpu-boost: 'true'
spec:
serviceAccountName: gcp-svc-compute@developer.gserviceaccount.com
containers:
- image: us-central1-docker.pkg.dev/gcp-prj-123/gcp-app/fastapi-wrapper-app
ports:
- name: http1
containerPort: 80
env:
- name: GCP_PROJECT_ID
value: gcp-prj-123
- name: ENV
value: prod
And in skaffold.yaml
, we can have something like this:
apiVersion: skaffold/v4beta2
kind: Config
metadata:
name: fastapi-wrapper-app
build:
artifacts:
- image: us-central1-docker.pkg.dev/gcp-prj-123/gcp-app/fastapi-wrapper-app
docker:
dockerfile: Dockerfile
platforms:
- "linux/amd64"
profiles:
- name: production
manifests:
rawYaml:
- deploy/production/service.yaml
deploy:
cloudrun:
projectid: gcp-prj-123
region: us-central1
And in Dockerfile, we can have something like this:
FROM python:3.11-slim
WORKDIR /usr/src
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
UV_VERSION=0.4.0
ENV PATH="/root/.local/bin:$PATH"
RUN pip install pipx
RUN pipx install uv==${UV_VERSION}
COPY ./uv.lock pyproject.toml /usr/src/
COPY app /usr/src/app
CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
Finally, we can deploy by running:
skaffold run -p production -v info
Once the deployment is successful, we can access the API as below
BASE_URL = "https://myapp.us-central1.run.app"
token_url = f"{BASE_URL}/auth/token"
drivers_url = f"{BASE_URL}/v1/f1drivers"
teams = f"{BASE_URL}/hrapi/f1teams"
payload = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": "",
}
response_token = requests.post(token_url, data=payload)
access_token = response_token.json()["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
driver_response = requests.get(drivers_url, headers=headers)
team_response = requests.get(teams_url, headers=headers)
And that’s it! We have successfully deployed a F1 API Wrapper using FastAPI.
Thank you for reading.