In some of my previous, we have discussed the use of FastAPI, such as using it as a webhook for a chatbot, and creating a machine learning API using FastAPI for prediction. In this post, we will discuss about using FastAPI to develop a API Wrapper for F1 data.

Project Overview

1 - Create a Cloud SQL database to store client scopes for authentication 2 - Scrape F1 drivers and team results from Formula1.com
3 - Build a FastAPI app that authenticates clients using a Cloud SQL database
4 - Deploy to Cloud Run with Skaffold and use UV as the Python package manager

To follow with the post, you can check the code here

Project Structure

The core folder structure is organized as follows:

.
├── Dockerfile
├── README.md
├── app
│   ├── common
│   │   ├── f1api.py
│   │   ├── exception
│   │   ├── gcp_cloudsql.py
│   │   └── gcp_secrets.py
│   ├── config
│   │   ├── __init__.py
│   │   └── config.py
│   ├── dependencies.py
│   ├── main.py
│   ├── routes
│   │   ├── auth.py
│   │   ├── root.py
│   │   └── f1api.py
│   └── schema
│       ├── auth.py
│       └── f1api.py
├── deploy
│   └── production
│       └── service.yaml
├── pyproject.toml
├── skaffold.yaml
└── uv.lock

Preparation

We will first have to create a Cloud SQL instance and a database to store the client scopes for authentication. The schema will look something like this. And the client_secret will be hashed using bcrypt.

CREATE TABLE client_scopes_get (
    client_id VARCHAR(50) NOT NULL,
    client_secret VARCHAR(100) NOT NULL,
    scopes VARCHAR(1000),
    is_active INT
);

The project uses UV as the Python package manager, in pyproject.toml we can have something like this:

[project]
name = "f1api-wrapper"
version = "0.1.0"
description = "F1 API Wrapper"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "loguru>=0.7.2",
    "requests>=2.32.3",
    "lxml>=5.3.0",
    "cssselect>=1.2.0",
    "pydantic>=2.4.2",
    "pydantic-settings>=2.0.3",
    "google-cloud-secret-manager>=2.16.3",
    "google-cloud-logging>=3.8.0",
    "google-cloud-storage>=2.10.0",
    "protobuf==4.25.3",
    "fastapi>=0.104.1",
    "slowapi>=0.1.9",
    "uvicorn>=0.24.0.post1",
    "bcrypt>=4.0.1",
    "python-jose>=3.3.0",
    "passlib>=1.7.4",
    "python-multipart>=0.0.17",
    "SQLAlchemy>=2.0.36",
    "pg8000>=1.31.2",
    "psycopg2-binary>=2.9.10",
]

Then we can run,

uv venv --python 3.11
source .venv/bin/activate
uv pip install -r pyproject.toml
uv lock

Developing API Wrapper

Firstly, we will need to extract data from the official F1 website. We can use the requests and lxml libraries to scrape the data. In common/f1api.py, we can have something like this:

from lxml import html
import requests
import logging

logger = logging.getLogger(__name__)


class F1Scraper:
    def __init__(self):
        self._driver_stat_url = (
            "https://www.formula1.com/en/results.html/2025/drivers.html"
        )
        self._team_stat_url = "https://www.formula1.com/en/results/2025/team"

    @staticmethod
    def _scrape_f1(url: str, key: str) -> dict:
        page = requests.get(url)
        tree = html.fromstring(page.content)

        tables = tree.cssselect("table")

        for table in tables:
            headers = [
                header.text_content().strip().lower()
                for header in table.cssselect("th")
            ]
            if key in headers and "pos" in headers and "pts" in headers:
                table_rows = table.cssselect("tr")
                break
        else:
            logger.error(
                "No matching table found. The structure of the website might have changed."
            )
            return {"data": []}

        column_headers = [
            column.text_content().strip() for column in table_rows[0].cssselect("th")
        ]

        data_rows = []
        for row in table_rows[1:]:
            data = [column.text_content().strip() for column in row.cssselect("td")]
            data_dict = dict(zip(column_headers, data))
            if key.capitalize() in data_dict:
                data_dict[key.capitalize()] = data_dict[key.capitalize()].replace(
                    "\xa0", " "
                )
            data_rows.append(data_dict)

        return {"data": data_rows}

    def get_driver_data(self) -> dict:
        return self._scrape_f1(self._driver_stat_url, "driver")

    def get_team_data(self) -> dict:
        return self._scrape_f1(self._team_stat_url, "team")

In common/gcp_cloudsql.py, we can have something like this:

from typing import Any, Dict, Optional, Union

from app.config import config
from app.common.exception.exceptions import CustomError

from loguru import logger
from sqlalchemy.sql import text


def pgsql_query(sql: str, params: dict = None) -> list[dict]:
    engine = config.postgres.engine
    conn = None
    try:
        conn = engine.connect()
        logger.info("Connected to the PostgreSQL database.")

        result = conn.execute(text(sql), params or {})
        logger.info("Query executed successfully.")

        rows = result.fetchall()
        logger.info("Fetched all rows.")
        print("Fetched all rows.")

        columns = result.keys()
        result_list = [dict(zip(columns, row)) for row in rows]
        logger.info("Converted rows to a list of dictionaries.")
        print(f"Converted rows to a list of dictionaries. {result_list}")
    except Exception as e:
        logger.error(f"Error executing query: {e}")
        raise CustomError(f"Error executing query: {e}")
    finally:
        if conn is not None:
            conn.close()
            logger.info("Database connection closed.")
    return result_list


def get_pgql_data(
    query: str,
    params: Dict[str, Any],
    result_key: Optional[str] = None,
    log_message: str = "",
) -> Union[dict, str]:
    result = pgsql_query(query, params)
    if result:
        logger.info(f"{log_message} return: {result[0]}")
        return result[0][result_key] if result_key else result[0]
    return "" if result_key else {}


def get_client_by_id(client_id: str) -> dict:
    query = "SELECT * FROM client_scopes_get WHERE client_id = :client_id"
    return get_pgql_data(
        query=query, params={"client_id": client_id}, log_message="get_client_by_id"
    )


def get_client_scopes(client_id: str) -> str:
    query = "SELECT scopes FROM client_scopes_get WHERE client_id = :client_id"
    return get_pgql_data(
        query=query,
        params={"client_id": client_id},
        result_key="scopes",
        log_message="get_client_scopes",
    )

In config/config.py, we can have something like this:

from pydantic import SecretStr, PrivateAttr, BaseModel
from pydantic_settings import BaseSettings

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


class PostgresConfig(BaseModel):
    _engine: Engine | None = PrivateAttr(default=None)

    connection_string: str

    @property
    def engine(self) -> Engine:
        if not self._engine:
            self._engine = create_engine(self.connection_string)
        return self._engine


class Config(BaseSettings):
    postgres: PostgresConfig
    SECRET_KEY: SecretStr
    ALGORITHM: str
    ACCESS_TOKEN_EXPIRE_MINUTES: int
    API_ENDPOINT_BASE_URL: str
    RATE_LIMIT_WINDOW: int

Then we will have to define schemas for the API, in fastAPI, schemas are used to define the request and response models.

In schema/auth.py, we can have something like this:

from pydantic import BaseModel

from typing import Optional
from fastapi import Form


class TokenResponse(BaseModel):
    access_token: str
    token_type: str
    expires_in: int


class TokenData(BaseModel):
    client_id: str | None = None


class TokenRequestForm:
    def __init__(
        self,
        grant_type: Optional[str] = Form(..., regex="^(password|client_credentials)$"),
        client_id: Optional[str] = Form(...),
        client_secret: Optional[str] = Form(...),
    ):
        self.grant_type = grant_type
        self.client_id = client_id
        self.client_secret = client_secret

In schema/f1api.py, we can have something like this:

from app.schema.base import F1BaseModel


class Drivers(F1BaseModel):
    pos: str | None = None
    driver: str | None = None
    nationality: str | None = None
    car: str | None = None
    pts: float | None = None


class Teams(F1BaseModel):
    pos: str | None = None
    team: str | None = None
    pts: str | None = None

Then, to ensure that the API is secure, we will have to implement the OAuth2 authentication and Bearer token authorization.

In dependencies.py, we can have something like this:

from typing import Annotated

from fastapi import Depends, Request
from fastapi.security import OAuth2PasswordBearer

from passlib.context import CryptContext
from jose import JWTError, jwt

from loguru import logger

from app.config import config
from app.schema.auth import TokenData
from app.common.exception.exceptions import UnauthorizedException

from app.common.gcp_cloudsql import get_client_scopes


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)], request: Request
):
    try:
        payload = jwt.decode(
            token, config.SECRET_KEY.get_secret_value(), algorithms=[config.ALGORITHM]
        )
        client_id: str = payload.get("sub")
        if client_id is None:
            logger.error("Could not validate credentials, missing client id")
            raise UnauthorizedException("Could not validate credentials")

        scopes = get_client_scopes(client_id)
        logger.info(f"Client scopes: {scopes}")
        endpoint = request.url.path[1:]
        logger.info(f"Endpoint: {request.url.path}")

        if endpoint not in [scope.strip() for scope in scopes.split(",")]:
            logger.error(f"Client does not have access to endpoint: {endpoint}")
            raise UnauthorizedException("Client does not have access to this endpoint")

        token_data = TokenData(client_id=client_id)
    except JWTError as e:
        logger.error(f"JWTError, Could not validate credentials: {e}")
        raise UnauthorizedException("Could not validate credentials")
    return token_data

Which this code will ensure that the client has the correct scopes to access the API endpoint.

In routes/auth.py, we can have something like this:

from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, Request
from jose import jwt

from loguru import logger

from app.config import config
from app.schema.auth import TokenResponse, TokenRequestForm

from app.common.gcp_cloudsql import get_client_by_id
from app.common.exception.exceptions import UnauthorizedException

from app.routes.root import limiter


router = APIRouter(prefix="/auth", tags=["auth"])


def authenticate_client(client_id: str, client_secret: str):
    client = get_client_by_id(client_id)
    if client and client["client_secret"] == client_secret and client["is_active"] == 1:
        return client
    return None


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(
        to_encode, config.SECRET_KEY.get_secret_value(), algorithm=config.ALGORITHM
    )
    return encoded_jwt


@router.post("/token", response_model=TokenResponse)
@limiter.limit("5/minute")
async def login_for_access_token(
    request: Request, form_data: TokenRequestForm = Depends()
):
    client = authenticate_client(form_data.client_id, form_data.client_secret)
    if not client:
        logger.error("Invalid client ID or secret")
        raise UnauthorizedException("Invalid client ID or secret")

    access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": client["client_id"]}, expires_delta=access_token_expires
    )
    logger.info("Access token created successfully")
    return TokenResponse(
        access_token=access_token,
        token_type="bearer",
        expires_in=60 * config.ACCESS_TOKEN_EXPIRE_MINUTES,
    )

And in routes/f1api.py, we can have something like this:

from typing import List, Annotated

from fastapi import APIRouter, HTTPException, Depends, Request

from app.schema.f1api import Drivers, Teams

from app.common.f1api import F1Scraper

from app.dependencies import get_current_user
from app.routes.root import limiter


f1api = F1Scraper()

router = APIRouter(prefix="/v1", tags=["f1"])


@router.get("/f1drivers")
@limiter.limit("5/minute")
async def f1drivers(
    request: Request,
    current_user: Annotated[str, Depends(get_current_user)] = None,
) -> List[Drivers]:
    try:
        data = f1api.get_driver_data()
        data_items = []
        if data:
            driver_data = data["data"]
            data_items = [Drivers(**item) for item in driver_data]
        return data_items
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/f1teams")
async def f1teams(
    request: Request,
    current_user: Annotated[str, Depends(get_current_user)] = None,
) -> List[Teams]:
    try:
        data_items = []
        data = f1api.get_team_data()
        if data:
            team_data = data["data"]
            data_items = [Teams(**item) for item in team_data]
        return data_items
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Optionally, we can also add a rate limiter to the API, to prevent abuse of the API. In routes/root.py, we can have something like this:

from http import HTTPStatus

from fastapi import APIRouter, Request

from slowapi import Limiter
from slowapi.util import get_remote_address

from app.common.gcp_cloudsql import pgsql_query

router = APIRouter()
limiter = Limiter(key_func=get_remote_address)

HEALTH_DESCRIPTION = "Check to make sure the service is up and running"


@router.get(
    "/health",
    summary="Health check",
    status_code=HTTPStatus.OK,
    description=HEALTH_DESCRIPTION,
)
@limiter.limit("5/minute")
async def health(request: Request) -> None:
    """Simply returns with positive code if service is running as expected"""
    pass

And finally, in main.py, we can put everything together like this:

from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError

from slowapi.errors import RateLimitExceeded


from app.common.exception.exceptions import (
    CustomError,
    handle_custom_error,
    handle_validation_error,
    handle_rate_limit_exceeded_error,
)

from app.routes import auth, f1api, root
from app.routes.root import limiter

import app.common.logging as custom_logging

from os import getenv
import google.cloud.logging


if (env := getenv("ENV")) and env == "prod":
    client = google.cloud.logging.Client()
    client.setup_logging()

app = FastAPI()


app.exception_handler(RateLimitExceeded)(handle_rate_limit_exceeded_error)

app.exception_handler(CustomError)(handle_custom_error)
app.exception_handler(RequestValidationError)(handle_validation_error)

app.include_router(auth.router)
app.include_router(root.router, prefix="", tags=["root"])
app.include_router(f1api.router)


@app.on_event("startup")
async def connect():
    custom_logging.init()
    app.state.limiter = limiter

Deploying to Cloud Run

In the deploy/production/service.yaml, we can have something like this:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: fastapi-wrapper-app
  labels:
    cloud.googleapis.com/location: us-central1
  annotations:
    run.googleapis.com/ingress: all
    run.googleapis.com/ingress-status: all
spec:
  template:
    metadata:
      annotations:
        run.googleapis.com/startup-cpu-boost: 'true'
    spec:
      serviceAccountName: gcp-svc-compute@developer.gserviceaccount.com
      containers:
      - image: us-central1-docker.pkg.dev/gcp-prj-123/gcp-app/fastapi-wrapper-app
        ports:
        - name: http1
          containerPort: 80
        env:
        - name: GCP_PROJECT_ID
          value: gcp-prj-123
        - name: ENV
          value: prod

And in skaffold.yaml, we can have something like this:

apiVersion: skaffold/v4beta2
kind: Config
metadata:
  name: fastapi-wrapper-app
build:
  artifacts:
  - image: us-central1-docker.pkg.dev/gcp-prj-123/gcp-app/fastapi-wrapper-app
    docker:
      dockerfile: Dockerfile
    platforms:
      - "linux/amd64"
profiles:
- name: production
  manifests:
    rawYaml:
    - deploy/production/service.yaml
deploy:
  cloudrun:
    projectid: gcp-prj-123
    region: us-central1

And in Dockerfile, we can have something like this:

FROM python:3.11-slim

WORKDIR /usr/src

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    UV_VERSION=0.4.0

ENV PATH="/root/.local/bin:$PATH"

RUN pip install pipx
RUN pipx install uv==${UV_VERSION}


COPY ./uv.lock pyproject.toml /usr/src/

COPY app /usr/src/app

CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]

Finally, we can deploy by running:

skaffold run -p production -v info

Once the deployment is successful, we can access the API as below

BASE_URL = "https://myapp.us-central1.run.app"
token_url = f"{BASE_URL}/auth/token"
drivers_url = f"{BASE_URL}/v1/f1drivers"
teams = f"{BASE_URL}/hrapi/f1teams"

payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret,
    "scope": "",
}

response_token = requests.post(token_url, data=payload)

access_token = response_token.json()["access_token"]

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
}


driver_response = requests.get(drivers_url, headers=headers)
team_response = requests.get(teams_url, headers=headers)

And that’s it! We have successfully deployed a F1 API Wrapper using FastAPI.

Thank you for reading.