In the last post, we have discussed about using FastAPI to develop a API Wrapper for F1 data. In this post, we will discuss about using Django to develop an API client control portal that will be deployed to Cloud Run, so that we can use it to manage the API clients and their access to the API.
There will be two main functions we will implement in this project:
1 - Create new API client
This allows us to create a new API client with appropriate level of access, and generate a client ID and client secret.
2 - Update API client
This allows us to modify the access level (scopes/columns) of an existing API client.
To follow with the post, you can check the code here
What is Django
Django is a high-level Python web framework that promotes rapid development and clean, pragmatic design. It comes with an admin interface, ORM, templating engine, and much more. For this project, we use Django to build a full-stack web application to manage our API clients.
Project Overview
1 - Initialize Django app with UV and pyproject.toml
2 - Build frontend using Django templates for managing scopes
3 - Store scopes and access control rules in PostgreSQL (Cloud SQL)
4 - Deploy to Google Cloud Run using Skaffold
Project Structure
.
├── Dockerfile
├── README.md
├── clientapp
│ ├── views.py
│ ├── models.py
│ ├── services.py
│ ├── forms.py
│ └── templates/clientapp
│ ├── base.html
│ ├── index.html
│ ├── client_scopes.html
│ ├── client_scopes_edit.html
│ ├── client_scopes_edit_success.html
│ ├── new_client.html
│ └── new_client_success.html
├── deploy/production/service.yaml
├── f1apiportal
│ ├── asgi.py
│ ├── gcp_secrets.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── manage.py
├── pyproject.toml
├── skaffold.yaml
└── uv.lock
Initialize Django Project
We will use UV as Python package manager.
uv venv --python 3.11
source .venv/bin/activate
uv pip install django
Create the Django project and app:
django-admin startproject f1apiportal .
python manage.py startapp clientapp
Set up templates:
mkdir -p clientapp/templates/clientapp
echo "base.html client_scopes.html client_scopes_edit.html client_scopes_success.html index.html new_client.html new_client_success.html" | xargs -n 1 -I {} touch clientapp/templates/clientapp/{}
These HTML files correspond to the UI of differnt pages of the client portal:
1 - base.html
2 - index.html
3 - client_scopes.html
4 - client_scopes_edit.html
5 - client_scopes_edit_success.html
6 - new_client.html
7 - new_client_success.html
And the followings are the example outputs
Models and Services
The backend models:
1 - ClientScopesGet
: holds the client ID, secret, scopes
2 - ClientEndpointsAccess
: maps each client to endpoint + column access
3 - ScopesGet
: defines available scopes and columns
In clientapp/models.py:
from django.db import models
class ClientScopesGet(models.Model):
client_id = models.CharField(max_length=50)
client_secret = models.CharField(max_length=100)
scopes = models.TextField()
is_active = models.IntegerField()
def __str__(self):
return self.client_id
class Meta:
db_table = "client_scopes_get"
class ClientEndpointsAccess(models.Model):
client_id = models.CharField(max_length=50)
endpoint = models.TextField()
allow_access = models.TextField()
extra_constrains = models.TextField(null=True, blank=True)
def __str__(self):
return f"{self.client_id} - {self.endpoint}"
class Meta:
db_table = "client_endpoints_access"
class ScopesGet(models.Model):
endpoint = models.CharField(max_length=180)
scope = models.CharField(max_length=1000)
column_access = models.TextField()
def __str__(self):
return self.endpoint
class Meta:
db_table = "scopes_get"
The business logic is handled in services.py
which processes:
1 - Column-level access per endpoint
2 - Scope modification and client generation
3 - Updates/deletes in a transaction-safe way
In clientapp/services.py
:
from typing import List, Dict, Any
import logging
from django.db import transaction
from django.urls import reverse
from django.shortcuts import redirect, get_object_or_404
from django.http import HttpRequest, HttpResponse
from .models import ClientScopesGet, ClientEndpointsAccess, ScopesGet
logger = logging.getLogger(__name__)
def get_client_and_scopes(client_id: str) -> Dict[str, Any]:
client = get_object_or_404(ClientScopesGet, client_id=client_id)
logger.info(f"Client ID: {client_id}")
current_scopes = client.scopes.split(", ") if client.scopes else []
logger.info(f"Current scopes: {current_scopes}")
other_scopes = ScopesGet.objects.exclude(scope__in=current_scopes).values_list(
"scope", flat=True
)
logger.info(f"Other scopes: {other_scopes}")
return {
"client": client,
"current_scopes": current_scopes,
"other_scopes": other_scopes,
}
def prepare_scopes_data(
current_scopes: List[str], client_id: str
) -> List[Dict[str, Any]]:
scopes_data = []
for scope in current_scopes:
scope_data = ScopesGet.objects.filter(scope=scope).first()
logger.info(f"Scope data: {scope_data}")
if scope_data:
allow_access = (
ClientEndpointsAccess.objects.filter(
client_id=client_id, endpoint=scope
)
.values_list("allow_access", flat=True)
.first()
)
logger.info(f"Allow access: {allow_access}")
current_access = allow_access.split(", ") if allow_access else []
logger.info(f"Current access: {current_access}")
columns = [
{"name": column, "access": column in current_access}
for column in scope_data.column_access.split(", ")
]
logger.info(f"Columns: {columns}")
scopes_data.append({"scope": scope, "columns": columns})
logger.info(f"Scopes data: {scopes_data}")
return scopes_data
def process_post_request(
request: HttpRequest, client: ClientScopesGet, current_scopes: List[str]
) -> HttpResponse:
new_scopes = request.POST.get("scopes", "").strip()
column_access_input = request.POST.get("column_access", "").strip()
removed_columns_input = request.POST.get("removed_columns", "").strip()
removed_scopes_input = request.POST.get("removed_scopes", "").strip()
if new_scopes:
client.scopes = new_scopes
client.save()
removed_scopes_list = (
removed_scopes_input.split(", ") if removed_scopes_input else []
)
column_access_dict: Dict[str, Any] = parse_input(column_access_input)
removed_columns_dict: Dict[str, Any] = parse_input(removed_columns_input)
with transaction.atomic():
delete_removed_scopes(client.client_id, removed_scopes_list)
update_scopes_access(
client.client_id,
current_scopes,
removed_scopes_list,
column_access_dict,
removed_columns_dict,
)
return redirect(
reverse("client_scopes_edit_success") + f"?client_id={client.client_id}"
)
def parse_input(input_data: str) -> Dict[str, List[str]]:
parsed_data = {}
if input_data:
for item in input_data.split("; "):
if ": " in item:
scope, columns = item.split(": ")
parsed_data[scope] = columns.split(", ")
logger.info(f"Parsed input data: {parsed_data}")
return parsed_data
def delete_removed_scopes(client_id: str, removed_scopes_list: List[str]) -> None:
for scope in removed_scopes_list:
ClientEndpointsAccess.objects.filter(
client_id=client_id, endpoint=scope
).delete()
def update_scopes_access(
client_id: str,
current_scopes: List[str],
removed_scopes_list: List[str],
column_access_dict: Dict[str, List[str]],
removed_columns_dict: Dict[str, List[str]],
) -> None:
for scope in current_scopes:
logger.info(f"Scope: {scope}")
if scope in removed_scopes_list:
continue
existing_access = ClientEndpointsAccess.objects.filter(
client_id=client_id, endpoint=scope
).first()
logger.info(f"Existing access: {existing_access}")
existing_columns = (
existing_access.allow_access.split(", ")
if existing_access and existing_access.allow_access
else []
)
logger.info(f"Existing columns: {existing_columns}")
new_columns = column_access_dict.get(scope, [])
logger.info(f"New columns: {new_columns}")
columns_to_remove = removed_columns_dict.get(scope, [])
logger.info(f"Columns to remove: {columns_to_remove}")
final_columns = [
col
for col in set(existing_columns + new_columns)
if col not in columns_to_remove
]
logger.info(f"Final columns: {final_columns}")
if existing_access:
existing_access.allow_access = ", ".join(final_columns)
existing_access.save()
elif new_columns:
ClientEndpointsAccess.objects.create(
client_id=client_id,
endpoint=scope,
allow_access=", ".join(new_columns),
)
4 - Deploying to Cloud Run
Create your Dockerfile
like this:
FROM python:3.11-slim
WORKDIR /usr/src
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
UV_VERSION=0.4.0 \
UV_HTTP_TIMEOUT=1200
ENV PATH="/root/.local/bin:$PATH"
RUN pip install pipx
RUN pipx install uv==${UV_VERSION}
COPY . /usr/src/
RUN uv venv
ENV PATH="/usr/src/.venv/bin:$PATH"
RUN uv pip install -r pyproject.toml
RUN python manage.py collectstatic --noinput
EXPOSE 8000
CMD ["uv", "run", "gunicorn", "f1apiportal.wsgi:application", "--bind", "0.0.0.0:8000"]
For Cloud Run, we need to set up a few things:
deploy/production/service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: f1apiportal
annotations:
run.googleapis.com/ingress: all
run.googleapis.com/ingress-status: all
spec:
template:
metadata:
annotations:
run.googleapis.com/cloudsql-instances: gcp-prj-123:us-central1:cloudsqldb
run.googleapis.com/vpc-access-egress: all-traffic
run.googleapis.com/startup-cpu-boost: 'true'
spec:
serviceAccountName: svc@developer.serviceaccount.com
containers:
- image: us-central1-docker.pkg.dev/gcp-prj-123/repo/f1apiportal
ports:
- name: http1
containerPort: 8000
env:
- name: GCP_PROJECT_ID
value: gcp-prj-123
- name: ENV
value: prod
resources:
limits:
memory: 1Gi
cpu: 1000m
We will be using Skaffold to deploy to Cloud Run.
skaffold.yaml
apiVersion: skaffold/v4beta2
kind: Config
metadata:
name: f1apiportal
build:
artifacts:
- image: us-central1-docker.pkg.dev/gcp-prj-123/repo/f1apiportal
docker:
dockerfile: Dockerfile
platforms:
- "linux/amd64"
profiles:
- name: production
manifests:
rawYaml:
- deploy/production/service.yaml
deploy:
cloudrun:
projectid: gcp-prj-123
region: us-central1
To deploy, we will run the command:
skaffold run -p production -v info
And that’s it!
Now you have a full-featured client management portal for your F1 API on Cloud Run.
Thanks for reading!