Vertex AI: Use custom prediction routines with Sklearn to preprocess and postprocess data for predictions

1. Overview

In this lab, you'll learn how to use custom prediction routines on Vertex AI to write custom preprocessing and postprocessing logic. While this sample uses Scikit-learn, custom prediction routines can work with other Python ML frameworks such as XGBoost, PyTorch, and TensorFlow.

What you learn

You'll learn how to:

  • Write custom prediction logic with custom prediction routines
  • Test the custom serving container and model locally
  • Test the custom serving container on Vertex AI Predictions

The total cost to run this lab on Google Cloud is about $1 USD.

2. Intro to Vertex AI

This lab uses the newest AI product offering available on Google Cloud. Vertex AI integrates the ML offerings across Google Cloud into a seamless development experience. Previously, models trained with AutoML and custom models were accessible via separate services. The new offering combines both into a single API, along with other new products. You can also migrate existing projects to Vertex AI.

Vertex AI includes many different products to support end-to-end ML workflows. This lab will focus on Predictions and Workbench.

3. Use Case Overview

Use Case

In this lab, you'll build a random forest regression model to predict the price of a diamond based on attributes like cut, clarity, and size.

You'll write custom preprocessing logic to check that the data at serving time is in the format expected by the model. You'll also write custom postprocessing logic to round the predictions and convert them to strings. To write this logic, you'll use custom prediction routines.

Introduction to custom prediction routines

The Vertex AI pre-built containers handle prediction requests by performing the prediction operation of the machine learning framework. Prior to custom prediction routines, if you wanted to preprocess the input before the prediction is performed, or postprocess the model's prediction before returning the result, you would need to build a custom container.

Building a custom serving container requires writing an HTTP server that wraps the trained model, translates HTTP requests into model inputs, and translates model outputs into responses.
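
For context, here's a rough sketch of the kind of server you would otherwise have to write yourself. It is illustrative only: the route names, model path, and use of FastAPI are assumptions for this sketch, not something you'll need in this lab.

import joblib
from fastapi import FastAPI, Request

app = FastAPI()
model = joblib.load("model.joblib")  # hypothetical local path to the trained model

@app.get("/health")
def health():
    # Health route so the platform can verify the server is up
    return {}

@app.post("/predict")
async def predict(request: Request):
    body = await request.json()                     # translate the HTTP request into model inputs
    predictions = model.predict(body["instances"])  # run the framework's predict call
    return {"predictions": predictions.tolist()}    # translate model outputs into a response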

With custom prediction routines, Vertex AI provides the serving-related components for you, so that you can focus on your model and data transformations.

4. Set up your environment

You'll need a Google Cloud Platform project with billing enabled to run this codelab. To create a project, follow the instructions here.

Step 1: Enable the Compute Engine API

Navigate to Compute Engine and select Enable if it isn't already enabled. You'll need this to create your notebook instance.

Step 2: Enable the Artifact Registry API

Navigate to Artifact Registry and select Enable if it isn't already. You'll use this to create a custom serving container.

Step 3: Enable the Vertex AI API

Navigate to the Vertex AI section of your Cloud Console and click Enable Vertex AI API.
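
If you prefer the command line, you can enable the same APIs (plus the Notebooks API used in the next step) from Cloud Shell instead, for example:

gcloud services enable compute.googleapis.com \
  artifactregistry.googleapis.com \
  aiplatform.googleapis.com \
  notebooks.googleapis.com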

Step 4: Create a Vertex AI Workbench instance

From the Vertex AI section of your Cloud Console, click on Workbench.

Enable the Notebooks API if it isn't already.

Once enabled, click USER-MANAGED NOTEBOOKS, then select NEW NOTEBOOK.

Note: Running docker in a managed notebook kernel is currently not supported. That is why this lab uses a user-managed notebook.

Then select Python 3.

Use the default options and then click Create.
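
(Optional) A comparable user-managed notebook can also be created with gcloud. The command below is only a sketch; the instance name, image family, machine type, and zone are placeholders you would adjust to your project.

gcloud notebooks instances create cpr-codelab-notebook \
  --vm-image-project=deeplearning-platform-release \
  --vm-image-family=common-cpu-notebooks \
  --machine-type=n1-standard-4 \
  --location=us-central1-a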

When the instance is ready, click on OPEN JUPYTERLAB to open the notebook.

5. Write training code

Step 1: Create a cloud storage bucket

You'll store the model and preprocessing artifacts in a Cloud Storage bucket. If you already have a bucket in your project you'd like to use, you can skip this step.

From the Launcher, open a new terminal session.

From your terminal, run the following to define an environment variable for your project, making sure to replace your-cloud-project with the ID of your project:

PROJECT_ID='your-cloud-project'

Next, run the following in your Terminal to create a new bucket in your project.

BUCKET="gs://${PROJECT_ID}-cpr-bucket"
gsutil mb -l us-central1 $BUCKET

Step 2: Train model

From the terminal, create a new directory called cpr-codelab and cd into it.

mkdir cpr-codelab
cd cpr-codelab

In the file browser, navigate to the new cpr-codelab directory, and then use the launcher to create a new Python 3 notebook called task.ipynb.

Your cpr-codelab directory should now look like:

+ cpr-codelab/
    + task.ipynb

In the notebook, paste in the following code.

First, write a requirements.txt file.

%%writefile requirements.txt
fastapi
uvicorn==0.17.6
joblib~=1.0
numpy~=1.20
scikit-learn~=0.24
pandas
google-cloud-storage>=1.26.0,<2.0.0dev
google-cloud-aiplatform[prediction]>=1.16.0

The model you deploy will have a different set of dependencies pre-installed than your notebook environment. Because of this, you'll want to list all of the dependencies for the model in requirements.txt and then use pip to install the exact same dependencies in the notebook. Later, you'll test the model locally before deploying to Vertex AI to double check that the environments match.

Pip install the dependencies in the notebook.

!pip install -U --user -r requirements.txt

Note that you'll need to restart the kernel after the pip install completes.
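
If you want to confirm that the notebook environment now matches requirements.txt before moving on, you can optionally run a quick check in a new cell, for example:

!pip list | grep -E "scikit-learn|joblib|numpy|fastapi|uvicorn|google-cloud"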

Next, create the directories where you'll store the model and preprocessing artifacts.

USER_SRC_DIR = "src_dir"
!mkdir $USER_SRC_DIR
!mkdir model_artifacts

# copy the requirements to the source dir
!cp requirements.txt $USER_SRC_DIR/requirements.txt

Your cpr-codelab directory should now look like:

+ cpr-codelab/
    + model_artifacts/
    + src_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

Now that the directory structure is set up, it's time to train a model!

First, import the libraries.

import seaborn as sns
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.compose import make_column_transformer

import joblib
import logging

# set logging to see the docker container logs
logging.basicConfig(level=logging.INFO)

Then define the following variables. Be sure to replace PROJECT_ID with your project id and BUCKET_NAME with the bucket you created in the previous step.

REGION = "us-central1"
MODEL_ARTIFACT_DIR = "sklearn-model-artifacts"
REPOSITORY = "diamonds"
IMAGE = "sklearn-image"
MODEL_DISPLAY_NAME = "diamonds-cpr"

# Replace with your project
PROJECT_ID = "{PROJECT_ID}"

# Replace with your bucket
BUCKET_NAME = "gs://{BUCKET_NAME}"

Load the data from the seaborn library and then create two dataframes, one with the features and the other with the label.

data = sns.load_dataset('diamonds', cache=True, data_home=None)

label = 'price'

y_train = data['price']
x_train = data.drop(columns=['price'])

Let's take a look at the training data. You can see that each row represents a diamond.

x_train.head()

And the labels, which are the corresponding prices.

y_train.head()

Now, define a scikit-learn column transformer to one-hot encode the categorical features (cut, color, and clarity, at column indices 1 through 3) and scale the numerical features (carat, depth, table, x, y, and z).

column_transform = make_column_transformer(
    (preprocessing.OneHotEncoder(sparse=False), [1,2,3]),
    (preprocessing.StandardScaler(), [0,4,5,6,7,8]))

Define the random forest model.

regr = RandomForestRegressor(max_depth=10, random_state=0)

Next, make a sklearn pipeline. This means that data fed to this pipeline will first be encoded/scaled and then passed to the model.

my_pipeline = make_pipeline(column_transform, regr)

Fit the pipeline on the training data.

my_pipeline.fit(x_train, y_train)

Let's try the model to make sure it's working as expected. Call the predict method on the model, passing in a test sample.

my_pipeline.predict([[0.23, 'Ideal', 'E', 'SI2', 61.5, 55.0, 3.95, 3.98, 2.43]])

Now we can save the pipeline to the model_artifacts dir, and copy it to the Cloud Storage bucket.

joblib.dump(my_pipeline, 'model_artifacts/model.joblib')

!gsutil cp model_artifacts/model.joblib {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

Step 3: Save a preprocessing artifact

Next you'll create a preprocessing artifact. This artifact will be loaded in the custom container when the model server starts up. Your preprocessing artifact can be of almost any form (such as a pickle file), but in this case you'll write out a dictionary to a JSON file.

clarity_dict={"Flawless": "FL",
              "Internally Flawless": "IF",
              "Very Very Slightly Included": "VVS1",
              "Very Slightly Included": "VS2",
              "Slightly Included": "S12",
              "Included": "I3"}

The clarity feature in our training data was always in the abbreviated form (i.e., "FL" instead of "Flawless"). At serving time, we want to check that the data for this feature is also abbreviated, because our model knows how to one hot encode "FL" but not "Flawless". You'll write this custom preprocessing logic later. For now, just save this lookup table to a JSON file and then write it to the Cloud Storage bucket.

import json
with open("model_artifacts/preprocessor.json", "w") as f:
    json.dump(clarity_dict, f)

!gsutil cp model_artifacts/preprocessor.json {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

Your local cpr-codelab directory should now look like:

+ cpr-codelab/
    + model_artifacts/
        + model.joblib
        + preprocessor.json
    + src_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

6. Build a custom serving container using the CPR model server

Now that the model has been trained and the preprocessing artifact saved, it's time to build the custom serving container. Typically, building a serving container requires writing model server code. However, with custom prediction routines, Vertex AI Predictions generates a model server and builds a custom container image for you.

A custom serving container contains the following 3 pieces of code:

  1. Model server (this will be generated automatically by the SDK and stored in src_dir/)
    • HTTP server that hosts the model
    • Responsible for setting up routes/ports/etc.
  2. Request Handler
    • Responsible for web server aspects of handling a request, such as deserializing the request body, serializing the response, setting response headers, etc.
    • In this example, you'll use the default Handler, google.cloud.aiplatform.prediction.handler.PredictionHandler provided in the SDK.
  3. Predictor
    • Responsible for the ML logic for processing a prediction request.

Each of these components can be customized based on the requirements of your use case. In this example, you'll only implement the predictor.

The predictor is responsible for the ML logic for processing a prediction request, such as custom preprocessing and postprocessing. To write custom prediction logic, you'll subclass the Vertex AI Predictor interface.

This release of custom prediction routines comes with reusable XGBoost and Sklearn predictors, but if you need to use a different framework you can create your own by subclassing the base predictor.
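
For reference, a predictor for another framework would subclass the base Predictor and implement at least the load and predict methods. The sketch below is only an illustration (the pickle file name and model object are placeholders); in this lab you'll stick with the provided SklearnPredictor.

import pickle

from google.cloud.aiplatform.prediction.predictor import Predictor


class MyFrameworkPredictor(Predictor):

    def load(self, artifacts_uri: str) -> None:
        # artifacts_uri points at the model artifact directory; in a real predictor
        # you would first copy the artifacts locally (e.g. with the Cloud Storage client)
        # and then deserialize the model for your framework.
        with open("model.pkl", "rb") as f:
            self._model = pickle.load(f)

    def predict(self, instances):
        # Run the framework's prediction call on the (already preprocessed) instances.
        return self._model.predict(instances)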

You can see an example of the Sklearn predictor below. This is all the code you would need to write in order to build this custom model server.

In your notebook, paste in the following code to subclass the SklearnPredictor and write it out to a Python file in src_dir/. Note that in this example we are only customizing the load, preprocess, and postprocess methods, and not the predict method.

%%writefile $USER_SRC_DIR/predictor.py

import joblib
import numpy as np
import json

from google.cloud import storage
from google.cloud.aiplatform.prediction.sklearn.predictor import SklearnPredictor


class CprPredictor(SklearnPredictor):

    def __init__(self):
        return

    def load(self, artifacts_uri: str) -> None:
        """Loads the sklearn pipeline and preprocessing artifact."""

        super().load(artifacts_uri)

        # open preprocessing artifact
        with open("preprocessor.json", "rb") as f:
            self._preprocessor = json.load(f)


    def preprocess(self, prediction_input: np.ndarray) -> np.ndarray:
        """Performs preprocessing by checking if clarity feature is in abbreviated form."""

        inputs = super().preprocess(prediction_input)

        for sample in inputs:
            if sample[3] not in self._preprocessor.values():
                sample[3] = self._preprocessor[sample[3]]
        return inputs

    def postprocess(self, prediction_results: np.ndarray) -> dict:
        """Performs postprocessing by rounding predictions and converting to str."""

        return {"predictions": [f"${value}" for value in np.round(prediction_results)]}

Let's take a deeper look at each of these methods.

  • The load method loads in the preprocessing artifact, which in this case is a dictionary mapping the full diamond clarity names to their abbreviations.
  • The preprocess method uses that artifact to ensure that at serving time the clarity feature is in its abbreviated format. If not, it converts the full string to its abbreviation.
  • The postprocess method rounds each predicted value and returns it as a string prefixed with a $ sign.

Next, use the Vertex AI Python SDK to build the image. With custom prediction routines, the Dockerfile will be generated and the image will be built for you.

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

import os

from google.cloud.aiplatform.prediction import LocalModel

from src_dir.predictor import CprPredictor  # Should be path of variable $USER_SRC_DIR

local_model = LocalModel.build_cpr_model(
    USER_SRC_DIR,
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{IMAGE}",
    predictor=CprPredictor,
    requirements_path=os.path.join(USER_SRC_DIR, "requirements.txt"),
)

Write a test file with two samples for prediction. One of the instances has the abbreviated clarity name, but the other needs to be converted first.

import json

sample = {"instances": [
  [0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43],
  [0.29, 'Premium', 'J', 'Internally Flawless', 52.5, 49.0, 4.00, 2.13, 3.11]]}

with open('instances.json', 'w') as fp:
    json.dump(sample, fp)

Test the container locally by deploying a local model.

with local_model.deploy_to_local_endpoint(
    artifact_uri = 'model_artifacts/', # local path to artifacts
) as local_endpoint:
    predict_response = local_endpoint.predict(
        request_file='instances.json',
        headers={"Content-Type": "application/json"},
    )

    health_check_response = local_endpoint.run_health_check()

You can see the prediction results with:

predict_response.content
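
predict_response.content holds the raw JSON body returned by the container. If you'd rather inspect the parsed predictions and the health check status, a quick sketch:

import json

# Parse the response body into a dict shaped by the postprocess method above,
# i.e. {"predictions": ["$...", "$..."]}
print(json.loads(predict_response.content))

# The health check should return a 200 status code
print(health_check_response.status_code)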

7. Deploy model to Vertex AI

Now that you've tested the container locally, it's time to push the image to Artifact Registry and upload the model to Vertex AI Model Registry.

First, create a repository in Artifact Registry and configure Docker to authenticate to it.

!gcloud artifacts repositories create {REPOSITORY} --repository-format=docker \
--location=us-central1 --description="Docker repository"


!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet

Then, push the image.

local_model.push_image()

And upload the model.

model = aiplatform.Model.upload(local_model = local_model,
                                display_name=MODEL_DISPLAY_NAME,
                                artifact_uri=f"{BUCKET_NAME}/{MODEL_ARTIFACT_DIR}",)

When the model is uploaded, you should see it in the Model Registry section of the console.

Next, deploy the model so you can use it for online predictions. Custom prediction routines also work with batch prediction, so if your use case does not require online predictions, you do not need to deploy the model (see the batch prediction sketch at the end of this section).

endpoint = model.deploy(machine_type="n1-standard-2")

Lastly, test the deployed model by getting a prediction.

endpoint.predict(instances=[[0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43]])
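
As mentioned above, custom prediction routines also work with batch prediction, so deploying to an endpoint is only necessary for online serving. A rough sketch of a batch job against the uploaded model is shown below; the job display name, JSONL input file, and output prefix are hypothetical values you would supply yourself.

batch_job = model.batch_predict(
    job_display_name="diamonds-cpr-batch",
    gcs_source=f"{BUCKET_NAME}/batch_inputs/instances.jsonl",  # one JSON instance per line
    gcs_destination_prefix=f"{BUCKET_NAME}/batch_outputs/",
    instances_format="jsonl",
    machine_type="n1-standard-2",
)
batch_job.wait()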

🎉 Congratulations! 🎉

You've learned how to use Vertex AI to:

  • Write custom preprocessing and postprocessing logic with custom prediction routines
  • Test the custom serving container and model locally
  • Deploy the model to Vertex AI Predictions and get online predictions

To learn more about different parts of Vertex AI, check out the documentation.

8. Cleanup

If you'd like to continue using the notebook you created in this lab, it is recommended that you turn it off when not in use. From the Workbench UI in the Google Cloud Console, select the notebook and then select Stop.

If you'd like to delete the notebook entirely, click the Delete button in the top right.

To delete the endpoint you deployed, navigate to the Endpoints section of the console, click on the endpoint you created, and then select Undeploy model from endpoint.

To delete the container image, navigate to Artifact Registry, select the repository you created, and select Delete.

To delete the storage bucket, use the Navigation menu in your Cloud Console to browse to Storage, select your bucket, and click Delete.
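
Alternatively, you can do most of this cleanup from the notebook using the SDK objects created earlier in this lab, plus gcloud and gsutil. This is only a sketch; it assumes the endpoint and model variables are still in memory.

# Undeploy the model, delete the endpoint, and delete the uploaded model
endpoint.undeploy_all()
endpoint.delete()
model.delete()

# Delete the Artifact Registry repository (and the container image in it)
!gcloud artifacts repositories delete {REPOSITORY} --location={REGION} --quiet

# Delete the Cloud Storage bucket and its contents
!gsutil -m rm -r {BUCKET_NAME}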