1. Przegląd

W tym module CodeLab dowiesz się, jak zintegrować praktyki DevOps z uczeniem maszynowym (MLOps), pobierając zbiór danych, ulepszając model i wdrażając LLM w Google Kubernetes Engine (GKE) za pomocą DAG Airflow z minimalną abstrakcją. Dlatego używamy poleceń gcloud, a nie Terraform, aby umożliwić Ci przejście przez ćwiczenie krok po kroku i łatwe zrozumienie każdego procesu z perspektywy inżyniera platformy i inżyniera uczenia maszynowego.
Ten praktyczny przewodnik pokaże Ci, jak wykorzystać Airflow do usprawnienia przepływów pracy związanych ze sztuczną inteligencją. Zawiera on jasną i praktyczną demonstrację całego cyklu MLOps poprzez skonfigurowanie DAG-u.
Czego się nauczysz
- Wspieranie współpracy i lepszego zrozumienia między inżynierami platformy i uczenia maszynowego przez eliminowanie silosów wiedzy i usprawnianie przepływów pracy
- Informacje o wdrażaniu, używaniu i zarządzaniu Airflow 2 w GKE
- Konfigurowanie DAG Airflow od początku do końca
- Tworzenie podstaw produkcyjnych systemów uczenia maszynowego za pomocą GKE
- Instrumentowanie i wdrażanie systemów uczących się
- Dowiedz się, dlaczego inżynieria platformy stała się kluczowym filarem wsparcia dla MLOps.
Co osiągniesz dzięki tym ćwiczeniom
- Możesz zadawać pytania o filmy modelowi LLM, który został przez nas dostrojony na podstawie modelu Gemma-2-9b-it i jest obsługiwany w GKE z vLLM.
Odbiorcy docelowi
- Inżynierowie ds. uczenia maszynowego
- Inżynierowie platformy
- Badacze danych
- Inżynierowie danych
- Inżynierowie DevOps
- Platform Architect
- Inżynierowie ds. obsługi klienta
To ćwiczenie nie jest przeznaczone
- Wprowadzenie do GKE lub przepływów pracy AI/ML
- jako omówienie wszystkich funkcji Airflow,
2. Inżynieria platformy pomaga inżynierom i naukowcom zajmującym się uczeniem maszynowym

Inżynieria platformy i MLOps to wzajemnie zależne dziedziny, które współpracują ze sobą, aby stworzyć solidne i wydajne środowisko do programowania i wdrażania modeli ML.
Zakres: inżynieria platform ma szerszy zakres niż MLOps, obejmując cały cykl tworzenia oprogramowania i zapewniając narzędzia oraz infrastrukturę do tego procesu.
MLOps wypełnia lukę między tworzeniem, wdrażaniem i wnioskowaniem modeli ML.
Wiedza specjalistyczna: inżynierowie platform zwykle mają dużą wiedzę specjalistyczną w zakresie technologii infrastrukturalnych, takich jak przetwarzanie w chmurze, konteneryzacja i zarządzanie danymi.
Inżynierowie MLOps specjalizują się w opracowywaniu, wdrażaniu i monitorowaniu modeli ML, często mając umiejętności z zakresu badania danych i inżynierii oprogramowania.
Narzędzia: inżynierowie platformy tworzą narzędzia do udostępniania infrastruktury, zarządzania konfiguracją, orkiestracji kontenerów i szkieletów aplikacji. Inżynierowie MLOps korzystają z narzędzi do trenowania, eksperymentowania, wdrażania, monitorowania i obsługi wersji modeli ML.
3. Konfiguracja i wymagania Google Cloud
Samodzielne konfigurowanie środowiska
- Zaloguj się w konsoli Google Cloud i utwórz nowy projekt lub użyj istniejącego. Jeśli nie masz jeszcze konta Gmail ani Google Workspace, musisz je utworzyć.



- Nazwa projektu to wyświetlana nazwa uczestników tego projektu. Jest to ciąg znaków, który nie jest używany przez interfejsy API Google. Zawsze możesz ją zaktualizować.
- Identyfikator projektu jest unikalny we wszystkich projektach Google Cloud i nie można go zmienić po ustawieniu. Konsola Cloud automatycznie generuje unikalny ciąg znaków. Zwykle nie musisz się tym przejmować. W większości ćwiczeń z programowania musisz odwoływać się do identyfikatora projektu (zwykle oznaczanego jako
PROJECT_ID). Jeśli wygenerowany identyfikator Ci się nie podoba, możesz wygenerować inny losowy identyfikator. Możesz też spróbować własnej nazwy i sprawdzić, czy jest dostępna. Po tym kroku nie można go zmienić i pozostaje on taki przez cały czas trwania projektu. - Warto wiedzieć, że istnieje też trzecia wartość, numer projektu, której używają niektóre interfejsy API. Więcej informacji o tych 3 wartościach znajdziesz w dokumentacji.
- Następnie musisz włączyć płatności w konsoli Cloud, aby korzystać z zasobów i interfejsów API Google Cloud. Wykonanie tego laboratorium nie będzie kosztować dużo, a może nawet nic. Aby wyłączyć zasoby i uniknąć naliczania opłat po zakończeniu tego samouczka, możesz usunąć utworzone zasoby lub projekt. Nowi użytkownicy Google Cloud mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.
Uruchamianie Cloud Shell
Z Google Cloud można korzystać zdalnie na laptopie, ale w tym module użyjemy Cloud Shell, czyli środowiska wiersza poleceń działającego w chmurze.
Aktywowanie Cloud Shell
- W konsoli Cloud kliknij Aktywuj Cloud Shell
.

Jeśli uruchamiasz Cloud Shell po raz pierwszy, zobaczysz ekran pośredni z opisem tego środowiska. Jeśli pojawił się ekran pośredni, kliknij Dalej.

Uzyskanie dostępu do środowiska Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.

Ta maszyna wirtualna zawiera wszystkie potrzebne narzędzia dla programistów. Zawiera również stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i usprawnia proces uwierzytelniania. Większość zadań w tym module, a być może wszystkie, możesz wykonać w przeglądarce.
Po połączeniu z Cloud Shell zobaczysz, że uwierzytelnianie zostało już przeprowadzone, a projekt jest już ustawiony na Twój identyfikator projektu.
- Aby potwierdzić, że uwierzytelnianie zostało przeprowadzone, uruchom w Cloud Shell to polecenie:
gcloud auth list
Wynik polecenia
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- Aby potwierdzić, że polecenie gcloud zna Twój projekt, uruchom w Cloud Shell to polecenie:
gcloud config list project
Wynik polecenia
[core] project = <PROJECT_ID>
Jeśli nie, możesz go ustawić za pomocą tego polecenia:
gcloud config set project <PROJECT_ID>
Wynik polecenia
Updated property [core/project].
4. Krok 1. Zarejestruj się i uwierzytelnij na platformie Kaggle
Aby rozpocząć CodeLab, musisz utworzyć konto na platformie Kaggle. Jest to internetowa platforma społecznościowa dla specjalistów ds. danych i entuzjastów uczenia maszynowego, która należy do Google i zawiera ogromne repozytorium publicznie dostępnych zbiorów danych z różnych dziedzin. Z tej witryny pobierzesz zbiór danych RottenTomatoes, który posłuży do trenowania modelu.
- Zarejestruj się w Kaggle. Możesz użyć logowania jednokrotnego Google.
- Zaakceptuj warunki usługi
- Otwórz Ustawienia i uzyskaj nazwę użytkownika username.
- W sekcji API wybierz „Create new token from” (Utwórz nowy token z) Kaggle, co spowoduje pobranie pliku kaggle.json.
- Jeśli napotkasz jakieś problemy, przejdź na stronę pomocy tutaj.
5. Krok 2. Zarejestruj się i uwierzytelnij w HuggingFace
HuggingFace to centralne miejsce, w którym każdy może korzystać z technologii uczenia maszynowego. Zawiera 900 tys. modeli, 200 tys. zbiorów danych i 300 tys. aplikacji demonstracyjnych (Spaces), które są dostępne publicznie na licencji open source.
- Zarejestruj się w HuggingFace – utwórz konto z nazwą użytkownika. Nie możesz używać logowania jednokrotnego Google.
- Potwierdzanie adresu e-mail
- Kliknij tutaj i zaakceptuj licencję na model Gemma-2-9b-it.
- Utwórz token HuggingFace tutaj
- Zapisz dane logowania tokena, ponieważ będą Ci później potrzebne.
6. Krok 3. Utwórz wymagane zasoby infrastruktury Google Cloud
Skonfigurujesz GKE, GCE i Artifact Registry oraz zastosujesz role IAM za pomocą federacji tożsamości zadań.
Twój przepływ pracy AI wykorzystuje 2 pule węzłów: jedną do trenowania, a drugą do wnioskowania. Pula węzłów trenowania korzysta z maszyny wirtualnej GCE g2-standard-8 wyposażonej w 1 procesor graficzny Nvidia L4 Tensor Core. Pula węzłów wnioskowania korzysta z maszyny wirtualnej g2-standard-24 wyposażonej w 2 procesory graficzne Nvidia L4 Tensor Core. Podczas określania regionu wybierz taki, w którym obsługiwany jest wymagany procesor GPU ( link).
W Cloud Shell uruchom te polecenia:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
Tworzenie plików manifestu YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:v0.6.6
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Utwórz 3 zasobniki Google Cloud Storage (GCS).
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Krok 4. Zainstaluj Airflow w GKE za pomocą karty Helm
Teraz wdrażamy Airflow 2 za pomocą narzędzia Helm. Apache Airflow to platforma open source do zarządzania przepływami pracy w potokach inżynierii danych. Zestaw funkcji Airflow 2 omówimy później.
values.yaml dla wykresu Helm Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Wdrażanie Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Krok 5. Zainicjuj Airflow za pomocą połączeń i zmiennych
Po wdrożeniu Airflow 2 możemy rozpocząć jego konfigurowanie. Definiujemy pewne zmienne, które są odczytywane przez nasze skrypty w Pythonie.
- Otwórz interfejs Airflow na porcie 8080 w przeglądarce.
Uzyskiwanie zewnętrznego adresu IP
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Otwórz przeglądarkę i wpisz adres http://<EXTERNAL-IP>:8080 . Login to admin / admin
- Utwórz domyślne połączenie GCP w interfejsie Airflow. W tym celu kliknij kolejno Admin → Connections → + Add a new record (Administracja → Połączenia → + Dodaj nowy rekord).
- Identyfikator połączenia: google_cloud_default
- Typ połączenia: Google Cloud
Kliknij Zapisz.
- Utwórz potrzebne zmienne, klikając kolejno Administracja → Zmienne → + Dodaj nowy rekord.
- Klucz: BUCKET_DATA_NAME – wartość: skopiuj z echo $BUCKET_DATA_NAME
- Klucz: GCP_PROJECT_ID – wartość: skopiuj z echo $DEVSHELL_PROJECT_ID
- Klucz: HF_TOKEN – wartość: wstaw token HF
- Klucz: KAGGLE_USERNAME – wartość: wpisz nazwę użytkownika Kaggle.
- Klucz: KAGGLE_KEY – wartość: skopiuj ją z pliku kaggle.json
Po każdej parze klucz-wartość kliknij Zapisz.
Interfejs powinien wyglądać tak:

9. Kontener kodu aplikacji 1 – pobieranie danych
W tym skrypcie w Pythonie uwierzytelniamy się w Kaggle, aby pobrać zbiór danych do naszego zasobnika GCS.
Sam skrypt jest skonteneryzowany, ponieważ staje się jednostką DAG nr 1, a zbiór danych ma być często aktualizowany, więc chcemy to zautomatyzować.
Utwórz katalog i skopiuj do niego nasze skrypty
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Teraz utworzymy obraz kontenera do pobierania zbioru danych i prześlemy go do Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Kontener kodu aplikacji 2 – przygotowanie danych
Podczas przygotowywania danych wykonujemy te czynności:
- Określ, jak dużą część zbioru danych chcesz wykorzystać do dostrajania modelu podstawowego.
- Wczytuje zbiór danych, czyli odczytuje plik CSV do ramki danych Pandas, która jest dwuwymiarową strukturą danych dla wierszy i kolumn.
- Przekształcanie i wstępne przetwarzanie danych – określanie, które części zbioru danych są nieistotne, poprzez wskazanie, które dane chcemy zachować. W efekcie pozostałe dane zostaną usunięte.
- Stosuje funkcję
transformdo każdego wiersza obiektu DataFrame. - Zapisz przygotowane dane z powrotem w zasobniku GCS.
Utwórz katalog i skopiuj do niego nasze skrypty
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Kontener z kodem aplikacji 3 – dostrajanie
W tym przypadku używamy modelu podstawowego Gemma-2-9b-it, a następnie dostrajamy go za pomocą nowego zbioru danych.
To sekwencja kroków, które są wykonywane podczas dostrajania.
1. Konfiguracja: zaimportuj biblioteki, zdefiniuj parametry (modelu, danych i trenowania) i wczytaj zbiór danych z Google Cloud Storage.
2. Wczytaj model: wczytaj wstępnie wytrenowany model językowy z kwantyzacją, aby zwiększyć wydajność, i wczytaj odpowiedni tokenizer.
3. Skonfiguruj LoRA: skonfiguruj adaptację o niskim rzędzie (LoRA), aby efektywnie dostroić model przez dodanie małych macierzy, które można trenować.
4. Trenowanie: zdefiniuj parametry trenowania i użyj SFTTrainer, aby dostroić model na wczytanym zbiorze danych za pomocą typu kwantyzacji FP16.
5. Zapisz i prześlij: zapisz dostrojony model i tokenizator lokalnie, a następnie prześlij je do naszego zasobnika GCS.
Następnie tworzymy obraz kontenera za pomocą Cloud Build i zapisujemy go w Artifact Registry.
Utwórz katalog i skopiuj do niego nasze skrypty
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Teraz utworzymy obrazy kontenerów do dostrajania i prześlemy je do Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Omówienie Airflow 2, w tym wyjaśnienie, czym jest DAG
Airflow to platforma do orkiestracji przepływów pracy i potoków danych. Do definiowania tych przepływów pracy w kodzie Pythona używa skierowanych grafów acyklicznych (DAG), które wizualnie przedstawiają zadania i ich zależności.
Airflow ze statycznymi DAG-ami i definicjami opartymi na Pythonie dobrze nadaje się do planowania i zarządzania wstępnie zdefiniowanymi przepływami pracy. Jego architektura obejmuje przyjazny interfejs użytkownika do monitorowania tych przepływów pracy i zarządzania nimi.
Airflow umożliwia definiowanie, planowanie i monitorowanie potoków danych za pomocą Pythona, co czyni go elastycznym i zaawansowanym narzędziem do orkiestracji przepływów pracy.
13. Omówienie naszego DAG-a

DAG to skrót od Directed Acyclic Graph (skierowany graf acykliczny). W Airflow DAG reprezentuje cały przepływ pracy lub potok. Określa zadania, ich zależności i kolejność wykonywania.
Jednostki przepływu pracy w DAG-u są wykonywane z poziomu poda w klastrze GKE, który jest inicjowany na podstawie konfiguracji Airflow.
Podsumowanie:
Airflow: pobieranie danych – ten skrypt automatyzuje proces pobierania zbioru danych z opiniami o filmach z Kaggle i przechowywania go w zasobniku GCS, dzięki czemu jest on łatwo dostępny do dalszego przetwarzania lub analizy w środowisku chmury.
Airflow: przygotowywanie danych – kod pobiera surowy zbiór danych z opiniami o filmach, usuwa zbędne kolumny danych, które nie są wymagane w naszym przypadku użycia, i usuwa zbiory danych z brakującymi wartościami. Następnie strukturyzuje zbiór danych w formacie pytań i odpowiedzi odpowiednim do uczenia maszynowego i zapisuje go z powrotem w GCS do późniejszego wykorzystania.
Airflow: dostrajanie modelu – ten kod dostraja duży model językowy (LLM) za pomocą techniki adaptacji o niskim rzędzie (LoRA), a następnie zapisuje zaktualizowany model. Zaczyna się od wczytania wstępnie wytrenowanego modelu LLM i zbioru danych z Google Cloud Storage. Następnie stosuje LoRA, aby skutecznie dostroić model na tym zbiorze danych. Na koniec zapisuje dostrojony model z powrotem w Google Cloud Storage, aby można go było później używać w aplikacjach takich jak generowanie tekstu czy odpowiadanie na pytania.
Airflow: Model Serving – udostępnianie dostrojonego modelu w GKE za pomocą vllm na potrzeby wnioskowania.
Airflow: pętla opinii – ponowne trenowanie modelu co xx czasu (co godzinę, codziennie, co tydzień).
Ten diagram wyjaśnia, jak działa Airflow 2 uruchomiony w GKE.

14. Dostrajanie modelu a korzystanie z RAG
W tym laboratorium kodu dostrajamy model LLM, zamiast korzystać z techniki Retrieval Augmented Generation (RAG).
Porównajmy te 2 podejścia:
Dostrajanie: tworzy wyspecjalizowany model. Dostrajanie dostosowuje model LLM do konkretnego zadania lub zbioru danych, dzięki czemu może on działać niezależnie bez korzystania z zewnętrznych źródeł danych.
Upraszcza wnioskowanie: eliminuje potrzebę stosowania osobnego systemu wyszukiwania i bazy danych, co skutkuje szybszymi i tańszymi odpowiedziami, zwłaszcza w przypadku częstych zastosowań.
RAG: korzysta z wiedzy zewnętrznej. W przypadku każdego żądania RAG pobiera z bazy wiedzy odpowiednie informacje, zapewniając dostęp do aktualnych i szczegółowych danych.
Zwiększa złożoność: wdrożenie RAG w środowisku produkcyjnym, takim jak klaster Kubernetes, często wymaga wielu mikrousług do przetwarzania i pobierania danych, co może zwiększyć opóźnienia i koszty obliczeniowe.
Uzasadnienie wyboru dostrajania:
Chociaż RAG byłby odpowiedni w przypadku małego zbioru danych użytego w tym CodeLabie, zdecydowaliśmy się na dostrajanie, aby zademonstrować typowy przypadek użycia Airflow. Dzięki temu możemy skupić się na aspektach związanych z orkiestracją przepływu pracy, zamiast zagłębiać się w niuanse konfigurowania dodatkowej infrastruktury i mikrousług na potrzeby RAG.
Podsumowanie:
Zarówno dostrajanie, jak i RAG to przydatne techniki, które mają swoje zalety i wady. Optymalny wybór zależy od konkretnych wymagań projektu, takich jak rozmiar i złożoność danych, potrzeby w zakresie wydajności oraz kwestie związane z kosztami.
15. Zadanie 1 w DAG – utwórz pierwszy krok w Airflow: pobieranie danych
W ramach tego modułu DAG nasz kod w Pythonie hostowany w obrazie kontenera pobiera najnowszy zbiór danych RottenTomatoes z Kaggle.
Nie kopiuj tego kodu do zasobnika GCS. Na koniec kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki jednostki DAG w jednym skrypcie w Pythonie.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. Zadanie DAG nr 2 – utwórz drugi etap w Airflow: przygotowanie danych
W ramach omówienia tej jednostki DAG wczytujemy plik CSV (rotten_tomatoes_movie_reviews.csv) z GCS do obiektu DataFrame biblioteki Pandas.
Następnie ograniczamy liczbę przetwarzanych wierszy za pomocą zmiennej DATASET_LIMIT na potrzeby testowania i wydajności zasobów, a na koniec przekształcamy przekształcone dane w zbiór danych Hugging Face.
Jeśli przyjrzysz się uważnie, zobaczysz, że w modelu trenujemy 1000 wierszy z parametrem „DATASET_LIMIT”: „1000”, ponieważ zajmuje to 20 minut na procesorze graficznym Nvidia L4.
Nie kopiuj tego kodu do zasobnika GCS. W ostatnim kroku kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki w jednym skrypcie Pythona.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. Zadanie 3 w DAG – utwórz trzeci krok w Airflow: dostrajanie modelu
W ramach tego modułu DAG wykonujemy skrypt finetune.py, aby dostroić model Gemma za pomocą nowego zbioru danych.
Nie kopiuj tego kodu do zasobnika GCS. W ostatnim kroku kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki w jednym skrypcie Pythona.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. Zadanie DAG nr 4 – utwórz ostatni krok w Airflow: wnioskowanie / udostępnianie modelu
vLLM to zaawansowana biblioteka open source zaprojektowana specjalnie na potrzeby wnioskowania o wysokiej wydajności w przypadku dużych modeli językowych. Po wdrożeniu w Google Kubernetes Engine (GKE) wykorzystuje skalowalność i wydajność Kubernetes, aby skutecznie obsługiwać duże modele językowe.
Podsumowanie kroków:
- Prześlij DAG „mlops-dag.py” do zasobnika GCS.
- Skopiuj do zasobnika GCS 2 pliki konfiguracji YAML Kubernetes, aby skonfigurować wnioskowanie.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Prześlij skrypt Pythona (plik DAG) oraz pliki manifestu Kubernetes do zasobnika GCS DAGS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
W interfejsie Airflow zobaczysz mlops-dag.
- Wybierz Wznów.
- Aby ręcznie uruchomić cykl MLOps, kliknij Uruchom DAG-a.

Po zakończeniu DAG w interfejsie Airflow zobaczysz dane wyjściowe podobne do tych poniżej.

Po wykonaniu ostatniego kroku możesz pobrać punkt końcowy modelu i wysłać prompt, aby przetestować model.
Przed wydaniem polecenia curl odczekaj około 5 minut, aby można było rozpocząć wnioskowanie modelu, a system równoważenia obciążenia mógł przypisać zewnętrzny adres IP.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Dane wyjściowe:
19. Gratulacje!
Udało Ci się utworzyć pierwszy przepływ pracy AI za pomocą potoku DAG z Airflow 2 w GKE.
Nie zapomnij cofnąć aprowizacji wdrożonych zasobów.
20. Wersja produkcyjna
Ten CodeLab dostarczył Ci świetnych informacji o tym, jak skonfigurować Airflow 2 na GKE, ale w rzeczywistości podczas wdrażania tego rozwiązania w środowisku produkcyjnym warto wziąć pod uwagę te kwestie:
Wdróż interfejs internetowy za pomocą Gradio lub podobnego narzędzia.
Skonfiguruj automatyczne monitorowanie aplikacji dla zadań w GKE tutaj lub wyeksportuj wskaźniki z Airflow tutaj.
Aby szybciej dostroić model, zwłaszcza w przypadku większych zbiorów danych, możesz potrzebować większych procesorów graficznych. Jeśli jednak chcemy trenować model na wielu procesorach GPU, musimy podzielić zbiór danych i rozdzielić trenowanie. Oto wyjaśnienie FSDP z PyTorch (w pełni rozproszone równoległe przetwarzanie danych z udostępnianiem GPU). Więcej informacji znajdziesz w tym poście na blogu Meta oraz w tym samouczku dotyczącym FSDP z użyciem Pytorch.
Google Cloud Composer to zarządzana usługa Airflow, więc nie musisz utrzymywać samego Airflow – wystarczy wdrożyć DAG i możesz zacząć pracę.
Więcej informacji
- Dokumentacja Airflow: https://airflow.apache.org/
Licencja
To zadanie jest licencjonowane na podstawie ogólnej licencji Creative Commons Attribution 2.0.