Building MLOps Workflows with Airflow 2 on GKE

About this codelab

Last updated Dec 10, 2024
Written by Laurent Grangeau, Darren Evans, Merve Gül Sayan

1. Overview

This CodeLab demonstrates how to integrate DevOps practices into machine learning (MLOps) by downloading a dataset, refining a model, and deploying the LLM on Google Kubernetes Engine (GKE), all driven by an Airflow DAG with the least amount of abstraction. We use gcloud commands rather than Terraform so that you can follow the lab step by step and easily understand each process from the perspective of both the Platform Engineer and the Machine Learning Engineer.

This hands-on guide will walk you through leveraging Airflow to streamline your AI workflows, providing a clear and practical demonstration of the entire MLOps lifecycle by configuring a DAG.

What you'll learn

  • Foster greater collaboration and understanding between Platform and Machine Learning Engineers by breaking down knowledge silos and improving workflows
  • Understand how to deploy, use and manage Airflow 2 on GKE
  • Configure an Airflow DAG from end to end
  • Build the basis for production grade machine learning systems with GKE
  • Instrument and operationalize machine learning systems
  • Understand how Platform Engineering has become a critical support pillar for MLOps

What this CodeLab achieves

  • You can ask questions about movies to an LLM that we fine-tuned from Gemma-2-9b-it and served on GKE with vLLM.

Target Audience

  • Machine Learning Engineers
  • Platform Engineers
  • Data scientists
  • Data Engineers
  • DevOps Engineers
  • Platform Architects
  • Customer Engineers

This CodeLab is not intended

  • As an introduction to GKE or AI/ML workflows
  • As a run-through of the entire Airflow feature-set

2. Platform Engineering aids Machine Learning Engineers/Scientists

Platform engineering and MLOps are interdependent disciplines that collaborate to create a robust and efficient environment for ML development and deployment.

Scope: Platform engineering has a broader scope than MLOps, encompassing the entire software development lifecycle and providing the tools and infrastructure for it.

MLOps bridges the gap between ML development, deployment and inference.

Expertise: Platform engineers typically have strong expertise in infrastructure technologies like cloud computing, containerization and data management.

MLOps engineers specialize in ML model development, deployment, and monitoring, often possessing data science and software engineering skills.

Tools: Platform engineers create tools for infrastructure provisioning, configuration management, container orchestration and application scaffolding. MLOps engineers utilize tools for ML model training, experimentation, deployment, monitoring, and versioning.

3. Google Cloud Setup and requirements

Self-paced environment setup

  1. Sign-in to the Google Cloud Console and create a new project or reuse an existing one. If you don't already have a Gmail or Google Workspace account, you must create one.

  • The Project name is the display name for this project's participants. It is a character string not used by Google APIs. You can always update it.
  • The Project ID is unique across all Google Cloud projects and is immutable (cannot be changed after it has been set). The Cloud Console auto-generates a unique string; usually you don't care what it is. In most codelabs, you'll need to reference your Project ID (typically identified as PROJECT_ID). If you don't like the generated ID, you might generate another random one. Alternatively, you can try your own, and see if it's available. It can't be changed after this step and remains for the duration of the project.
  • For your information, there is a third value, a Project Number, which some APIs use. Learn more about all three of these values in the documentation.
  2. Next, you'll need to enable billing in the Cloud Console to use Cloud resources/APIs. Running through this codelab won't cost much, if anything at all. To shut down resources to avoid incurring billing beyond this tutorial, you can delete the resources you created or delete the project. New Google Cloud users are eligible for the $300 USD Free Trial program.

Start Cloud Shell

While Google Cloud can be operated remotely from your laptop, in this codelab you will be using Cloud Shell, a command line environment running in the Cloud.

Activate Cloud Shell

  1. From the Cloud Console, click Activate Cloud Shell.

If this is your first time starting Cloud Shell, you're presented with an intermediate screen describing what it is. If so, click Continue.

It should only take a few moments to provision and connect to Cloud Shell.

This virtual machine is loaded with all the development tools needed. It offers a persistent 5 GB home directory and runs in Google Cloud, greatly enhancing network performance and authentication. Much, if not all, of your work in this codelab can be done with a browser.

Once connected to Cloud Shell, you should see that you are authenticated and that the project is set to your project ID.

  2. Run the following command in Cloud Shell to confirm that you are authenticated:
gcloud auth list

Command output

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  3. Run the following command in Cloud Shell to confirm that the gcloud command knows about your project:
gcloud config list project

Command output

[core]
project = <PROJECT_ID>

If it is not, you can set it with this command:

gcloud config set project <PROJECT_ID>

Command output

Updated property [core/project].

4. Step #1 - Sign-up and authenticate on Kaggle

To begin the CodeLab, you need to create an account on Kaggle, an online community platform for data scientists and machine learning enthusiasts that is owned by Google and hosts a vast repository of publicly available datasets for various domains. You'll download the RottenTomatoes dataset from this site to train your model.

  • Sign up to Kaggle; you can use Google SSO to sign in
  • Accept the terms and conditions
  • Go to Settings and note your username
  • Under the API section, select "Create New Token", which downloads kaggle.json
  • If you have any issues, go to the support page here
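Before moving on, you can optionally sanity-check the downloaded credentials. The short Python sketch below is an illustration only, assuming kaggle.json sits in your current working directory; it shows how its username and key map onto the KAGGLE_USERNAME and KAGGLE_KEY environment variables that the dataset-download container expects later in this lab.

# Optional check: load kaggle.json and expose its values as the environment
# variables used later by the dataset-download container (illustrative only).
import json
import os

with open("kaggle.json") as f:
    creds = json.load(f)  # kaggle.json contains {"username": "...", "key": "..."}

os.environ["KAGGLE_USERNAME"] = creds["username"]
os.environ["KAGGLE_KEY"] = creds["key"]
print("Kaggle credentials loaded for user:", os.environ["KAGGLE_USERNAME"])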

5. Step #2 - Sign-up and authenticate on HuggingFace

HuggingFace is a central location for anyone to engage with Machine Learning technology. It hosts 900k models, 200k datasets, and 300k demo apps (Spaces), all open source and publicly available.

  • Sign up to HuggingFace and create an account with a username; you cannot use Google SSO
  • Confirm your email address
  • Go here and accept the license for the Gemma-2-9b-it model
  • Create a HuggingFace token here
  • Record the token; you'll need it later
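Optionally, you can verify the token and your Gemma license acceptance before storing the token in Airflow. This is a minimal sketch, assuming the huggingface_hub package is installed and that the HF_TOKEN environment variable holds the token you just created:

# Optional check: confirm the token is valid and that the Gemma license was accepted.
import os
from huggingface_hub import HfApi

api = HfApi(token=os.environ["HF_TOKEN"])
print("Token belongs to:", api.whoami()["name"])

# This call fails if the token is invalid or the Gemma-2-9b-it license was not accepted.
api.model_info("google/gemma-2-9b-it")
print("Access to google/gemma-2-9b-it confirmed.")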

6. Step #3 - Create the required Google Cloud infrastructure resources

You'll set up GKE, GCE and Artifact Registry, and apply IAM roles using Workload Identity Federation.

Your AI workflow uses two node pools, one for training and one for inference. The training node pool uses a g2-standard-12 GCE VM equipped with one Nvidia L4 Tensor Core GPU. The inference node pool uses a g2-standard-24 VM equipped with two Nvidia L4 Tensor Core GPUs. When specifying the region, choose one where the required GPU is supported (link).

In your Cloud Shell run the following commands:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google APIs
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference \
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Create your YAML manifests

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Create 3 Google Cloud Storage (GCS) buckets

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Step #4 - Install Airflow on GKE through the Helm chart

Now we deploy Airflow 2 using Helm. Apache Airflow is an open-source workflow management platform for data engineering pipelines. We'll go into the feature set of Airflow 2 later on.

values.yaml for the Airflow helm chart

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Deploy Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Step #5 - Initialize Airflow with Connections and Variables

Once Airflow 2 has been deployed, we can start configuring it. We define some variables, which are read by our Python scripts.

  1. Access the Airflow UI on port 8080 with your browser

Get the external IP

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Open a web browser and go to http://<EXTERNAL-IP>:8080. The login is admin / admin.

  2. Create a default GCP connection within the Airflow UI: go to Admin → Connections → + Add a new record
  • Connection Id: google_cloud_default
  • Connection Type: Google Cloud

Click Save.

  3. Create the needed variables: go to Admin → Variables → + Add a new record
  • Key: BUCKET_DATA_NAME - Value: Copy from echo $BUCKET_DATA_NAME
  • Key: GCP_PROJECT_ID - Value: Copy from echo $DEVSHELL_PROJECT_ID
  • Key: HF_TOKEN - Value: Insert your HF token
  • Key: KAGGLE_USERNAME - Value: Insert your kaggle username
  • Key: KAGGLE_KEY - Value: Copy this from kaggle.json

Click Save after each key-value pair.
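As an optional alternative to clicking through the UI, Airflow 2 also exposes a stable REST API for variables. The sketch below is illustrative only: it assumes basic authentication is enabled for the Airflow API (this may not be the case with the default chart values) and reuses the admin / admin login; replace EXTERNAL_IP and the placeholder values with your own.

# Illustrative sketch: create the same Airflow variables via the REST API.
# Assumes basic auth is enabled for the API; replace the placeholders below.
import requests

AIRFLOW_URL = "http://EXTERNAL_IP:8080/api/v1/variables"

variables = {
    "BUCKET_DATA_NAME": "your-data-bucket-name",
    "GCP_PROJECT_ID": "your-project-id",
    "HF_TOKEN": "your-huggingface-token",
    "KAGGLE_USERNAME": "your-kaggle-username",
    "KAGGLE_KEY": "your-kaggle-key",
}

for key, value in variables.items():
    resp = requests.post(AIRFLOW_URL, auth=("admin", "admin"),
                         json={"key": key, "value": value})
    resp.raise_for_status()
    print(f"Created variable {key}")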

Your UI should look like this:

771121470131b5ec.png

9. Application code container #1 - Data downloading

In this Python script, we authenticate with Kaggle to download the dataset to our GCS bucket.

The script itself is containerized because it becomes DAG Unit #1, and we expect the dataset to be updated frequently, so we want to automate this.

Create directory and copy our scripts here

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Now we create a container image for dataset-download and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Application code container #2 - Data preparation

During the data preparation step, this is what we achieve:

  1. Specify how much of the dataset we want to use for finetuning our base model
  2. Load the dataset, i.e. read the CSV file into a Pandas DataFrame, a 2-dimensional data structure of rows and columns
  3. Transform / preprocess the data - determine which parts of the dataset are irrelevant by specifying what we want to keep, which in effect removes the rest
  4. Apply the transform function to each row of the DataFrame (see the short illustration after this list)
  5. Save the prepared data back into the GCS bucket
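To make the question-answering format concrete, here is a tiny illustration of the transform applied to a single hypothetical row; the column names match the dataset, while the row values are made up:

# Illustration only: the template used in data-preparation.py applied to one made-up row.
row = {
    "id": "dangerous_men_2015",                   # movie identifier
    "reviewText": "A gloriously strange film.",   # hypothetical review text
    "scoreSentiment": "POSITIVE",                 # hypothetical sentiment label
}

template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
text = template.format(
    question=f"Review analysis for movie '{row['id']}'",
    context=row["reviewText"],
    answer=row["scoreSentiment"],
)
print(text)
# Question: Review analysis for movie 'dangerous_men_2015'
# Context: A gloriously strange film.
# Answer: POSITIVE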

Create directory and copy our scripts here

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")

except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

Now we create a container image for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Application code container #3 - Finetuning

Here we use Gemma-2-9b-it as a base model and then finetune it with our new dataset.

This is the sequence of steps that happen during the finetuning step:

1. Setup: Import libraries, define parameters (for model, data, and training), and load the dataset from Google Cloud Storage.

2. Load Model: Load a pre-trained language model with quantization for efficiency, and load the corresponding tokenizer.

3. Configure LoRA: Set up Low-Rank Adaptation (LoRA) to finetune the model efficiently by adding small trainable matrices (see the short illustration after these steps).

4. Train: Define training parameters and use the SFTTrainer to fine-tune the model on the loaded dataset, with the base model loaded in FP16 precision.

5. Save and Upload: Save the finetuned model and tokenizer locally, then upload them to our GCS bucket.
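To see what "small trainable matrices" means in practice, the following sketch is purely illustrative: it uses peft's get_peft_model on gpt2 as a small stand-in model (not Gemma) so it runs quickly, with LoRA settings mirroring the defaults in finetuning.py, and prints how few parameters LoRA actually trains compared with the full model.

# Illustration only (not part of the lab): how LoRA reduces trainable parameters.
from peft import LoraConfig, get_peft_model
from transformers import AutoModelForCausalLM

base = AutoModelForCausalLM.from_pretrained("gpt2")  # small stand-in model
lora = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.1,
                  target_modules=["c_attn"],  # gpt2's attention projection layer
                  task_type="CAUSAL_LM")
peft_model = get_peft_model(base, lora)

# Prints the trainable parameter count versus the total, typically well under 1%.
peft_model.print_trainable_parameters()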

Then we create a container image using Cloud Build and store it in the Artifact Registry.

Create directory and copy our scripts here

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
    bf16=bf16,
    dataset_kwargs={
        "add_special_tokens": False,
        "append_concat_token": False,
    },
    dataset_text_field="text",
    disable_tqdm=True,
    fp16=fp16,
    gradient_accumulation_steps=gradient_accumulation_steps,
    gradient_checkpointing=gradient_checkpointing,
    gradient_checkpointing_kwargs={"use_reentrant": False},
    group_by_length=group_by_length,
    log_on_each_node=False,
    logging_steps=logging_steps,
    learning_rate=learning_rate,
    lr_scheduler_type=lr_scheduler_type,
    max_grad_norm=max_grad_norm,
    max_seq_length=max_seq_length,
    max_steps=max_steps,
    num_train_epochs=num_train_epochs,
    optim=optim,
    output_dir=save_model_path,
    packing=packing,
    per_device_train_batch_size=per_device_train_batch_size,
    save_strategy=save_strategy,
    save_steps=save_steps,
    save_total_limit=save_total_limit,
    warmup_ratio=warmup_ratio,
    weight_decay=weight_decay,
)

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Now we create a container image for finetuning and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2 Overview, including what a DAG is

Airflow is a platform for orchestrating workflows and data pipelines. It uses DAGs (Directed Acyclic Graphs) to define these workflows in Python code, visually representing tasks and their dependencies.

Airflow, with its static DAGs and Python-based definitions, is well suited for scheduling and managing predefined workflows. Its architecture includes a user-friendly UI for monitoring and managing these workflows.

Essentially, Airflow allows you to define, schedule, and monitor your data pipelines using Python, making it a flexible and powerful tool for workflow orchestration.
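As a minimal illustration of these concepts (a hypothetical example, not part of this lab's pipeline), a DAG is just a Python file that declares tasks and their dependencies, and Airflow takes care of scheduling and monitoring them:

# Minimal illustrative DAG: two tasks with one dependency, scheduled daily,
# using the same DAG pattern as the mlops-dag built later in this lab.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id="hello-dag",
         start_date=datetime(2024, 11, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    extract = PythonOperator(task_id="extract",
                             python_callable=lambda: print("extract data"))
    train = PythonOperator(task_id="train",
                           python_callable=lambda: print("train model"))

    extract >> train  # train runs only after extract succeeds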

13. Overview of our DAG

ec49964ad7d61491.png

DAG stands for Directed Acyclic Graph; in Airflow, a DAG represents the entire workflow or pipeline. It defines the tasks, their dependencies, and the order of execution.

The units of work within the DAG are executed as pods on the GKE cluster, initiated from the Airflow configuration.

Summary:

Airflow: Data download - This script automates the process of getting a movie review dataset from Kaggle and storing it in your GCS bucket, making it readily available for further processing or analysis in your cloud environment.

Airflow: Data Preparation - The code takes the raw movie review dataset, removes extraneous columns not required for our use case and drops rows with missing values. Next, it structures the dataset into a question-answering format suitable for machine learning, and stores it back in GCS for later use.

Airflow: Model Finetuning - This code fine-tunes a large language model (LLM) using a technique called LoRA (Low-Rank Adaptation) and then saves the updated model. It starts by loading a pre-trained LLM and a dataset from Google Cloud Storage. Then, it applies LoRA to efficiently fine-tune the model on this dataset. Finally, it saves the fine-tuned model back to Google Cloud Storage for later use in applications like text generation or question answering.

Airflow: Model Serving - Serving the finetuned model on GKE with vLLM for inference.

Airflow: Feedback loop - Retraining the model at a regular interval (hourly, daily or weekly).

This diagram explains how Airflow 2 works, when run on GKE.

8691f41166209a5d.png

14. Finetuning a model vs using RAG

This CodeLab finetunes an LLM rather than using Retrieval Augmented Generation (RAG).

Let's compare these two approaches:

Finetuning:

  • Creates a specialized model: Finetuning adapts the LLM to a specific task or dataset, allowing it to operate independently without relying on external data sources.
  • Simplifies inference: This eliminates the need for a separate retrieval system and database, resulting in faster and cheaper responses, especially for frequent use cases.

RAG:

  • Relies on external knowledge: RAG retrieves relevant information from a knowledge base for each request, ensuring access to up-to-date and specific data.
  • Increases complexity: Implementing RAG in a production environment like a Kubernetes cluster often involves multiple microservices for data processing and retrieval, potentially increasing latency and computational costs.

Why finetuning was chosen:

While RAG would be suitable for the small dataset used in this CodeLab, we opted for finetuning to demonstrate a typical use case for Airflow. This choice allows us to focus on the workflow orchestration aspects rather than delve into the nuances of setting up additional infrastructure and microservices for RAG.

Conclusion:

Both finetuning and RAG are valuable techniques with their own strengths and weaknesses. The optimal choice depends on the specific requirements of your project, such as the size and complexity of your data, performance needs, and cost considerations.

15. DAG Task #1 - Create your first step on Airflow: Data download

As an overview of this DAG unit, our Python code hosted in a container image downloads the latest RottenTomatoes dataset from Kaggle.

Do not copy this code into the GCS bucket. We copy mlops-dag.py as the last step, which contains all DAG Unit steps within one Python script.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
         start_date=datetime(2024,11,1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # Step 1: Fetch raw data to GCS Bucket
    dataset_download = KubernetesPodOperator(
        task_id="dataset_download_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
        name="dataset-download",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "KAGGLE_USERNAME": KAGGLE_USERNAME,
            "KAGGLE_KEY": KAGGLE_KEY,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME
        }
    )

    dataset_download

16. DAG Task #2 - Create your second step on Airflow: Data Preparation

As an overview of this DAG unit, we load a CSV file (rotten_tomatoes_movie_reviews.csv) from GCS into a Pandas DataFrame.

Next, we limit the number of rows processed using DATASET_LIMIT for testing and resource efficiency and finally convert the transformed data into a Hugging Face Dataset.

If you look carefully, you will see that we train the model on 1000 rows by setting "DATASET_LIMIT": "1000"; even this subset takes around 20 minutes to finetune on an Nvidia L4 GPU.

Do not copy this code into the GCS bucket. We copy mlops-dag.py at the last step, which contains all steps within one Python script.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
         start_date=datetime(2024,11,1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # Step 1: Fetch raw data to GCS Bucket
    dataset_download = KubernetesPodOperator(
        task_id="dataset_download_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
        name="dataset-download",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "KAGGLE_USERNAME": KAGGLE_USERNAME,
            "KAGGLE_KEY": KAGGLE_KEY,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME
        }
    )

    # Step 2: Run GKEJob for data preparation
    data_preparation = KubernetesPodOperator(
        task_id="data_pipeline_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
        name="data-preparation",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "GCP_PROJECT_ID": GCP_PROJECT_ID,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME,
            "DATASET_LIMIT": "1000",
            "HF_TOKEN": HF_TOKEN
        }
    )

    dataset_download >> data_preparation

17. DAG Task #3 - Create your third step on Airflow: Model Finetuning

As an overview of this DAG unit, here we execute finetuning.py to refine the Gemma model with our new dataset.

Do not copy this code into the GCS bucket. We copy mlops-dag.py at the last step, which contains all steps within one Python script.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
         start_date=datetime(2024,11,1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # DAG Task 1: Fetch raw data to GCS Bucket
    dataset_download = KubernetesPodOperator(
        task_id="dataset_download_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
        name="dataset-download",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "KAGGLE_USERNAME": KAGGLE_USERNAME,
            "KAGGLE_KEY": KAGGLE_KEY,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME
        }
    )

    # DAG Task 2: Run GKEJob for data preparation
    data_preparation = KubernetesPodOperator(
        task_id="data_pipeline_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
        name="data-preparation",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "GCP_PROJECT_ID": GCP_PROJECT_ID,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME,
            "DATASET_LIMIT": "1000",
            "HF_TOKEN": HF_TOKEN
        }
    )

    # DAG Task 3: Run GKEJob for fine tuning
    fine_tuning = KubernetesPodOperator(
        task_id="fine_tuning_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
        name="fine-tuning",
        service_account_name="airflow-mlops-sa",
        startup_timeout_seconds=600,
        container_resources=models.V1ResourceRequirements(
            requests={"nvidia.com/gpu": "1"},
            limits={"nvidia.com/gpu": "1"}
        ),
        env_vars={
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME,
            "HF_TOKEN": HF_TOKEN
        }
    )

    dataset_download >> data_preparation >> fine_tuning

18. DAG Task #4 - Create your final step on Airflow: Inference / Serving the model

vLLM is a powerful open-source library specifically designed for high-performance inference of LLMs. When deployed on Google Kubernetes Engine (GKE), it leverages the scalability and efficiency of Kubernetes to serve LLMs effectively.

Summary of steps:

  • Upload the DAG "mlops-dag.py" into the GCS bucket.
  • Copy the two Kubernetes YAML config files that set up inference into the DAGs GCS bucket.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                namespace="airflow",
                name="inference-deployment",
                body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")

    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")

    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                namespace="airflow",
                name="llm-service",
                body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
         start_date=datetime(2024,11,1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # DAG Step 1: Fetch raw data to GCS Bucket
    dataset_download = KubernetesPodOperator(
        task_id="dataset_download_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
        name="dataset-download",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "KAGGLE_USERNAME": KAGGLE_USERNAME,
            "KAGGLE_KEY": KAGGLE_KEY,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME
        }
    )

    # DAG Step 2: Run GKEJob for data preparation
    data_preparation = KubernetesPodOperator(
        task_id="data_pipeline_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
        name="data-preparation",
        service_account_name="airflow-mlops-sa",
        env_vars={
            "GCP_PROJECT_ID": GCP_PROJECT_ID,
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME,
            "DATASET_LIMIT": "1000",
            "HF_TOKEN": HF_TOKEN
        }
    )

    # DAG Step 3: Run GKEJob for fine tuning
    fine_tuning = KubernetesPodOperator(
        task_id="fine_tuning_task",
        namespace=JOB_NAMESPACE,
        image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
        name="fine-tuning",
        service_account_name="airflow-mlops-sa",
        startup_timeout_seconds=600,
        container_resources=models.V1ResourceRequirements(
            requests={"nvidia.com/gpu": "1"},
            limits={"nvidia.com/gpu": "1"}
        ),
        env_vars={
            "BUCKET_DATA_NAME": BUCKET_DATA_NAME,
            "HF_TOKEN": HF_TOKEN
        }
    )

    # DAG Step 4: Run GKE Deployment for model serving
    model_serving = PythonOperator(
        task_id="model_serving",
        python_callable=model_serving
    )

    dataset_download >> data_preparation >> fine_tuning >> model_serving

Upload your Python script (the DAG file) and the Kubernetes manifests into the DAGs GCS bucket.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

In your Airflow UI, you will see mlops-dag.

  1. Unpause the DAG.
  2. Select Trigger DAG to perform a manual MLOps cycle.

d537281b92d5e8bb.png

Once your DAG has completed, you'll see output like this in the Airflow UI.

3ed42abf8987384e.png

After the final step, you can grab the model endpoint and send a prompt to test the model.

Wait approximately 5 minutes before issuing the curl command, so that the model server can start up and the load balancer can assign an external IP address.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Output:
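As an alternative to curl, you can also call the endpoint from Python. The sketch below is illustrative; it mirrors the request body from the curl command above and assumes the requests package is installed and MODEL_ENDPOINT is exported as shown.

# Sketch of a Python client for the served model; mirrors the curl request above.
import os
import requests

endpoint = f"http://{os.environ['MODEL_ENDPOINT']}:8000/generate"
payload = {
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128,
}

resp = requests.post(endpoint, json=payload, timeout=120)
resp.raise_for_status()
print(resp.json())  # JSON body containing the generated text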

19. Congratulations!

You have created your first AI workflow using a DAG pipeline with Airflow 2 on GKE.

Don't forget to deprovision the resources that you've deployed, to avoid incurring ongoing charges.

20. Doing this in Production

While this CodeLab has given you a good insight into how to set up Airflow 2 on GKE, in the real world you'll want to consider some of the following topics when doing this in production.

Implement a web frontend using Gradio or similar tooling.

Either configure automatic application monitoring for workloads with GKE here or export metrics from Airflow here.

You might require larger GPUs to finetune the model faster, especially if you have larger datasets. To train the model across multiple GPUs, we have to split the dataset and shard the training. Here is an explanation of FSDP (Fully Sharded Data Parallel) with PyTorch, which uses GPU sharding to achieve that aim. Further reading can be found in a blog post from Meta and in this tutorial on FSDP using PyTorch.

Google Cloud Composer is a managed Airflow service, so you don't need to maintain Airflow itself, just deploy your DAG and away you go.

Learn more

License

This work is licensed under a Creative Commons Attribution 2.0 Generic License.