Introduction

In my previous articles, we talked about how to set up MLFlow Server as a Cloud Run application and Kubeflow on GKE, with Ingress, a load balancer, and IAP configured to allow traffic only from specific IP addresses. In this article, I will talk about how to deploy ML endpoints that serve predictions as Cloud Run applications, with Ingress control so that the applications can communicate with each other. I will also cover how to deploy a Vertex AI serving endpoint and a Kubeflow KServe endpoint.

Architecture

Here is an overview of the MLOps workflow.

[Image: overview of the MLOps workflow]

And in today’s post, we will focus on this part.

[Image: the part of the workflow covered in this post]

To follow along with this blog post, the repo is here.

Learnings

In today’s post, we will:

1 - Set up a VPC connector for the Cloud Run application

2 - Create a FastAPI application that uses MLFlow model to make predictions

3 - Deploy Cloud Run applications with appropriate configuration

4 - Make a request to the FastAPI application to make predictions

5 - Set up a Vertex AI endpoint (optional)

6 - Set up a Kubeflow KServe endpoint (optional)

Set Up VPC connector

Why do the Cloud Run applications need Ingress control to communicate with each other? After setting up MLFlow as a Cloud Run application, we will build an API endpoint that uses a model from MLFlow to make predictions, and deploy it as a second Cloud Run application. The two applications therefore need to communicate with each other. Because the MLFlow application is set up with Ingress restrictions, an API endpoint deployed without any network configuration will not be able to reach the MLFlow API to retrieve the model. Therefore, we will set up a VPC connector so that the API endpoint can communicate with the MLFlow application.

A high-level view of the application structure looks something like this

[Image: high-level application structure]

To create a VPC connector in the GCP console, navigate to VPC network -> Serverless VPC access -> Create connector, fill in the necessary details, and create the connector.

[Image: screenshot of the Create connector form]

To check the available locations for the VPC connector, we can run the following command

gcloud compute networks vpc-access locations list

Create FastAPI application

The folder structure for the FastAPI application looks like this

.
├── Dockerfile
├── README.md
├── app
│   ├── basic_auth.py
│   ├── gcp_secrets.py
│   ├── helpers
│   │   ├── load_mlflow_models.py
│   │   └── models.py
│   ├── main.py
│   └── settings.py
├── deploy
│   └── production
│       └── service.yaml
├── poetry.lock
├── pyproject.toml
└── skaffold.yaml

Our Dockerfile looks like this. Please note that the Python version used for the API endpoint needs to be the same as in the environment where the prediction API will be used. For example, if you will be calling the API from a Jupyter notebook, the Python version of the Jupyter notebook should also be 3.11.

FROM python:3.11-slim

WORKDIR /usr/src/app

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    POETRY_VERSION=1.7.1 \
    USERNAME=nonroot

RUN adduser $USERNAME
USER $USERNAME

ENV HOME=/home/$USERNAME
ENV PATH="$HOME/.local/bin:$PATH"

RUN pip install pipx
RUN pipx install poetry==${POETRY_VERSION}

COPY ./poetry.lock pyproject.toml /usr/src/app/
RUN poetry install -nv --no-root

COPY app .
CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]

Our pyproject.toml looks like this. As with the Dockerfile, apart from the Python version, the scikit-learn and xgboost versions should be the same as in the environment where the prediction API will be used. This could be a Jupyter notebook, a local Python virtual env, etc.

[tool.poetry]
name = "mlflow-wine-pred-endpoint"
version = "0.1.0"
description = ""
authors = [""]
readme = "README.md"

[tool.poetry.dependencies]
python = "3.11"

pandas = "2.2.2"
httpx = "^0.27.0"
fastapi = "^0.104.1"
uvicorn = "^0.24.0.post1"
emoji = "^2.9.0"

pydantic-settings = "^2.2.1"

mlflow = "2.14.1"
xgboost = "1.7.5"
scikit-learn = "1.3.1"

google-cloud-secret-manager = "^2.16.3"
google-cloud-logging = "^3.8.0"
google-cloud-storage = "^2.10.0"

requests = "^2.31.0"

[build-system]
requires = ["poetry-core>=1.4.1"]
build-backend = "poetry.core.masonry.api"
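
Since the pinned versions are meant to match another environment, it can help to compare the pins against what is actually installed there. The following is a minimal sketch using only the standard library. The PINNED dictionary is copied from the pyproject.toml above, and the helper names installed_versions and find_mismatches are hypothetical.

```python
from importlib import metadata

# Pins copied from the pyproject.toml above.
PINNED = {"mlflow": "2.14.1", "xgboost": "1.7.5", "scikit-learn": "1.3.1"}


def installed_versions(packages):
    """Return {package: installed version or None} for the current environment."""
    found = {}
    for name in packages:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


def find_mismatches(pinned, installed):
    """Return {package: (pinned, installed)} for every package whose versions differ."""
    return {
        name: (wanted, installed.get(name))
        for name, wanted in pinned.items()
        if installed.get(name) != wanted
    }


if __name__ == "__main__":
    mismatches = find_mismatches(PINNED, installed_versions(PINNED))
    for name, (wanted, got) in mismatches.items():
        print(f"{name}: pinned {wanted}, installed {got}")
```

Running this in the notebook or virtual env lists every package whose installed version differs from the pin, or prints nothing when everything matches.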

For the API endpoint, we want a mechanism to authenticate incoming requests. We can use HTTPBasic for this. The validate function below checks the credentials and is used as a dependency of the API endpoint.

import secrets
from typing import Annotated
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from settings import app_config

security = HTTPBasic()

AUTH_USERNAME = app_config.MLFLOW_TRACKING_USERNAME.encode("utf8")
AUTH_PASSWORD = app_config.MLFLOW_TRACKING_PASSWORD.encode("utf8")


def validate(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    current_username_bytes = credentials.username.encode("utf8")
    is_correct_username = secrets.compare_digest(current_username_bytes, AUTH_USERNAME)
    current_password_bytes = credentials.password.encode("utf8")
    is_correct_password = secrets.compare_digest(current_password_bytes, AUTH_PASSWORD)
    if not (is_correct_username and is_correct_password):
        raise HTTPException(status_code=401)
    return credentials.username
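
For readers less familiar with HTTP Basic authentication, the sketch below shows roughly what happens to the Authorization header before validate receives the credentials, followed by the same constant-time comparison. It uses only the standard library. parse_basic_header and check_credentials are hypothetical helper names and are not part of FastAPI.

```python
import base64
import binascii
import secrets


def parse_basic_header(header_value):
    """Decode 'Basic <base64(user:password)>' into (user, password), or None if malformed."""
    scheme, _, encoded = header_value.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        return None
    try:
        decoded = base64.b64decode(encoded, validate=True).decode("utf8")
    except (binascii.Error, UnicodeDecodeError):
        return None
    user, sep, password = decoded.partition(":")
    return (user, password) if sep else None


def check_credentials(header_value, expected_user, expected_password):
    """Compare both fields in constant time, mirroring the validate function."""
    parsed = parse_basic_header(header_value)
    if parsed is None:
        return False
    user, password = parsed
    user_ok = secrets.compare_digest(user.encode("utf8"), expected_user.encode("utf8"))
    password_ok = secrets.compare_digest(
        password.encode("utf8"), expected_password.encode("utf8")
    )
    return user_ok and password_ok
```

Using secrets.compare_digest instead of == avoids leaking, through response timing, how many leading characters of a guess were correct.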

In our main.py, we can set up the API endpoint like this, which ensures that requests to the API endpoint are authenticated.

from fastapi import Depends, FastAPI, Response, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasicCredentials

from typing import List, Annotated
import pandas as pd

from helpers.models import WineQualityInput
from helpers.load_mlflow_models import load_latest_model
from basic_auth import validate

from os import getenv
import google.cloud.logging
import logging


if (env := getenv("ENV")) and env == "prod":
    client = google.cloud.logging.Client()
    client.setup_logging()

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] {%(filename)s:%(lineno)d} %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)

app = FastAPI()


@app.get("/health")
async def health():
    return "ok"


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    exc_str = f"{exc}".replace("\n", " ").replace("   ", " ")
    logger.error(f"{request}: {exc_str}")
    content = {"code": 422, "message": exc_str, "data": None}
    return JSONResponse(content=content, status_code=422)


@app.post("/invocations")
async def invocations(
    inputs: List[WineQualityInput],
    credentials: Annotated[HTTPBasicCredentials, Depends(validate)],
):
    # model_dump() is the Pydantic v2 replacement for the deprecated dict()
    input_df = pd.DataFrame([input_data.model_dump() for input_data in inputs])

    model = load_latest_model()

    try:
        predictions = model.predict(input_df)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return predictions.tolist()

Deploy Cloud Run application with VPC connector

Deploy FastAPI application to Cloud Run using skaffold

The skaffold.yaml looks like this

apiVersion: skaffold/v4beta2
kind: Config
metadata:
  name: mlflow-wine-pred-endpoint-app
build:
  artifacts:
  - image: us-central1-docker.pkg.dev/gcp-prj-id-123/mlflow-gcp/mlflow-wine-pred-endpoint-app
    docker:
      dockerfile: Dockerfile
    platforms:
      - "linux/amd64"
profiles:
- name: production
  manifests:
    rawYaml:
    - deploy/production/service.yaml
deploy:
  cloudrun:
    projectid: gcp-prj-id-123
    region: us-central1

We can see the skaffold configuration file is set up to reference deploy/production/service.yaml.

The service.yaml file looks like this

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: mlflow-wine-pred-endpoint-app
  labels:
    cloud.googleapis.com/location: us-central1
spec:
  template:
    metadata:
      annotations:
        run.googleapis.com/vpc-access-egress: all-traffic
        run.googleapis.com/vpc-access-connector: projects/gcp-prj-id-123/locations/us-central1/connectors/custom-vpc-connector
    spec:
      serviceAccountName: svc@developer.gserviceaccount.com
      containers:
      - image: us-central1-docker.pkg.dev/gcp-prj-id-123/mlflow-gcp/mlflow-wine-pred-endpoint-app
        ports:
        - name: http1
          containerPort: 80
        env:
        - name: GCP_PROJECT_ID
          value: gcp-prj-id-123
        - name: ENV
          value: prod

We have set the run.googleapis.com/vpc-access-egress and run.googleapis.com/vpc-access-connector annotations so that the Cloud Run application uses the VPC connector we created. With all-traffic, all outbound traffic from the service is routed through the connector.
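
The connector annotation expects the fully qualified resource name, which is easy to mistype. As a sketch, the two annotations can be generated from the project, region, and connector name. The function names are hypothetical, and the values mirror the service.yaml above.

```python
# The two egress settings used in this post's manifests.
ALLOWED_EGRESS = {"all-traffic", "private-ranges-only"}


def connector_resource_name(project_id, region, connector):
    """Build the fully qualified Serverless VPC Access connector name."""
    return f"projects/{project_id}/locations/{region}/connectors/{connector}"


def vpc_annotations(project_id, region, connector, egress="private-ranges-only"):
    """Return the Cloud Run annotations that attach a service to a VPC connector."""
    if egress not in ALLOWED_EGRESS:
        raise ValueError(f"egress must be one of {sorted(ALLOWED_EGRESS)}, got {egress!r}")
    return {
        "run.googleapis.com/vpc-access-egress": egress,
        "run.googleapis.com/vpc-access-connector": connector_resource_name(
            project_id, region, connector
        ),
    }


annotations = vpc_annotations(
    "gcp-prj-id-123", "us-central1", "custom-vpc-connector", egress="all-traffic"
)
for key, value in annotations.items():
    print(f"{key}: {value}")
```

The printed lines can be pasted under spec -> template -> metadata -> annotations.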

And finally, to deploy, we can run the command

skaffold run -p production

Redeploy MLFlow application to use VPC

Update the MLFlow application's deploy/production/service.yaml to add the lines below under spec -> template -> metadata -> annotations

run.googleapis.com/vpc-access-egress: private-ranges-only
run.googleapis.com/vpc-access-connector: projects/gcp-prj-id-123/locations/us-central1/connectors/custom-vpc-connector

The complete yaml file looks like this

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: mlflow-gcp
  annotations:
    run.googleapis.com/ingress: internal-and-cloud-load-balancing
    run.googleapis.com/ingress-status: internal-and-cloud-load-balancing
spec:
  template:
    metadata:
      annotations:
        run.googleapis.com/cloudsql-instances: gcp-prj-id-123:us-central1:gcp-pgsql
        run.googleapis.com/startup-cpu-boost: 'true'
        run.googleapis.com/vpc-access-egress: private-ranges-only
        run.googleapis.com/vpc-access-connector: projects/gcp-prj-id-123/locations/us-central1/connectors/custom-vpc-connector
    spec:
      serviceAccountName: svc@developer.gserviceaccount.com
      containers:
      - image: us-central1-docker.pkg.dev/gcp-prj-id-123/mlflow-gcp/mlflow-gcp
        ports:
        - name: http1
          containerPort: 8080
        env:
        - name: GCP_PROJECT
          value: gcp-prj-id-123
        resources:
          limits:
            memory: 2Gi
            cpu: 1000m

To deploy

skaffold run -p production

Make a request to the FastAPI application to make predictions

From a notebook or any other Python client, we can read the credentials and the endpoint URL from Secret Manager and then send a POST request with a Basic Authorization header.

import base64
import json

import requests
from google.cloud import secretmanager

PROJECT_ID = "gcp-prj-id-123"  # assumption: the project ID used in the manifests above


class GCPSecrets:
    def __init__(self) -> None:
        self.client = secretmanager.SecretManagerServiceClient()
        self.base_url = f"projects/{PROJECT_ID}/secrets"

    def get_secret(self, secret):
        # Fetch the latest version of the secret and decode it to a string
        secret_response = self.client.access_secret_version(
            name=f"{self.base_url}/{secret}/versions/latest"
        )
        return secret_response.payload.data.decode("UTF-8")


secrets = GCPSecrets()

# Build the Basic auth header; single quotes inside the f-strings keep them valid on Python 3.11
cred_str = f"{secrets.get_secret('user')}:{secrets.get_secret('password')}"
credentials = base64.b64encode(cred_str.encode("utf-8")).decode("utf-8")

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Basic {credentials}",
}

# Prepare the input data: the first five rows of X_test
# (assumed to be a pandas DataFrame already defined in the notebook)
input_data = X_test.to_dict(orient="records")[:5]
input_json = json.dumps(input_data)

CLOUD_RUN_URL = f"{secrets.get_secret('prediction-endpoint')}/invocations"

# Make a request to the FastAPI application to make predictions
response = requests.post(CLOUD_RUN_URL, data=input_json, headers=headers)

if response.status_code == 200:
    predictions = response.json()
    print("Predictions:", predictions)
else:
    print("Failed to get predictions. Status code:", response.status_code)
    print("Response:", response.text)

[Image: screenshot of the response]

If we then send a request to the FastAPI application without an Authorization header, we will get an error like this

[Image: screenshot of the error response]
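
The Authorization header in the request code above is plain HTTP Basic: the string user:password, base64-encoded. A small standard-library sketch makes the header and body format explicit. build_basic_auth_header and build_body are hypothetical helpers, and the credentials and feature name are placeholders.

```python
import base64
import json


def build_basic_auth_header(user, password):
    """Return the headers for a JSON request authenticated with HTTP Basic."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {
        "Content-Type": "application/json",
        "Authorization": f"Basic {token}",
    }


def build_body(records, limit=5):
    """Serialize at most `limit` records as a JSON list of objects."""
    return json.dumps(records[:limit])


headers = build_basic_auth_header("user", "pass")  # placeholder credentials
body = build_body([{"alcohol": 9.4}, {"alcohol": 9.8}], limit=1)  # placeholder feature
print(headers["Authorization"])
print(body)
```

With the requests library, passing auth=(user, password) to requests.post produces the same Authorization header without manual encoding.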

Set Up Vertex AI endpoint (optional)

We briefly discussed this in the previous post. To use Kubeflow to deploy a Vertex AI serving endpoint, we can run the code below.

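# Imports assume the KFP v2 SDK; adjust them if your setup uses the kfp.v2 namespace
from typing import NamedTuple

from google.cloud.aiplatform import pipeline_jobs
from kfp import compiler, dsl
from kfp.dsl import component


@component(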
    packages_to_install=["google-cloud-aiplatform"]
)
def model_deployment()-> NamedTuple("endpoint", [("endpoint", str)]):
    
    from google.cloud import aiplatform
    
    aiplatform.init(project="gcp-prj-id-123", location="us-central1", staging_bucket="gs://gcp-bucket-kubeflow")
    
    model = aiplatform.Model.upload(
        display_name="coupon-recommendation-model",
        artifact_uri="gs://gcp-bucket-kubeflow/ml-model/artifacts/",
        serving_container_image_uri = "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest",
        sync=False
    )
    
    DEPLOYED_NAME = "coupon-model-endpoint"
    TRAFFIC_SPLIT = {"0": 100}
    MIN_NODES = 1
    MAX_NODES = 1

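    endpoint = model.deploy(
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type="n1-standard-4",
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES
    )

    # Return the endpoint's resource name to match the declared NamedTuple output
    return (endpoint.resource_name,)

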
@dsl.pipeline(
    pipeline_root="gs://gcp-bucket-kubeflow/ml-pipeline-v1",
    name="ml-model-training-ep-deployment-pipeline",
)
def pipeline(
    project: str = "kubeflow-mlops",
    region: str = "us-central1"
    ):
    
    max_depth=5
    learning_rate=0.2
    n_estimators=40

    input_validation_task = validate_input_ds()
    

    model_training = custom_training_job_component(
        max_depth=max_depth,
        learning_rate=learning_rate,
        n_estimators=n_estimators,
    ).after(input_validation_task)
    
    with dsl.Condition(model_training.outputs["model_validation"] == "true"):
        task_deploy_model = model_deployment().after(model_training)

        
compiler.Compiler().compile(pipeline_func=pipeline,package_path='ml-pipeline-ep-deploy.json')

start_pipeline = pipeline_jobs.PipelineJob(
    display_name="ml-model-ep-deployment-pipeline",
    template_path="ml-pipeline-ep-deploy.json",
    enable_caching=False,
    location="us-central1",
)

start_pipeline.run()
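
A note on TRAFFIC_SPLIT: in the Vertex AI SDK, the keys are deployed model IDs, the special key "0" stands for the model being deployed in this call, and the percentages must add up to 100 (worth double-checking against the SDK documentation for your version). The function below is a hypothetical pre-flight check, not part of the SDK, that catches a bad split before the pipeline runs.

```python
def validate_traffic_split(traffic_split):
    """Raise ValueError unless the split maps string IDs to integer percentages summing to 100."""
    if not traffic_split:
        raise ValueError("traffic_split must not be empty")
    for model_id, percent in traffic_split.items():
        if not isinstance(model_id, str):
            raise ValueError(f"model id {model_id!r} must be a string")
        is_int = isinstance(percent, int) and not isinstance(percent, bool)
        if not is_int or not 0 <= percent <= 100:
            raise ValueError(f"percentage for {model_id!r} must be an integer in [0, 100]")
    total = sum(traffic_split.values())
    if total != 100:
        raise ValueError(f"percentages must sum to 100, got {total}")
    return traffic_split


# The split used above: all traffic goes to the newly deployed model
validate_traffic_split({"0": 100})
```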

Set Up KServe endpoint (optional)

To deploy the endpoint to KServe, we can use the KServe Python SDK

# Import the necessary libraries

from kubernetes import client
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1XGBoostSpec
namespace = utils.get_default_target_namespace()
name = 'xgboost-ml-model'
kserve_version = 'v1beta1'
api_version = constants.KSERVE_GROUP + '/' + kserve_version
# Define InferenceService
isvc = V1beta1InferenceService(api_version=api_version,
                               kind=constants.KSERVE_KIND,
                               metadata=client.V1ObjectMeta(
                                   name=name, namespace=namespace, annotations={'sidecar.istio.io/inject': 'false'}),
                               spec=V1beta1InferenceServiceSpec(
                                   predictor=V1beta1PredictorSpec(
                                       xgboost=V1beta1XGBoostSpec(
                                           storage_uri="gs://gcp-bucket-kubeflow/ml-model/artifacts/",
                                       )
                                   )
                               )
)

# Create InferenceService 

KServe = KServeClient()
KServe.create(isvc)
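
Once the InferenceService is ready, it can be queried over HTTP. The sketch below builds a request for KServe's V1 inference protocol, which is a POST to /v1/models/<name>:predict with an "instances" list in the body. The host name and feature values are placeholders, build_predict_request is a hypothetical helper, and whether an extra Host header or auth cookie is needed depends on how the cluster's ingress is set up.

```python
import json


def build_predict_request(host, model_name, rows):
    """Return (url, body) for a KServe V1 protocol prediction request."""
    url = f"http://{host}/v1/models/{model_name}:predict"
    body = json.dumps({"instances": rows})
    return url, body


url, body = build_predict_request(
    host="xgboost-ml-model.example.com",  # placeholder; use your InferenceService URL
    model_name="xgboost-ml-model",  # the name given to the InferenceService above
    rows=[[7.4, 0.7, 0.0], [7.8, 0.88, 0.0]],  # placeholder feature vectors
)
print(url)
print(body)
```

The resulting url and body can be sent with requests.post(url, data=body) from any client that can reach the cluster.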

Thank you for reading and have a nice day!