Membuat Alur Kerja MLOps dengan Airflow 2 di GKE

Membuat Alur Kerja MLOps dengan Airflow 2 di GKE

Tentang codelab ini

subjectTerakhir diperbarui Des 10, 2024
account_circleDitulis oleh Laurent Grangeau, Darren Evans, Merve Gül Sayan

1. Ringkasan

852dc8844309ffb8.png

CodeLab ini menunjukkan cara mengintegrasikan praktik DevOps ke dalam machine learning (MLOps) dengan mendownload set data, meningkatkan kualitas model, dan men-deploy LLM di Google Kubernetes Engine (GKE) menggunakan DAG Airflow dengan jumlah abstraksi paling sedikit. Oleh karena itu, kita menggunakan perintah gcloud, bukan terraform, sehingga Anda dapat mengikuti lab langkah demi langkah dan dengan mudah memahami setiap proses dari perspektif Platform Engineer dan Machine Learning Engineer.

Panduan praktis ini akan memandu Anda memanfaatkan Airflow untuk menyederhanakan alur kerja AI, memberikan demonstrasi yang jelas dan praktis tentang seluruh siklus proses MLOps dengan mengonfigurasi DAG.

Yang akan Anda pelajari

  • Membina kolaborasi dan pemahaman yang lebih baik antara Engineer Platform dan Machine Learning dengan memecah silo pengetahuan dan meningkatkan alur kerja
  • Memahami cara men-deploy, menggunakan, dan mengelola Airflow 2 di GKE
  • Mengonfigurasi DAG Airflow dari awal hingga akhir
  • Membuat dasar untuk sistem machine learning kelas produksi dengan GKE
  • Menginstrumentasikan dan mengoperasikan sistem machine learning
  • Memahami bagaimana Platform Engineering telah menjadi pilar dukungan penting untuk MLOps

Yang dicapai CodeLab ini

  • Anda dapat mengajukan pertanyaan tentang film dari LLM yang kami optimalkan berdasarkan Gemma-2-9b-it, yang ditayangkan di GKE dengan vLLM.

Audiens Target

  • Machine Learning Engineer
  • Engineer Platform
  • Data scientist
  • Data Engineer
  • DevOps Engineer
  • Platform Architect
  • Customer Engineer

CodeLab ini tidak ditujukan

  • Sebagai pengantar alur kerja GKE atau AI/ML
  • Sebagai panduan untuk seluruh rangkaian fitur Airflow

2. Engineering Platform membantu Machine Learning Engineer/Scientist

16635a8284b994c.png

Rekayasa platform dan MLOps adalah disiplin yang saling bergantung yang berkolaborasi untuk menciptakan lingkungan yang andal dan efisien untuk pengembangan dan deployment ML.

Cakupan: Engineering platform memiliki cakupan yang lebih luas daripada MLOps, yang mencakup seluruh siklus proses pengembangan software dan menyediakan alat serta infrastruktur untuknya.

MLOps menjembatani kesenjangan antara pengembangan, deployment, dan inferensi ML.

Keahlian: Engineer platform biasanya memiliki keahlian yang kuat dalam teknologi infrastruktur seperti cloud computing, containerization, dan pengelolaan data.

Engineer MLOps berspesialisasi dalam pengembangan, deployment, dan pemantauan model ML, dan sering kali memiliki keterampilan data science dan software engineering.

Alat: Engineer platform membuat alat untuk penyediaan infrastruktur, pengelolaan konfigurasi, orkestrasi container, dan scaffolding aplikasi. Engineer MLOps menggunakan alat untuk pelatihan, eksperimen, deployment, pemantauan, dan pembuatan versi model ML.

3. Penyiapan dan persyaratan Google Cloud

Penyiapan lingkungan mandiri

  1. Login ke Google Cloud Console dan buat project baru atau gunakan kembali project yang sudah ada. Jika belum memiliki akun Gmail atau Google Workspace, Anda harus membuatnya.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Project name adalah nama tampilan untuk peserta project ini. String ini adalah string karakter yang tidak digunakan oleh Google API. Anda dapat memperbaruinya kapan saja.
  • Project ID bersifat unik di semua project Google Cloud dan tidak dapat diubah (tidak dapat diubah setelah ditetapkan). Cloud Console otomatis membuat string unik; biasanya Anda tidak mementingkan kata-katanya. Di sebagian besar codelab, Anda harus merujuk Project ID-nya (umumnya diidentifikasi sebagai PROJECT_ID). Jika tidak suka dengan ID yang dibuat, Anda dapat membuat ID acak lainnya. Atau, Anda dapat mencobanya sendiri, dan lihat apakah ID tersebut tersedia. ID tidak dapat diubah setelah langkah ini dan tersedia selama durasi project.
  • Sebagai informasi, ada nilai ketiga, Project Number, yang digunakan oleh beberapa API. Pelajari lebih lanjut ketiga nilai ini di dokumentasi.
  1. Selanjutnya, Anda harus mengaktifkan penagihan di Konsol Cloud untuk menggunakan resource/API Cloud. Menjalankan operasi dalam codelab ini tidak akan memakan banyak biaya, bahkan mungkin tidak sama sekali. Guna mematikan resource agar tidak menimbulkan penagihan di luar tutorial ini, Anda dapat menghapus resource yang dibuat atau menghapus project-nya. Pengguna baru Google Cloud memenuhi syarat untuk mengikuti program Uji Coba Gratis senilai $300 USD.

Mulai Cloud Shell

Meskipun Google Cloud dapat dioperasikan dari jarak jauh menggunakan laptop Anda, dalam codelab ini, Anda akan menggunakan Cloud Shell, lingkungan command line yang berjalan di Cloud.

Mengaktifkan Cloud Shell

  1. Dari Cloud Console, klik Aktifkan Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

Jika ini adalah pertama kalinya Anda memulai Cloud Shell, Anda akan melihat layar perantara yang menjelaskan apa itu Cloud Shell. Jika Anda melihat layar perantara, klik Lanjutkan.

9c92662c6a846a5c.png

Perlu waktu beberapa saat untuk penyediaan dan terhubung ke Cloud Shell.

9f0e51b578fecce5.png

Virtual machine ini berisi semua alat pengembangan yang diperlukan. VM ini menawarkan direktori beranda tetap sebesar 5 GB dan beroperasi di Google Cloud, sehingga sangat meningkatkan performa dan autentikasi jaringan. Sebagian besar pekerjaan Anda dalam codelab ini dapat dilakukan dengan browser.

Setelah terhubung ke Cloud Shell, Anda akan melihat bahwa Anda telah diautentikasi dan project telah ditetapkan ke project ID Anda.

  1. Jalankan perintah berikut di Cloud Shell untuk mengonfirmasi bahwa Anda telah diautentikasi:
gcloud auth list

Output perintah

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Jalankan perintah berikut di Cloud Shell untuk mengonfirmasi bahwa perintah gcloud mengetahui project Anda:
gcloud config list project

Output perintah

[core]
project = <PROJECT_ID>

Jika tidak, Anda dapat menyetelnya dengan perintah ini:

gcloud config set project <PROJECT_ID>

Output perintah

Updated property [core/project].

4. Langkah #1 - Daftar dan lakukan autentikasi di Kaggle

Untuk memulai CodeLab, Anda perlu membuat akun di Kaggle, yang merupakan platform komunitas online untuk data scientist dan penggemar machine learning yang dimiliki oleh Google dan menghosting repositori set data yang tersedia secara publik untuk berbagai domain. Dari situs ini, Anda akan mendownload set data RottenTomatoes, yang digunakan untuk melatih model.

  • Daftar ke Kaggle, Anda dapat menggunakan SSO Google untuk login
  • Setujui persyaratan dan ketentuan
  • Buka Setelan dan dapatkan nama pengguna Anda username
  • Di bagian API, pilih "Create new token from" Kaggle yang akan mendownload kaggle.json
  • Jika Anda mengalami masalah, buka halaman dukungan di sini

5. Langkah #2 - Daftar dan lakukan autentikasi di HuggingFace

HuggingFace adalah lokasi terpusat bagi siapa saja untuk berinteraksi dengan teknologi Machine Learning. Platform ini menghosting 900 ribu model, 200 ribu set data, dan 300 ribu aplikasi demo (Ruang), semuanya open source dan tersedia untuk publik.

  • Daftar ke HuggingFace - Buat akun dengan nama pengguna, Anda tidak dapat menggunakan SSO Google
  • Konfirmasi alamat email Anda
  • Buka di sini dan setujui lisensi untuk model Gemma-2-9b-it
  • Buat token HuggingFace di sini
  • Catat kredensial token, Anda akan memerlukannya nanti

6. Langkah #3 - Buat resource infrastruktur Google Cloud yang diperlukan

Anda akan menyiapkan GKE, GCE, Artifact Registry, dan menerapkan peran IAM menggunakan workload identity federation.

Alur kerja AI Anda menggunakan dua nodepool, satu untuk pelatihan dan satu untuk inferensi. Nodepool pelatihan menggunakan VM GCE g2-standard-8 yang dilengkapi dengan satu GPU Nvidia L4 Tensor Core. Nodepool inferensi menggunakan VM g2-standard-24 yang dilengkapi dengan dua GPU Nvidia L4 Tensor Core. Saat menentukan region, pilih region tempat GPU yang diperlukan didukung ( Link).

Di Cloud Shell, jalankan perintah berikut:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Membuat manifes YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
 
name: inference-deployment
 
namespace: airflow
spec:
 
replicas: 1
 
selector:
   
matchLabels:
     
app: gemma-server
 
template:
   
metadata:
     
labels:
       
app: gemma-server
       
ai.gke.io/model: gemma-2-9b-it
       
ai.gke.io/inference-server: vllm
     
annotations:
       
gke-gcsfuse/volumes: "true"
   
spec:
     
serviceAccountName: airflow-mlops-sa
     
tolerations:
     
- key: "nvidia.com/gpu"
       
operator: "Exists"
       
effect: "NoSchedule"
     
- key: "on-demand"
       
value: "true"
       
operator: "Equal"
       
effect: "NoSchedule"
     
containers:
     
- name: inference-server
       
image: vllm/vllm-openai:latest
       
ports:
       
- containerPort: 8000
       
resources:
         
requests:
           
nvidia.com/gpu: "2"
         
limits:
           
nvidia.com/gpu: "2"
       
command: ["/bin/sh", "-c"]
       
args:
       
- |
         
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
       
volumeMounts:
       
- mountPath: /dev/shm
         
name: dshm
       
- name: gcs-fuse-csi-ephemeral
         
mountPath: /modeldata
         
readOnly: true
     
volumes:
     
- name: dshm
       
emptyDir:
         
medium: Memory
     
- name: gcs-fuse-csi-ephemeral
       
csi:
         
driver: gcsfuse.csi.storage.gke.io
         
volumeAttributes:
           
bucketName: BUCKET_DATA_NAME
           
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
           
fileCacheCapacity: "20Gi"
           
fileCacheForRangeRead: "true"
           
metadataStatCacheCapacity: "-1"
           
metadataTypeCacheCapacity: "-1"
           
metadataCacheTTLSeconds: "-1"
     
nodeSelector:
       
cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Membuat 3 bucket Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud
storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud
storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl
apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed
-i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed
-i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed
-i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl
apply -f pv-dags.yaml
kubectl
apply -f pv-logs.yaml
kubectl
apply -f pvc-dags.yaml
kubectl
apply -f pvc-logs.yaml
kubectl
apply -f mlops-sa.yaml
kubectl
apply -f sa-role.yaml
kubectl
apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Langkah #4 - Instal Airflow di GKE melalui diagram helm

Sekarang kita akan men-deploy Airflow 2 menggunakan Helm. Apache Airflow adalah platform manajemen alur kerja open source untuk pipeline engineering data. Kita akan membahas kumpulan fitur Airflow 2 nanti.

values.yaml untuk diagram helm Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Men-deploy Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade
--install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Langkah #5 - Lakukan Inisialisasi Airflow dengan Koneksi dan Variabel

Setelah Airflow 2 di-deploy, kita dapat mulai mengonfigurasinya. Kita menentukan beberapa variabel, yang dibaca oleh skrip Python.

  1. Mengakses UI Airflow di port 8080 dengan browser

Mendapatkan IP eksternal

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Buka browser web dan buka http://<EXTERNAL-IP>:8080 . Login adalah admin / admin

  1. Buat koneksi GCP default dalam UI Airflow, jadi buka Admin → Connections → + Add a new record
  • ID Koneksi: google_cloud_default
  • Jenis Koneksi: Google Cloud

Klik Simpan.

  1. Buat variabel yang diperlukan, jadi buka Admin → Variabel → + Tambahkan data baru
  • Kunci: BUCKET_DATA_NAME - Nilai: Salin dari echo $BUCKET_DATA_NAME
  • Kunci: GCP_PROJECT_ID - Nilai: Salin dari echo $DEVSHELL_PROJECT_ID
  • Kunci: HF_TOKEN - Nilai: Masukkan token HF Anda
  • Kunci: KAGGLE_USERNAME - Nilai: Masukkan nama pengguna kaggle Anda
  • Kunci: KAGGLE_KEY - Nilai: Salin ini dari kaggle.json

Klik Simpan setelah setiap pasangan nilai kunci.

UI Anda akan terlihat seperti ini:

771121470131b5ec.png

9. Penampung kode aplikasi #1 - Mendownload data

Dalam skrip Python ini, kita mengautentikasi dengan Kaggle untuk mendownload set data ke bucket GCS.

Skrip itu sendiri di-containerisasi karena menjadi Unit DAG #1 dan kami berharap set data akan sering diperbarui, jadi kami ingin mengotomatiskan hal ini.

Buat direktori dan salin skrip kita di sini

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
   
storage_client = storage.Client()
   
bucket = storage_client.bucket(bucket_name)
   
blob = bucket.blob(destination_blob_name)
   
blob.upload_from_filename(source_file_name)
   
print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub
==0.3.4

Sekarang kita membuat image container untuk download set data dan mengirimkannya ke Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Penampung kode aplikasi #2 - Persiapan data

Selama langkah persiapan data, inilah yang kita capai:

  1. Tentukan jumlah set data yang ingin kita gunakan untuk meningkatkan kualitas model dasar
  2. Memuat set data, yaitu membaca file CSV ke dalam dataframe Pandas yang merupakan struktur data 2 dimensi untuk baris dan kolom
  3. Transformasi data / prapemrosesan - Menentukan bagian set data yang tidak relevan dengan menentukan bagian yang ingin kita simpan, yang pada akhirnya akan menghapus bagian lainnya
  4. Menerapkan fungsi transform ke setiap baris DataFrame
  5. Simpan data yang disiapkan kembali ke bucket GCS

Buat direktori dan salin skrip kita di sini

cd .. ; mkdir 2-data-preparation
cd
2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """
   
question = f"Review analysis for movie '{data['id']}'"
   
context = data['reviewText']
   
answer = data['scoreSentiment']
   
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
   
return {'text': template.format(question=question, context=context, answer=answer)}

try:
   
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
   
print(f"Dataset loaded successfully.")

   
# Drop rows with NaN values in relevant columns
   
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

   
# Apply transformation to the DataFrame
   
transformed_data = df.apply(transform, axis=1).tolist()

   
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
   
transformed_df = pd.DataFrame(transformed_data)
   
dataset = Dataset.from_pandas(transformed_df)

   
# Save the prepared dataset to JSON lines format
   
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
       
for item in dataset:
           
f.write(json.dumps(item) + "\n")

   
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
   
except Exception as e:
   
print(f"Error during data loading or preprocessing: {e}")
   
import traceback
   
print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR
/app
COPY requirements
.txt .
RUN pip install
--no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data
-preparation.py .
CMD
["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs
==2024.9.0
pandas
==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Penampung kode aplikasi #3 - Penyesuaian

Di sini, kita menggunakan Gemma-2-9b-it sebagai model dasar, lalu menyesuaikannya dengan set data baru.

Berikut adalah urutan langkah yang terjadi selama langkah penyesuaian.

1. Penyiapan: Mengimpor library, menentukan parameter (untuk model, data, dan pelatihan), dan memuat set data dari Google Cloud Storage.

2. Load Model: Memuat model bahasa terlatih dengan kuantisasi untuk efisiensi, dan memuat tokenizer yang sesuai.

3. Konfigurasi LoRA: Siapkan Low-Rank Adaptation (LoRA) untuk menyesuaikan model secara efisien dengan menambahkan matriks kecil yang dapat dilatih.

4. Latih: Tentukan parameter pelatihan dan gunakan SFTTrainer untuk menyesuaikan model pada set data yang dimuat menggunakan jenis kuantisasi FP16.

5. Simpan dan Upload: Simpan model dan tokenizer yang dioptimalkan secara lokal, lalu upload ke bucket GCS.

Kemudian, kita membuat image container menggunakan Cloud Build dan menyimpannya di Artifact Registry.

Buat direktori dan salin skrip kita di sini

cd .. ; mkdir 3-fine-tuning
cd
3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
   
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
   
attn_implementation="eager",
   
pretrained_model_name_or_path=model_name,
   
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
   
lora_alpha=lora_alpha,
   
lora_dropout=lora_dropout,
   
r=lora_r,
   
bias="none",
   
task_type="CAUSAL_LM",
   
target_modules=[
       
"q_proj",
       
"k_proj",
       
"v_proj",
       
"o_proj",
       
"gate_proj",
       
"up_proj",
       
"down_proj",
   
],
)

# Set training parameters
training_arguments = SFTConfig(
       
bf16=bf16,
       
dataset_kwargs={
           
"add_special_tokens": False,  
           
"append_concat_token": False,
       
},
       
dataset_text_field="text",
       
disable_tqdm=True,
       
fp16=fp16,
       
gradient_accumulation_steps=gradient_accumulation_steps,
       
gradient_checkpointing=gradient_checkpointing,
       
gradient_checkpointing_kwargs={"use_reentrant": False},
       
group_by_length=group_by_length,
       
log_on_each_node=False,
       
logging_steps=logging_steps,
       
learning_rate=learning_rate,
       
lr_scheduler_type=lr_scheduler_type,
       
max_grad_norm=max_grad_norm,
       
max_seq_length=max_seq_length,
       
max_steps=max_steps,
       
num_train_epochs=num_train_epochs,
       
optim=optim,
       
output_dir=save_model_path,
       
packing=packing,
       
per_device_train_batch_size=per_device_train_batch_size,
       
save_strategy=save_strategy,
       
save_steps=save_steps,
       
save_total_limit=save_total_limit,
       
warmup_ratio=warmup_ratio,
       
weight_decay=weight_decay,
   
)

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
   
model=model,
   
train_dataset=dataset,
   
peft_config=peft_config,
   
dataset_text_field="text",
   
max_seq_length=max_seq_length,
   
tokenizer=tokenizer,
   
args=training_arguments,
   
packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
   
low_cpu_mem_usage=True,
   
pretrained_model_name_or_path=model_name,
   
return_dict=True,
   
torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
   
model=base_model,
   
model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
   
is_main_process=accelerator.is_main_process,
   
save_directory=save_model_path,
   
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
   
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
   
storage_client = storage.Client()
   
bucket = storage_client.bucket(bucket_name)
   
for root, _, files in os.walk(model_dir):
       
for file in files:
           
local_file_path = os.path.join(root, file)
           
gcs_file_path = os.path.relpath(local_file_path, model_dir)
           
blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
           
blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes
==0.45.0
datasets
==3.1.0
gcsfs
==2024.9.0
peft
==v0.13.2
torch
==2.5.1
transformers
==4.47.0
trl
==v0.11.4

Sekarang kita membuat image container untuk melakukan penyesuaian dan mengirimkannya ke Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Ringkasan Airflow 2 termasuk apa yang dimaksud dengan DAG

Airflow adalah platform untuk mengatur alur kerja dan pipeline data. Cloud Composer menggunakan DAG (Directed Acyclic Graph) untuk menentukan alur kerja ini dalam kode Python, yang secara visual merepresentasikan tugas dan dependensinya.

Airflow, dengan DAG statis dan definisi berbasis Python, sangat cocok untuk menjadwalkan dan mengelola alur kerja yang telah ditentukan sebelumnya. Arsitekturnya mencakup UI yang mudah digunakan untuk memantau dan mengelola alur kerja ini.

Pada dasarnya, Airflow memungkinkan Anda menentukan, menjadwalkan, dan memantau pipeline data menggunakan Python, sehingga menjadikannya alat yang fleksibel dan canggih untuk orkestrasi alur kerja.

13. Ringkasan DAG kami

ec49964ad7d61491.png

DAG adalah singkatan dari Directed Acyclic Graph. Di Airflow, DAG sendiri mewakili seluruh alur kerja atau pipeline. File ini menentukan tugas, dependensinya, dan urutan eksekusi.

Unit alur kerja dalam DAG dijalankan dari pod di cluster GKE, yang dimulai dari konfigurasi Airflow.

Ringkasan:

Airflow: Download data - Skrip ini mengotomatiskan proses mendapatkan set data ulasan film dari Kaggle dan menyimpannya di bucket GCS, sehingga tersedia untuk pemrosesan atau analisis lebih lanjut di lingkungan cloud Anda.

Airflow: Persiapan Data - Kode ini mengambil set data ulasan film mentah, menghapus kolom data yang tidak diperlukan untuk kasus penggunaan kita, dan menghapus set data dengan nilai yang hilang. Selanjutnya, model ini menyusun set data ke dalam format menjawab pertanyaan yang sesuai untuk machine learning, dan menyimpannya kembali di GCS untuk digunakan nanti.

Airflow: Model Finetuning - Kode ini menyesuaikan model bahasa besar (LLM) menggunakan teknik yang disebut LoRA (Low-Rank Adaptation), lalu menyimpan model yang diperbarui. Proses ini dimulai dengan memuat LLM terlatih dan set data dari Google Cloud Storage. Kemudian, model ini menerapkan LoRA untuk menyesuaikan model secara efisien pada set data ini. Terakhir, model yang telah disesuaikan akan disimpan kembali ke Google Cloud Storage untuk digunakan nanti dalam aplikasi seperti pembuatan teks atau menjawab pertanyaan.

Airflow: Penyaluran Model - Menyajikan model yang telah dioptimalkan di GKE dengan vllm untuk inferensi.

Airflow: Feedback loop - Melatih ulang model setiap xx waktu (per jam, harian, mingguan).

Diagram ini menjelaskan cara kerja Airflow 2, saat dijalankan di GKE.

8691f41166209a5d.png

14. Menyesuaikan model vs menggunakan RAG

CodeLab ini akan meningkatkan kualitas LLM, bukan menggunakan Retrieval Augmented Generation (RAG).

Mari kita bandingkan kedua pendekatan ini:

Penyesuaian: Membuat model khusus: Penyesuaian akan menyesuaikan LLM dengan tugas atau set data tertentu, sehingga dapat beroperasi secara independen tanpa mengandalkan sumber data eksternal.

Menyederhanakan inferensi: Hal ini menghilangkan kebutuhan akan sistem pengambilan dan database terpisah, sehingga menghasilkan respons yang lebih cepat dan lebih murah, terutama untuk kasus penggunaan yang sering.

RAG: Bergantung pada pengetahuan eksternal: RAG mengambil informasi yang relevan dari pusat informasi untuk setiap permintaan, sehingga memastikan akses ke data terbaru dan spesifik.

Meningkatkan kompleksitas: Menerapkan RAG di lingkungan produksi seperti cluster Kubernetes sering kali melibatkan beberapa microservice untuk pemrosesan dan pengambilan data, yang berpotensi meningkatkan latensi dan biaya komputasi.

Alasan penyesuaian dipilih:

Meskipun RAG akan cocok untuk set data kecil yang digunakan dalam CodeLab ini, kami memilih untuk melakukan penyesuaian untuk mendemonstrasikan kasus penggunaan umum untuk Airflow. Pilihan ini memungkinkan kita berfokus pada aspek orkestrasi alur kerja, bukan mempelajari nuansa penyiapan infrastruktur dan microservice tambahan untuk RAG.

Kesimpulan:

Penyesuaian dan RAG adalah teknik yang berharga dengan kelebihan dan kekurangannya masing-masing. Pilihan yang optimal bergantung pada persyaratan spesifik project Anda, seperti ukuran dan kompleksitas data, kebutuhan performa, dan pertimbangan biaya.

15. Tugas DAG #1 - Buat langkah pertama Anda di Airflow: Download data

Sebagai ringkasan unit DAG ini, kode Python kami yang dihosting dalam image container mendownload set data RottenTomatoes terbaru dari Kaggle.

Jangan menyalin kode ini ke bucket GCS. Kita menyalin mlops-dag.py sebagai langkah terakhir, yang berisi semua langkah Unit DAG dalam satu skrip Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
           
start_date=datetime(2024,11,1),
           
schedule_interval="@daily",
           
catchup=False) as dag:

       
# Step 1: Fetch raw data to GCS Bucket
       
dataset_download = KubernetesPodOperator(
           
task_id="dataset_download_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
           
name="dataset-download",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"KAGGLE_USERNAME":KAGGLE_USERNAME,
                   
"KAGGLE_KEY":KAGGLE_KEY,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
           
}
       
)

       
dataset_download

16. Tugas DAG #2 - Buat langkah kedua Anda di Airflow: Persiapan Data

Sebagai ringkasan unit DAG ini, kita memuat file CSV (rotten_tomatoes_movie_reviews.csv) dari GCS ke dalam DataFrame Pandas.

Selanjutnya, kita membatasi jumlah baris yang diproses menggunakan DATASET_LIMIT untuk pengujian dan efisiensi resource, lalu akhirnya mengonversi data yang ditransformasi menjadi Set Data Hugging Face.

Jika Anda melihat dengan cermat, Anda akan melihat bahwa kita melatih 1.000 baris dalam model dengan "DATASET_LIMIT": "1000", ini karena memerlukan waktu 20 menit di GPU Nvidia L4 untuk melakukannya.

Jangan menyalin kode ini ke bucket GCS. Kita menyalin mlops-dag.py pada langkah terakhir, yang berisi semua langkah dalam satu skrip Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
           
start_date=datetime(2024,11,1),
           
schedule_interval="@daily",
           
catchup=False) as dag:

       
# Step 1: Fetch raw data to GCS Bucket
       
dataset_download = KubernetesPodOperator(
           
task_id="dataset_download_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
           
name="dataset-download",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"KAGGLE_USERNAME":KAGGLE_USERNAME,
                   
"KAGGLE_KEY":KAGGLE_KEY,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
           
}
       
)

       
# Step 2: Run GKEJob for data preparation
       
data_preparation = KubernetesPodOperator(
           
task_id="data_pipeline_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
           
name="data-preparation",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"GCP_PROJECT_ID":GCP_PROJECT_ID,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                   
"DATASET_LIMIT": "1000",
                   
"HF_TOKEN":HF_TOKEN
           
}
       
)

       
dataset_download >> data_preparation

17. Tugas DAG #3 - Buat langkah ketiga Anda di Airflow: Penyesuaian Model

Sebagai ringkasan unit DAG ini, di sini kita mengeksekusi finetune.py untuk meningkatkan kualitas model Gemma dengan set data baru.

Jangan menyalin kode ini ke bucket GCS. Kita menyalin mlops-dag.py pada langkah terakhir, yang berisi semua langkah dalam satu skrip Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
           
start_date=datetime(2024,11,1),
           
schedule_interval="@daily",
           
catchup=False) as dag:

       
# DAG Task 1: Fetch raw data to GCS Bucket
       
dataset_download = KubernetesPodOperator(
           
task_id="dataset_download_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
           
name="dataset-download",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"KAGGLE_USERNAME":KAGGLE_USERNAME,
                   
"KAGGLE_KEY":KAGGLE_KEY,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
           
}
       
)

       
# DAG Task 2: Run GKEJob for data preparation
       
data_preparation = KubernetesPodOperator(
           
task_id="data_pipeline_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
           
name="data-preparation",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"GCP_PROJECT_ID":GCP_PROJECT_ID,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                   
"DATASET_LIMIT": "1000",
                   
"HF_TOKEN":HF_TOKEN
           
}
       
)

       
# DAG Task 3: Run GKEJob for fine tuning
       
fine_tuning = KubernetesPodOperator(
           
task_id="fine_tuning_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
           
name="fine-tuning",
           
service_account_name="airflow-mlops-sa",
           
startup_timeout_seconds=600,
           
container_resources=models.V1ResourceRequirements(
                   
requests={"nvidia.com/gpu": "1"},
                   
limits={"nvidia.com/gpu": "1"}
           
),
           
env_vars={
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                   
"HF_TOKEN":HF_TOKEN
           
}
       
)

       
dataset_download >> data_preparation >> fine_tuning

18. Tugas DAG #4 - Buat langkah terakhir Anda di Airflow: Inferensi / Menayangkan model

vLLM adalah library open source canggih yang dirancang khusus untuk inferensi LLM berperforma tinggi. Saat di-deploy di Google Kubernetes Engine (GKE), LLM memanfaatkan skalabilitas dan efisiensi Kubernetes untuk menayangkan LLM secara efektif.

Ringkasan langkah:

  • Upload DAG "mlops-dag.py" ke bucket GCS.
  • Salin dua file konfigurasi YAML Kubernetes untuk menyiapkan inferensi, ke bucket GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
   
config.load_incluster_config()
   
k8s_apps_v1 = client.AppsV1Api()
   
k8s_core_v1 = client.CoreV1Api()

   
while True:
       
try:
           
k8s_apps_v1.delete_namespaced_deployment(
                   
namespace="airflow",
                   
name="inference-deployment",
                   
body=client.V1DeleteOptions(
                   
propagation_policy="Foreground", grace_period_seconds=5
                   
)
           
)
       
except ApiException:
           
break
   
print("Deployment inference-deployment deleted")
   
   
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
       
dep = yaml.safe_load(f)
       
resp = k8s_apps_v1.create_namespaced_deployment(
           
body=dep, namespace="airflow")
       
print(f"Deployment created. Status='{resp.metadata.name}'")
   
   
while True:
       
try:
           
k8s_core_v1.delete_namespaced_service(
                   
namespace="airflow",
                   
name="llm-service",
                   
body=client.V1DeleteOptions(
                   
propagation_policy="Foreground", grace_period_seconds=5
                   
)
           
)
       
except ApiException:
           
break
   
print("Service llm-service deleted")

   
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
       
dep = yaml.safe_load(f)
       
resp = k8s_core_v1.create_namespaced_service(
           
body=dep, namespace="airflow")
       
print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
           
start_date=datetime(2024,11,1),
           
schedule_interval="@daily",
           
catchup=False) as dag:

       
# DAG Step 1: Fetch raw data to GCS Bucket
       
dataset_download = KubernetesPodOperator(
           
task_id="dataset_download_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
           
name="dataset-download",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"KAGGLE_USERNAME":KAGGLE_USERNAME,
                   
"KAGGLE_KEY":KAGGLE_KEY,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
           
}
       
)

       
# DAG Step 2: Run GKEJob for data preparation
       
data_preparation = KubernetesPodOperator(
           
task_id="data_pipeline_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
           
name="data-preparation",
           
service_account_name="airflow-mlops-sa",
           
env_vars={
                   
"GCP_PROJECT_ID":GCP_PROJECT_ID,
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                   
"DATASET_LIMIT": "1000",
                   
"HF_TOKEN":HF_TOKEN
           
}
       
)

       
# DAG Step 3: Run GKEJob for fine tuning
       
fine_tuning = KubernetesPodOperator(
           
task_id="fine_tuning_task",
           
namespace=JOB_NAMESPACE,
           
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
           
name="fine-tuning",
           
service_account_name="airflow-mlops-sa",
           
startup_timeout_seconds=600,
           
container_resources=models.V1ResourceRequirements(
                   
requests={"nvidia.com/gpu": "1"},
                   
limits={"nvidia.com/gpu": "1"}
           
),
           
env_vars={
                   
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                   
"HF_TOKEN":HF_TOKEN
           
}
       
)

       
# DAG Step 4: Run GKE Deployment for model serving
       
model_serving = PythonOperator(
           
task_id="model_serving",
           
python_callable=model_serving
       
)

       
dataset_download >> data_preparation >> fine_tuning >> model_serving

Upload skrip Python (file DAG), serta manifes Kubernetes ke bucket GCS DAGS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud
storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud
storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Di UI Airflow, Anda akan melihat mlops-dag.

  1. Pilih hentikan jeda.
  2. Pilih Trigger DAG untuk melakukan siklus MLOps manual.

d537281b92d5e8bb.png

Setelah DAG selesai, Anda akan melihat output seperti ini di UI Airflow.

3ed42abf8987384e.png

Setelah langkah terakhir, Anda dapat mengambil endpoint model dan mengirim perintah untuk menguji model.

Tunggu sekitar 5 menit sebelum mengeluarkan perintah curl, sehingga inferensi model dapat dimulai dan load balancer dapat menetapkan Alamat IP eksternal.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Output:

19. Selamat!

Anda telah membuat alur kerja AI pertama menggunakan pipeline DAG dengan Airflow 2 di GKE.

Jangan lupa untuk membatalkan penyediaan resource yang telah Anda deploy.

20. Melakukannya dalam Produksi

Meskipun CodeLab telah memberi Anda insight yang luar biasa tentang cara menyiapkan Airflow 2 di GKE, di dunia nyata, Anda sebaiknya mempertimbangkan beberapa topik berikut saat melakukannya dalam produksi.

Terapkan frontend web menggunakan Gradio atau alat serupa.

Konfigurasikan pemantauan aplikasi otomatis untuk beban kerja dengan GKE di sini atau ekspor metrik dari Airflow di sini.

Anda mungkin memerlukan GPU yang lebih besar untuk menyesuaikan model dengan lebih cepat, terutama jika memiliki set data yang lebih besar. Namun, jika ingin melatih model di beberapa GPU, kita harus membagi set data dan membuat shard pelatihan. Berikut adalah penjelasan tentang FSDP dengan PyTorch (paralel data yang di-shard sepenuhnya, menggunakan berbagi GPU untuk mencapai tujuan tersebut. Bacaan lebih lanjut dapat ditemukan di sini dalam postingan blog dari Meta dan lainnya dalam tutorial tentang FSDP menggunakan Pytorch.

Google Cloud Composer adalah layanan Airflow terkelola, sehingga Anda tidak perlu mengelola Airflow itu sendiri. Cukup deploy DAG dan Anda siap menggunakannya.

Pelajari lebih lanjut

Lisensi

Karya ini dilisensikan berdasarkan Lisensi Umum Creative Commons Attribution 2.0.