MLOps-Workflows mit Airflow 2 in GKE erstellen

1. Übersicht

852dc8844309ffb8.png

In diesem Codelab wird gezeigt, wie Sie DevOps-Praktiken in Machine Learning (MLOps) integrieren, indem Sie ein Dataset herunterladen, ein Modell optimieren und das LLM in Google Kubernetes Engine (GKE) bereitstellen. Dazu verwenden Sie einen Airflow-DAG mit der geringsten Abstraktion. Daher verwenden wir gcloud-Befehle und nicht Terraform, damit Sie dem Lab Schritt für Schritt folgen und jeden Prozess aus der Perspektive des Platform Engineer und des Machine Learning Engineer leicht nachvollziehen können.

In diesem praktischen Leitfaden erfahren Sie, wie Sie Airflow verwenden können, um Ihre KI-Workflows zu optimieren. Außerdem wird der gesamte MLOps-Lebenszyklus durch die Konfiguration eines DAGs klar und praktisch veranschaulicht.

Lerninhalte

  • Bessere Zusammenarbeit und mehr Verständnis zwischen Platform- und Machine Learning-Engineers durch den Abbau von Wissenssilos und die Verbesserung von Workflows
  • Airflow 2 in GKE bereitstellen, verwenden und verwalten
  • Airflow-DAG von Anfang bis Ende konfigurieren
  • Grundlage für produktionsreife Machine-Learning-Systeme mit GKE schaffen
  • Systeme für maschinelles Lernen instrumentieren und operationalisieren
  • Verstehen, wie Platform Engineering zu einer wichtigen Stütze für MLOps geworden ist

Was in diesem Codelab erreicht wird

  • Sie können einem LLM, das auf Grundlage von Gemma-2-9b-it feinabgestimmt und in GKE mit vLLM bereitgestellt wurde, Fragen zu Filmen stellen.

Zielgruppe

  • Machine Learning Engineers
  • Plattformentwickler
  • Data Scientists
  • Data Engineers
  • DevOps Engineers
  • Plattformarchitekt
  • Customer Engineers

Dieses Codelab ist nicht für

  • Als Einführung in GKE oder KI‑/ML-Workflows
  • Als Überblick über alle Airflow-Funktionen

2. Platform Engineering unterstützt Machine Learning Engineers/Scientists

16635a8284b994c.png

Platform Engineering und MLOps sind voneinander abhängige Disziplinen, die zusammenarbeiten, um eine robuste und effiziente Umgebung für die ML-Entwicklung und -Bereitstellung zu schaffen.

Umfang:Platform Engineering hat einen größeren Umfang als MLOps und umfasst den gesamten Softwareentwicklungszyklus sowie die Tools und die Infrastruktur dafür.

MLOps schließt die Lücke zwischen ML-Entwicklung, Bereitstellung und Inferenz.

Fachwissen:Platform Engineers verfügen in der Regel über fundiertes Fachwissen in Infrastrukturtechnologien wie Cloud Computing, Containerisierung und Datenverwaltung.

MLOps-Entwickler sind auf die Entwicklung, Bereitstellung und Überwachung von ML-Modellen spezialisiert und verfügen oft über Data-Science- und Softwareentwicklungskompetenzen.

Tools:Platform Engineers erstellen Tools für die Infrastrukturbereitstellung, die Konfigurationsverwaltung, die Containerorchestrierung und das Anwendungs-Scaffolding. MLOps-Entwickler verwenden Tools für das Trainieren, Testen, Bereitstellen, Überwachen und Versionieren von ML-Modellen.

3. Google Cloud-Einrichtung und -Anforderungen

Umgebung zum selbstbestimmten Lernen einrichten

  1. Melden Sie sich in der Google Cloud Console an und erstellen Sie ein neues Projekt oder verwenden Sie ein vorhandenes. Wenn Sie noch kein Gmail- oder Google Workspace-Konto haben, müssen Sie eines erstellen.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Der Projektname ist der Anzeigename für die Teilnehmer dieses Projekts. Es handelt sich um einen String, der nicht von Google APIs verwendet wird. Sie können sie jederzeit aktualisieren.
  • Die Projekt-ID ist für alle Google Cloud-Projekte eindeutig und unveränderlich (kann nach dem Festlegen nicht mehr geändert werden). In der Cloud Console wird automatisch ein eindeutiger String generiert. Normalerweise ist es nicht wichtig, wie dieser String aussieht. In den meisten Codelabs müssen Sie auf Ihre Projekt-ID verweisen (in der Regel als PROJECT_ID angegeben). Wenn Ihnen die generierte ID nicht gefällt, können Sie eine andere zufällige ID generieren. Alternativ können Sie es mit einem eigenen Namen versuchen und sehen, ob er verfügbar ist. Sie kann nach diesem Schritt nicht mehr geändert werden und bleibt für die Dauer des Projekts bestehen.
  • Zur Information: Es gibt einen dritten Wert, die Projektnummer, die von einigen APIs verwendet wird. Weitere Informationen zu diesen drei Werten
  1. Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Cloud-Ressourcen/-APIs zu verwenden. Die Durchführung dieses Codelabs kostet wenig oder gar nichts. Wenn Sie Ressourcen herunterfahren möchten, um Kosten zu vermeiden, die über diese Anleitung hinausgehen, können Sie die erstellten Ressourcen oder das Projekt löschen. Neue Google Cloud-Nutzer können am kostenlosen Testzeitraum mit einem Guthaben von 300$ teilnehmen.

Cloud Shell starten

Während Sie Google Cloud von Ihrem Laptop aus per Fernzugriff nutzen können, wird in diesem Codelab Cloud Shell verwendet, eine Befehlszeilenumgebung, die in der Cloud ausgeführt wird.

Cloud Shell aktivieren

  1. Klicken Sie in der Cloud Console auf Cloud Shell aktivieren 853e55310c205094.png.

3c1dabeca90e44e5.png

Wenn Sie die Cloud Shell zum ersten Mal starten, wird ein Fenster mit einer Beschreibung eingeblendet. Klicken Sie in diesem Fall einfach auf Weiter.

9c92662c6a846a5c.png

Das Herstellen der Verbindung mit der Cloud Shell sollte nur wenige Augenblicke dauern.

9f0e51b578fecce5.png

Auf dieser virtuellen Maschine sind alle erforderlichen Entwicklungstools installiert. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht sogar alle Aufgaben in diesem Codelab können mit einem Browser erledigt werden.

Sobald die Verbindung mit der Cloud Shell hergestellt ist, sehen Sie, dass Sie authentifiziert sind und für das Projekt Ihre Projekt-ID eingestellt ist.

  1. Führen Sie in der Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:
gcloud auth list

Befehlsausgabe

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Führen Sie den folgenden Befehl in Cloud Shell aus, um zu bestätigen, dass der gcloud-Befehl Ihr Projekt kennt:
gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

4. Schritt 1: Auf Kaggle registrieren und authentifizieren

Für dieses Codelab müssen Sie ein Konto bei Kaggle erstellen. Kaggle ist eine Online-Community-Plattform für Data Scientists und Enthusiasten im Bereich des maschinellen Lernens, die von Google betrieben wird und ein umfangreiches Repository mit öffentlich verfügbaren Datasets für verschiedene Bereiche bietet. Von dieser Website laden Sie das RottenTomatoes-Dataset herunter, das zum Trainieren Ihres Modells verwendet wird.

  • Melden Sie sich bei Kaggle an. Sie können sich mit Google SSO anmelden.
  • Nutzungsbedingungen akzeptieren
  • Rufen Sie die Einstellungen auf und suchen Sie nach Ihrem Nutzernamen username.
  • Wählen Sie im Abschnitt „API“ die Option „Neues Token erstellen aus“ Kaggle aus. Dadurch wird die Datei „kaggle.json“ heruntergeladen.
  • Wenn Sie Probleme haben, rufen Sie diese Supportseite auf.

5. Schritt 2: Bei HuggingFace registrieren und authentifizieren

HuggingFace ist ein zentraler Ort für alle, die sich mit Technologien für maschinelles Lernen beschäftigen möchten. Die Plattform hostet 900.000 Modelle, 200.000 Datasets und 300.000 Demo-Apps (Spaces), die alle Open Source und öffentlich verfügbar sind.

6. Schritt 3: Erstellen Sie die erforderlichen Google Cloud-Infrastrukturressourcen.

Sie richten GKE, GCE und Artifact Registry ein und weisen IAM-Rollen mithilfe der Identitätsföderation von Arbeitslasten zu.

Ihr KI-Workflow verwendet zwei Knotenpools, einen für das Training und einen für die Inferenz. Der Trainingsknotenpool verwendet eine GCE-VM vom Typ „g2-standard-8“ mit einer Nvidia L4 Tensor Core-GPU. Der Inferenzknotenpool verwendet eine g2-standard-24-VM mit zwei Nvidia L4 Tensor Core-GPUs. Wählen Sie beim Angeben der Region eine aus, in der die erforderliche GPU unterstützt wird ( Link).

Führen Sie in Cloud Shell die folgenden Befehle aus:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

YAML-Manifeste erstellen

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:v0.6.6
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Drei Google Cloud Storage-Buckets (GCS) erstellen

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Schritt 4: Airflow in GKE über das Helm-Chart installieren

Jetzt stellen wir Airflow 2 mit Helm bereit. Apache Airflow ist eine Open-Source-Workflow-Managementplattform für Data-Engineering-Pipelines. Wir werden später noch auf die Funktionen von Airflow 2 eingehen.

values.yaml für das Airflow-Helm-Diagramm

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Airflow 2 bereitstellen

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Schritt 5: Airflow mit Verbindungen und Variablen initialisieren

Sobald Airflow 2 bereitgestellt wurde, können wir es konfigurieren. Wir definieren einige Variablen, die von unseren Python-Skripts gelesen werden.

  1. Rufen Sie die Airflow-UI über Port 8080 in Ihrem Browser auf.

Externe IP-Adresse abrufen

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Öffnen Sie einen Webbrowser und rufen Sie http://<EXTERNAL-IP>:8080 auf . Die Anmeldung erfolgt mit „admin“ / „admin“.

  1. Erstellen Sie eine Standard-GCP-Verbindung in der Airflow-Benutzeroberfläche. Rufen Sie dazu „Admin“ → „Connections“ → „+ Add a new record“ auf.
  • Verbindungs-ID: google_cloud_default
  • Verbindungstyp: Google Cloud

Klicken Sie auf „Speichern“.

  1. Erstellen Sie die erforderlichen Variablen. Gehen Sie dazu zu „Verwaltung“ → „Variablen“ → „+ Neuen Eintrag hinzufügen“.
  • Schlüssel: BUCKET_DATA_NAME – Wert: Kopieren Sie den Wert aus „echo $BUCKET_DATA_NAME“.
  • Schlüssel: GCP_PROJECT_ID – Wert: Kopieren Sie den Wert aus „echo $DEVSHELL_PROJECT_ID“.
  • Schlüssel: HF_TOKEN – Wert: HF-Token einfügen
  • Schlüssel: KAGGLE_USERNAME – Wert: Geben Sie Ihren Kaggle-Nutzernamen ein.
  • Schlüssel: KAGGLE_KEY – Wert: Aus „kaggle.json“ kopieren

Klicken Sie nach jedem Schlüssel/Wert-Paar auf „Speichern“.

Die Benutzeroberfläche sollte so aussehen:

771121470131b5ec.png

9. Anwendungscode-Container 1 – Daten herunterladen

In diesem Python-Skript authentifizieren wir uns bei Kaggle, um das Dataset in unseren GCS-Bucket herunterzuladen.

Das Script selbst ist in einem Container untergebracht, da es die erste DAG-Einheit ist und wir davon ausgehen, dass das Dataset häufig aktualisiert wird. Daher möchten wir diesen Vorgang automatisieren.

Verzeichnis erstellen und unsere Skripts hierher kopieren

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Jetzt erstellen wir ein Container-Image für den Dataset-Download und übertragen es per Push an Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Anwendungscode-Container 2 – Datenvorbereitung

Das erreichen wir im Rahmen der Datenvorbereitung:

  1. Geben Sie an, wie viel des Datasets zum Feinabstimmen des Basismodells verwendet werden soll.
  2. Lädt das Dataset, d. h., die CSV-Datei wird in einen Pandas-DataFrame eingelesen, der eine zweidimensionale Datenstruktur für Zeilen und Spalten ist.
  3. Datentransformation / Vorverarbeitung: Legen Sie fest, welche Teile des Datasets irrelevant sind, indem Sie angeben, was beibehalten werden soll. Dadurch werden die restlichen Teile entfernt.
  4. Wendet die Funktion transform auf jede Zeile des DataFrames an.
  5. Vorbereitete Daten wieder im GCS-Bucket speichern

Verzeichnis erstellen und unsere Skripts hierher kopieren

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Anwendungscode-Container 3 – Feinabstimmung

Hier verwenden wir Gemma-2-9b-it als Basismodell und optimieren es dann mit unserem neuen Dataset.

Das sind die Schritte, die während des Feinabstimmungsschritts ausgeführt werden.

1. Einrichtung:Importieren Sie Bibliotheken, definieren Sie Parameter (für Modell, Daten und Training) und laden Sie das Dataset aus Google Cloud Storage.

2. Modell laden:Laden Sie ein vortrainiertes Sprachmodell mit Quantisierung, um die Effizienz zu steigern, und laden Sie den entsprechenden Tokenizer.

3. LoRA konfigurieren:Richten Sie Low-Rank Adaptation (LoRA) ein, um das Modell effizient abzustimmen, indem Sie kleine trainierbare Matrizen hinzufügen.

4. Trainieren:Definieren Sie die Trainingsparameter und verwenden Sie SFTTrainer, um das Modell mit dem geladenen Dataset mit dem Quantisierungstyp FP16 abzustimmen.

5. Speichern und hochladen:Speichern Sie das feinabgestimmte Modell und den Tokenizer lokal und laden Sie sie dann in unseren GCS-Bucket hoch.

Anschließend erstellen wir mit Cloud Build ein Container-Image und speichern es in Artifact Registry.

Verzeichnis erstellen und unsere Skripts hierher kopieren

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Jetzt erstellen wir ein Container-Image für das Feinabstimmen und übertragen es per Push an Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2 – Übersicht (einschließlich Definition von DAG)

Airflow ist eine Plattform zum Orchestrieren von Workflows und Datenpipelines. Dazu werden DAGs (Directed Acyclic Graphs, gerichtete azyklische Graphen) verwendet, um diese Workflows in Python-Code zu definieren und Aufgaben und ihre Abhängigkeiten visuell darzustellen.

Airflow mit seinen statischen DAGs und Python-basierten Definitionen eignet sich gut für die Planung und Verwaltung vordefinierter Workflows. Die Architektur umfasst eine benutzerfreundliche Benutzeroberfläche zum Überwachen und Verwalten dieser Workflows.

Mit Airflow können Sie Ihre Datenpipelines mit Python definieren, planen und überwachen. Das macht Airflow zu einem flexiblen und leistungsstarken Tool für die Workflow-Orchestrierung.

13. Übersicht über unsere DAG

ec49964ad7d61491.png

DAG steht für „Directed Acyclic Graph“ (gerichteter azyklischer Graph). In Airflow stellt ein DAG selbst den gesamten Workflow oder die gesamte Pipeline dar. Sie definiert die Aufgaben, ihre Abhängigkeiten und die Ausführungsreihenfolge.

Die Einheiten des Workflows innerhalb des DAG werden von einem Pod im GKE-Cluster ausgeführt, der über die Airflow-Konfiguration initiiert wird.

Zusammenfassung:

Airflow: Data download (Airflow: Daten herunterladen): Dieses Skript automatisiert den Prozess, ein Filmrezensions-Dataset von Kaggle abzurufen und in Ihrem GCS-Bucket zu speichern. So ist es für die weitere Verarbeitung oder Analyse in Ihrer Cloud-Umgebung verfügbar.

Airflow: Data Preparation (Airflow: Datenvorbereitung): Der Code verwendet den Rohdatensatz für Filmrezensionen, entfernt überflüssige Datenspalten, die für unseren Anwendungsfall nicht erforderlich sind, und löscht Datensätze mit fehlenden Werten. Anschließend werden die Daten in ein Frage-Antwort-Format strukturiert, das für maschinelles Lernen geeignet ist, und zur späteren Verwendung wieder in GCS gespeichert.

Airflow: Model Finetuning: Mit diesem Code wird ein großes Sprachmodell (LLM) mit einer Technik namens LoRA (Low-Rank Adaptation) optimiert und das aktualisierte Modell gespeichert. Zuerst werden ein vortrainiertes LLM und ein Dataset aus Google Cloud Storage geladen. Anschließend wird LoRA angewendet, um das Modell effizient für dieses Dataset abzustimmen. Schließlich wird das feinabgestimmte Modell in Google Cloud Storage gespeichert, damit es später in Anwendungen wie der Textgenerierung oder der Beantwortung von Fragen verwendet werden kann.

Airflow: Model Serving: Das optimierte Modell wird mit vllm für die Inferenz in GKE bereitgestellt.

Airflow: Feedbackschleife – Das Modell wird alle xx Stunden, Tage oder Wochen neu trainiert.

In diesem Diagramm wird die Funktionsweise von Airflow 2 bei der Ausführung in GKE erläutert.

8691f41166209a5d.png

14. Modell abstimmen im Vergleich zur Verwendung von RAG

In diesem CodeLab wird ein LLM feinabgestimmt, anstatt Retrieval Augmented Generation (RAG) zu verwenden.

Vergleichen wir diese beiden Ansätze:

Abstimmung:Erstellt ein spezialisiertes Modell: Bei der Abstimmung wird das LLM an eine bestimmte Aufgabe oder ein bestimmtes Dataset angepasst, sodass es unabhängig ohne externe Datenquellen arbeiten kann.

Vereinfacht die Inferenz: Ein separates Abrufsystem und eine separate Datenbank sind nicht mehr erforderlich. Das führt zu schnelleren und kostengünstigeren Antworten, insbesondere bei häufigen Anwendungsfällen.

RAG:Stützt sich auf externes Wissen: RAG ruft für jede Anfrage relevante Informationen aus einer Wissensdatenbank ab und sorgt so für den Zugriff auf aktuelle und spezifische Daten.

Erhöhte Komplexität: Die Implementierung von RAG in einer Produktionsumgebung wie einem Kubernetes-Cluster umfasst oft mehrere Mikrodienste für die Datenverarbeitung und den Abruf, was die Latenz und die Rechenkosten erhöhen kann.

Warum wurde das Finetuning ausgewählt?

RAG wäre für das kleine Dataset, das in diesem Codelab verwendet wird, geeignet. Wir haben uns jedoch für das Finetuning entschieden, um einen typischen Anwendungsfall für Airflow zu demonstrieren. So können wir uns auf die Aspekte der Workflow-Orchestrierung konzentrieren, anstatt uns mit den Feinheiten der Einrichtung zusätzlicher Infrastruktur und Microservices für RAG zu befassen.

Fazit:

Sowohl das Finetuning als auch RAG sind wertvolle Techniken mit eigenen Stärken und Schwächen. Die optimale Wahl hängt von den spezifischen Anforderungen Ihres Projekts ab, z. B. von der Größe und Komplexität Ihrer Daten, den Leistungsanforderungen und den Kosten.

15. DAG-Aufgabe 1: Ersten Schritt in Airflow erstellen – Daten herunterladen

In dieser DAG-Einheit wird mit unserem in einem Container-Image gehosteten Python-Code das aktuelle RottenTomatoes-Dataset von Kaggle heruntergeladen.

Kopieren Sie diesen Code nicht in den GCS-Bucket. Im letzten Schritt kopieren wir mlops-dag.py, das alle DAG-Einheitsschritte in einem Python-Skript enthält.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. DAG-Aufgabe 2: Zweiten Schritt in Airflow erstellen: Datenvorbereitung

Als Übersicht über diese DAG-Einheit laden wir eine CSV-Datei (rotten_tomatoes_movie_reviews.csv) aus GCS in einen Pandas-DataFrame.

Als Nächstes begrenzen wir die Anzahl der verarbeiteten Zeilen mit DATASET_LIMIT, um Ressourcen zu sparen und Tests zu ermöglichen. Schließlich konvertieren wir die transformierten Daten in ein Hugging Face-Dataset.

Wenn Sie genau hinsehen, werden Sie feststellen, dass wir 1.000 Zeilen im Modell mit „DATASET_LIMIT“: „1000“ trainieren. Das liegt daran, dass dies auf einer Nvidia L4-GPU 20 Minuten dauert.

Kopieren Sie diesen Code nicht in den GCS-Bucket. Im letzten Schritt kopieren wir mlops-dag.py, das alle Schritte in einem Python-Skript enthält.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG-Aufgabe 3: Dritten Schritt in Airflow erstellen: Modell-Finetuning

Als Übersicht über diese DAG-Einheit führen wir hier „finetune.py“ aus, um das Gemma-Modell mit unserem neuen Dataset zu optimieren.

Kopieren Sie diesen Code nicht in den GCS-Bucket. Im letzten Schritt kopieren wir mlops-dag.py, das alle Schritte in einem Python-Skript enthält.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG-Aufgabe 4: Letzten Schritt in Airflow erstellen: Inferenz / Bereitstellung des Modells

vLLM ist eine leistungsstarke Open-Source-Bibliothek, die speziell für die leistungsstarke Inferenz von LLMs entwickelt wurde. Bei der Bereitstellung in Google Kubernetes Engine (GKE) wird die Skalierbarkeit und Effizienz von Kubernetes genutzt, um LLMs effektiv bereitzustellen.

Zusammenfassung der Schritte:

  • Laden Sie die DAG „mlops-dag.py“ in den GCS-Bucket hoch.
  • Kopieren Sie zwei Kubernetes-YAML-Konfigurationsdateien zum Einrichten der Inferenz in einen GCS-Bucket.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Laden Sie Ihr Python-Skript (DAG-Datei) sowie die Kubernetes-Manifeste in den DAGS-GCS-Bucket hoch.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

In der Airflow-UI wird „mlops-dag“ angezeigt.

  1. Wählen Sie „Fortsetzen“ aus.
  2. Wählen Sie „DAG auslösen“ aus, um einen manuellen MLOps-Zyklus auszuführen.

d537281b92d5e8bb.png

Wenn Ihr DAG abgeschlossen ist, sehen Sie in der Airflow-Benutzeroberfläche eine Ausgabe wie diese.

3ed42abf8987384e.png

Nach dem letzten Schritt können Sie den Modellendpunkt abrufen und einen Prompt senden, um das Modell zu testen.

Warten Sie etwa fünf Minuten, bevor Sie den curl-Befehl ausführen, damit die Inferenz des Modells beginnen und der Load Balancer eine externe IP-Adresse zuweisen kann.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Ausgabe:

19. Glückwunsch!

Sie haben Ihren ersten KI-Workflow mit einer DAG-Pipeline mit Airflow 2 in GKE erstellt.

Denken Sie daran, die von Ihnen bereitgestellten Ressourcen aufzuheben.

20. Vorgehensweise in der Produktionsumgebung

In diesem Codelab haben Sie einen guten Einblick in die Einrichtung von Airflow 2 auf GKE erhalten. In der Praxis sollten Sie jedoch einige der folgenden Themen berücksichtigen, wenn Sie dies in der Produktion tun.

Implementieren Sie ein Web-Frontend mit Gradio oder ähnlichen Tools.

Konfigurieren Sie entweder die automatische Anwendungsüberwachung für Arbeitslasten mit GKE hier oder exportieren Sie Messwerte aus Airflow hier.

Möglicherweise benötigen Sie größere GPUs, um das Modell schneller abzustimmen, insbesondere wenn Sie größere Datasets haben. Wenn wir das Modell jedoch auf mehreren GPUs trainieren möchten, müssen wir das Dataset aufteilen und das Training aufteilen. Hier finden Sie eine Erklärung zu FSDP mit PyTorch (Fully Sharded Data Parallel, bei dem GPU-Sharing verwendet wird, um dieses Ziel zu erreichen). Weitere Informationen finden Sie in einem Blogpost von Meta und in dieser Anleitung zu FSDP mit PyTorch.

Google Cloud Composer ist ein verwalteter Airflow-Dienst. Sie müssen Airflow also nicht selbst verwalten, sondern nur Ihren DAG bereitstellen.

Weitere Informationen

Lizenz

Dieser Text ist mit einer Creative Commons Attribution 2.0 Generic License lizenziert.