GKE'de Airflow 2 ile MLOps iş akışları oluşturma

1. Genel Bakış

852dc8844309ffb8.png

Bu CodeLab'de, en az düzeyde soyutlama içeren bir Airflow DAG'si kullanarak veri kümesi indirme, modeli iyileştirme ve LLM'yi Google Kubernetes Engine'e (GKE) dağıtma yoluyla DevOps uygulamalarının makine öğrenimine (MLOps) nasıl entegre edileceği gösterilmektedir. Bu nedenle, laboratuvarı adım adım takip edebilmeniz ve her süreci hem Platform Mühendisi hem de Makine Öğrenimi Mühendisi açısından kolayca anlayabilmeniz için Terraform yerine gcloud komutlarını kullanıyoruz.

Bu uygulamalı kılavuzda, Airflow'u kullanarak yapay zeka iş akışlarınızı nasıl kolaylaştıracağınız anlatılmaktadır. Bir DAG yapılandırarak tüm MLOps yaşam döngüsünün net ve pratik bir gösterimi sunulmaktadır.

Neler öğreneceksiniz?

  • Bilgi silolarını ortadan kaldırarak ve iş akışlarını iyileştirerek Platform ve Makine Öğrenimi Mühendisleri arasında daha fazla işbirliği ve anlayış geliştirin.
  • GKE'de Airflow 2'yi dağıtma, kullanma ve yönetme
  • Airflow DAG'sini uçtan uca yapılandırma
  • GKE ile üretime hazır makine öğrenimi sistemlerinin temelini oluşturma
  • Makine öğrenimi sistemlerini kullanıma sunma ve çalıştırma
  • Platform mühendisliğinin MLOps için nasıl kritik bir destek sütunu haline geldiğini anlama

Bu CodeLab'in amaçları

  • Gemma-2-9b-it temel alınarak ince ayar yaptığımız ve vLLM ile GKE'de sunulan bir LLM'den filmler hakkında soru sorabilirsiniz.

Hedef kitle

  • Makine Öğrenimi Mühendisleri
  • Platform Mühendisleri
  • Veri bilimciler
  • Veri Mühendisleri
  • DevOps Mühendisleri
  • Platform Mimarı
  • Customer Engineers

Bu CodeLab,

  • GKE veya yapay zeka/makine öğrenimi iş akışlarına giriş olarak
  • Airflow özelliklerinin tamamını incelemek için

2. Platform mühendisliği, makine öğrenimi mühendislerine/bilim insanlarına yardımcı olur

16635a8284b994c.png

Platform mühendisliği ve MLOps, makine öğrenimi geliştirme ve dağıtımı için sağlam ve verimli bir ortam oluşturmak üzere işbirliği yapan birbirine bağımlı disiplinlerdir.

Kapsam: Platform mühendisliği, MLOps'tan daha geniş bir kapsama sahiptir. Yazılım geliştirme yaşam döngüsünün tamamını kapsar ve bu döngü için araçlar ve altyapı sağlar.

MLOps, makine öğrenimi geliştirme, dağıtım ve çıkarım arasındaki boşluğu kapatır.

Yetkinlik: Platform mühendisleri genellikle bulut bilişim, container mimarisine alma ve veri yönetimi gibi altyapı teknolojileri konusunda güçlü bir yetkinliğe sahiptir.

MLOps mühendisleri, makine öğrenimi modeli geliştirme, dağıtma ve izleme konusunda uzmanlaşmıştır. Genellikle veri bilimi ve yazılım mühendisliği becerilerine sahiptirler.

Araçlar: Platform mühendisleri, altyapı sağlama, yapılandırma yönetimi, kapsayıcı düzenleme ve uygulama iskeleti oluşturma için araçlar geliştirir. MLOps mühendisleri, makine öğrenimi modeli eğitimi, deneme, dağıtım, izleme ve sürüm oluşturma için araçlar kullanır.

3. Google Cloud kurulumu ve şartları

Yönlendirmesiz ortam kurulumu

  1. Google Cloud Console'da oturum açın ve yeni bir proje oluşturun veya mevcut bir projeyi yeniden kullanın. Gmail veya Google Workspace hesabınız yoksa hesap oluşturmanız gerekir.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Proje adı, bu projenin katılımcıları için görünen addır. Google API'leri tarafından kullanılmayan bir karakter dizesidir. Bu bilgiyi istediğiniz zaman güncelleyebilirsiniz.
  • Proje kimliği, tüm Google Cloud projelerinde benzersizdir ve sabittir (ayarlandıktan sonra değiştirilemez). Cloud Console, benzersiz bir dizeyi otomatik olarak oluşturur. Genellikle bu dizenin ne olduğuyla ilgilenmezsiniz. Çoğu codelab'de proje kimliğinize (genellikle PROJECT_ID olarak tanımlanır) başvurmanız gerekir. Oluşturulan kimliği beğenmezseniz başka bir rastgele kimlik oluşturabilirsiniz. Dilerseniz kendi adınızı deneyerek kullanılabilir olup olmadığını kontrol edebilirsiniz. Bu adım tamamlandıktan sonra değiştirilemez ve proje süresince geçerli kalır.
  • Bazı API'lerin kullandığı üçüncü bir değer olan Proje Numarası da vardır. Bu üç değer hakkında daha fazla bilgiyi belgelerde bulabilirsiniz.
  1. Ardından, Cloud kaynaklarını/API'lerini kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir. Bu codelab'i tamamlamak neredeyse hiç maliyetli değildir. Bu eğitimin ötesinde faturalandırılmayı önlemek için kaynakları kapatmak üzere oluşturduğunuz kaynakları veya projeyi silebilirsiniz. Yeni Google Cloud kullanıcıları 300 ABD doları değerinde ücretsiz deneme programından yararlanabilir.

Cloud Shell'i başlatma

Google Cloud, dizüstü bilgisayarınızdan uzaktan çalıştırılabilir ancak bu codelab'de bulutta çalışan bir komut satırı ortamı olan Cloud Shell'i kullanacaksınız.

Cloud Shell'i etkinleştirme

  1. Cloud Console'da Cloud Shell'i etkinleştir 'i 853e55310c205094.png tıklayın.

3c1dabeca90e44e5.png

Cloud Shell'i ilk kez başlatıyorsanız ne olduğunu açıklayan bir ara ekran gösterilir. Ara ekran gösterildiyse Devam'ı tıklayın.

9c92662c6a846a5c.png

Cloud Shell'in temel hazırlığı ve bağlanması yalnızca birkaç dakikanızı alır.

9f0e51b578fecce5.png

Bu sanal makineye, ihtiyaç duyacağınız tüm geliştirme araçları yüklenmiştir. 5 GB boyutunda kalıcı bir ana dizin bulunur ve Google Cloud'da çalışır. Bu sayede ağ performansı ve kimlik doğrulama önemli ölçüde güçlenir. Bu codelab'deki çalışmalarınızın neredeyse tamamını tarayıcıyla yapabilirsiniz.

Cloud Shell'e bağlandıktan sonra kimliğinizin doğrulandığını ve projenin, proje kimliğinize ayarlandığını görürsünüz.

  1. Kimliğinizin doğrulandığını onaylamak için Cloud Shell'de şu komutu çalıştırın:
gcloud auth list

Komut çıkışı

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. gcloud komutunun projeniz hakkında bilgi sahibi olduğunu onaylamak için Cloud Shell'de aşağıdaki komutu çalıştırın:
gcloud config list project

Komut çıkışı

[core]
project = <PROJECT_ID>

Değilse şu komutla ayarlayabilirsiniz:

gcloud config set project <PROJECT_ID>

Komut çıkışı

Updated property [core/project].

4. 1. adım: Kaggle'a kaydolun ve kimliğinizi doğrulayın

CodeLab'e başlamak için Kaggle'da hesap oluşturmanız gerekir. Kaggle, Google'a ait olan ve çeşitli alanlarda herkese açık veri kümelerinin bulunduğu geniş bir depoya ev sahipliği yapan, veri bilimciler ve makine öğrenimi meraklıları için bir online topluluk platformudur. Modelinizi eğitmek için kullanılan RottenTomatoes veri kümesini bu siteden indirebilirsiniz.

  • Kaggle'a kaydolun. Oturum açmak için Google TOA'yı kullanabilirsiniz.
  • Şartlar ve koşulları kabul et
  • Ayarlar'a gidip kullanıcı adınızı username alın.
  • API bölümünde, "Create new token from" (Yeni jeton oluştur) Kaggle'ı seçin. Bu işlem, kaggle.json dosyasını indirir.
  • Sorun yaşarsanız buradan destek sayfasına gidin.

5. 2. adım: HuggingFace'e kaydolun ve kimliğinizi doğrulayın

HuggingFace, herkesin makine öğrenimi teknolojisiyle etkileşim kurabileceği merkezi bir yerdir. Tümü açık kaynak ve herkese açık olan 900.000 model, 200.000 veri kümesi ve 300.000 demo uygulaması (Spaces) barındırır.

  • HuggingFace'e kaydolun. Kullanıcı adıyla hesap oluşturun. Google TOA'yı kullanamazsınız.
  • E-posta adresinizi onaylama
  • Buraya gidip Gemma-2-9b-it modelinin lisansını kabul edin.
  • HuggingFace jetonu oluşturmak için burayı ziyaret edin.
  • Jeton kimlik bilgilerini kaydedin. Bunlara daha sonra ihtiyacınız olacaktır.

6. 3. adım: Gerekli Google Cloud altyapı kaynaklarını oluşturun

İş yükü kimliği federasyonunu kullanarak GKE, GCE ve Artifact Registry'yi ayarlayıp IAM rollerini uygulayacaksınız.

Yapay zeka iş akışınızda iki düğüm havuzu kullanılır. Bunlardan biri eğitim, diğeri ise çıkarım içindir. Eğitim düğüm havuzunda, bir Nvidia L4 Tensor Çekirdekli GPU ile donatılmış bir g2-standard-8 GCE VM kullanılmaktadır. Çıkarım düğüm havuzunda, iki Nvidia L4 Tensor Core GPU ile donatılmış bir g2-standard-24 VM kullanılıyor. Bölgeyi belirtirken gerekli GPU'nun desteklendiği bir bölge seçin ( Bağlantı).

Cloud Shell'inizde aşağıdaki komutları çalıştırın:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

YAML manifestlerinizi oluşturma

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:v0.6.6
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

3 Google Cloud Storage (GCS) paketi oluşturma

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. 4. adım: Helm grafiği aracılığıyla GKE'ye Airflow'u yükleyin

Artık Helm kullanarak Airflow 2'yi dağıtıyoruz. Apache Airflow, veri mühendisliği ardışık düzenleri için açık kaynaklı bir iş akışı yönetim platformudur. Airflow 2'nin özelliklerini daha sonra ayrıntılı olarak ele alacağız.

Airflow Helm grafiği için values.yaml

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Airflow 2'yi dağıtma

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. 5. adım: Airflow'u bağlantılar ve değişkenlerle başlatın

Airflow 2 dağıtıldıktan sonra yapılandırmaya başlayabiliriz. Python komut dosyalarımız tarafından okunan bazı değişkenler tanımlarız.

  1. Tarayıcınızla 8080 numaralı bağlantı noktasında Airflow kullanıcı arayüzüne erişin.

Harici IP'yi alma

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Bir web tarayıcısı açın ve http://<EXTERNAL-IP>:8080 adresine gidin . Giriş, yönetici / yönetici şeklindedir.

  1. Airflow kullanıcı arayüzünde varsayılan bir GCP bağlantısı oluşturun. Bunun için Yönetici → Bağlantılar → + Yeni kayıt ekle'ye gidin.
  • Bağlantı Kimliği: google_cloud_default
  • Bağlantı Türü: Google Cloud

Kaydet'i tıklayın.

  1. Gerekli değişkenleri oluşturmak için Yönetici → Değişkenler → + Yeni bir kayıt ekle'ye gidin.
  • Anahtar: BUCKET_DATA_NAME - Değer: Copy from echo $BUCKET_DATA_NAME
  • Anahtar: GCP_PROJECT_ID - Değer: echo $DEVSHELL_PROJECT_ID komutundan kopyalayın.
  • Anahtar: HF_TOKEN - Değer: HF jetonunuzu ekleyin
  • Anahtar: KAGGLE_USERNAME - Değer: Kaggle kullanıcı adınızı girin
  • Anahtar: KAGGLE_KEY - Değer: Bunu kaggle.json dosyasından kopyalayın.

Her anahtar/değer çiftinden sonra Kaydet'i tıklayın.

Kullanıcı arayüzünüz aşağıdaki gibi görünmelidir:

771121470131b5ec.png

9. 1. uygulama kodu kapsayıcısı - Veri indirme

Bu Python komut dosyasında, veri kümesini GCS paketimize indirmek için Kaggle ile kimlik doğrulaması yapıyoruz.

Bu, DAG Birimi #1 olduğundan ve veri kümesinin sık sık güncellenmesini beklediğimizden, komut dosyasının kendisi container mimarisine alınmıştır. Bu nedenle, bu işlemi otomatikleştirmek istiyoruz.

Dizin oluşturma ve komut dosyalarımızı buraya kopyalama

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Şimdi dataset-download için bir kapsayıcı görüntüsü oluşturup Artifact Registry'ye gönderiyoruz.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. 2. uygulama kodu kapsayıcısı - Veri hazırlama

Veri hazırlama adımında şunları yaparız:

  1. Temel modelimizi ince ayar yapmak için veri kümesinin ne kadarını kullanmak istediğimizi belirtin.
  2. Veri kümesini yükler.Diğer bir deyişle, satırlar ve sütunlar için 2 boyutlu bir veri yapısı olan Pandas DataFrame'e CSV dosyasını okur.
  3. Veri dönüştürme / ön işleme: Saklamak istediklerimizi belirterek veri kümesinin hangi bölümlerinin alakasız olduğunu belirleyin. Bu işlem, geri kalan bölümleri kaldırmakla aynı etkiyi yaratır.
  4. DataFrame'in her satırına transform işlevini uygular.
  5. Hazırlanan verileri GCS paketine geri kaydetme

Dizin oluşturma ve komut dosyalarımızı buraya kopyalama

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. 3. uygulama kodu kapsayıcısı - İnce ayar

Burada, temel model olarak Gemma-2-9b-it'yi kullanıp yeni veri kümemizle ince ayarını yapıyoruz.

İnce ayar adımında gerçekleşen adımlar şunlardır:

1. Kurulum: Kitaplıkları içe aktarın, parametreleri (model, veri ve eğitim için) tanımlayın ve veri kümesini Google Cloud Storage'dan yükleyin.

2. Modeli Yükle: Verimlilik için nicemleme ile önceden eğitilmiş bir dil modelini ve ilgili belirteçleştiriciyi yükleyin.

3. LoRA'yı yapılandırın: Eğitilebilir küçük matrisler ekleyerek modeli verimli bir şekilde ince ayarlamak için Low-Rank Adaptation (LoRA) yöntemini ayarlayın.

4. Eğitme: Eğitim parametrelerini tanımlayın ve FP16 nicelendirme türünü kullanarak modeli yüklenen veri kümesinde hassaslaştırmak için SFTTrainer simgesini kullanın.

5. Kaydetme ve Yükleme: İnce ayarlı modeli ve belirteç oluşturucuyu yerel olarak kaydedin, ardından bunları GCS paketimize yükleyin.

Ardından Cloud Build'i kullanarak bir container görüntüsü oluşturur ve bunu Artifact Registry'de depolarız.

Dizin oluşturma ve komut dosyalarımızı buraya kopyalama

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Şimdi ince ayar için bir container görüntüsü oluşturup Artifact Registry'ye aktarıyoruz.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2'ye Genel Bakış - DAG Nedir?

Airflow, iş akışlarını ve veri ardışık düzenlerini düzenlemek için kullanılan bir platformdur. Bu iş akışlarını Python kodunda tanımlamak için DAG'ler (Yönlü Düz Ağaçlar) kullanılır. Bu sayede görevler ve bağımlılıkları görsel olarak temsil edilir.

Airflow, statik DAG'leri ve Python tabanlı tanımlarıyla önceden tanımlanmış iş akışlarını planlamak ve yönetmek için uygundur. Mimarisinde, bu iş akışlarını izlemek ve yönetmek için kullanıcı dostu bir kullanıcı arayüzü bulunur.

Airflow, Python kullanarak veri ardışık düzenlerinizi tanımlamanıza, planlamanıza ve izlemenize olanak tanır. Bu nedenle, iş akışı düzenleme için esnek ve güçlü bir araçtır.

13. DAG'mize genel bakış

ec49964ad7d61491.png

DAG, yönlü düz ağaç anlamına gelir. Airflow'da DAG'nin kendisi iş akışının veya ardışık düzenin tamamını temsil eder. Görevleri, bağımlılıklarını ve yürütülme sırasını tanımlar.

DAG içindeki iş akışı birimleri, Airflow yapılandırmasından başlatılan GKE kümesindeki bir pod'dan yürütülür.

Özet:

Airflow: Veri indirme - Bu komut dosyası, Kaggle'dan film incelemesi veri kümesi alma ve bunu GCS paketinize kaydetme sürecini otomatikleştirerek bulut ortamınızda daha fazla işleme veya analiz için hazır hale getirir.

Airflow: Veri Hazırlama - Kod, ham film incelemesi veri kümesini alır, kullanım alanımız için gerekli olmayan gereksiz veri sütunlarını kaldırır ve eksik değerlere sahip veri kümelerini siler. Ardından, veri kümesini makine öğrenimine uygun bir soru-cevap biçiminde yapılandırır ve daha sonra kullanılmak üzere tekrar GCS'de depolar.

Airflow: Model İnce Ayarı: Bu kod, LoRA (Low-Rank Adaptation) adı verilen bir teknik kullanarak büyük bir dil modeline (LLM) ince ayar yapar ve ardından güncellenen modeli kaydeder. Önceden eğitilmiş büyük dil modeli ve veri kümesi Google Cloud Storage'dan yüklenerek başlar. Ardından, modeli bu veri kümesinde verimli bir şekilde ince ayarlamak için LoRA'yı uygular. Son olarak, ince ayarlı modeli metin oluşturma veya soru yanıtlama gibi uygulamalarda daha sonra kullanılmak üzere Google Cloud Storage'a geri kaydeder.

Airflow: Model Serving - Çıkarım için vllm ile GKE'de ince ayarlı modeli sunma.

Airflow: Geri bildirim döngüsü - Modelin her xx sürede bir (saatlik, günlük, haftalık) yeniden eğitilmesi.

Bu şemada, GKE'de çalıştırıldığında Airflow 2'nin işleyiş şekli açıklanmaktadır.

8691f41166209a5d.png

14. Modele ince ayar yapma ve RAG kullanma

Bu CodeLab'de, Almayla Artırılmış Üretim (RAG) yerine bir LLM'ye ince ayar yapılır.

Bu iki yaklaşımı karşılaştıralım:

İnce ayar: İnce ayar, LLM'yi belirli bir göreve veya veri kümesine uyarlayarak harici veri kaynaklarına bağlı kalmadan bağımsız olarak çalışmasına olanak tanır.

Çıkarımı basitleştirir: Ayrı bir alma sistemi ve veritabanı ihtiyacını ortadan kaldırarak özellikle sık kullanılan kullanım alanlarında daha hızlı ve daha ucuz yanıtlar sağlar.

RAG: Harici bilgilere dayanır: RAG, her istek için bilgi tabanından alakalı bilgileri alır. Böylece güncel ve belirli verilere erişim sağlanır.

Karmaşıklığı artırır: RAG'yi Kubernetes kümesi gibi bir üretim ortamında uygulamak genellikle veri işleme ve alma için birden fazla mikro hizmet içerir. Bu durum, gecikmeyi ve hesaplama maliyetlerini artırabilir.

Neden ince ayar seçildi?

RAG, bu CodeLab'de kullanılan küçük veri kümesi için uygun olsa da Airflow'un tipik bir kullanım alanını göstermek amacıyla ince ayar yapmayı tercih ettik. Bu seçim, RAG için ek altyapı ve mikro hizmetler oluşturmanın ayrıntılarına girmek yerine iş akışı düzenleme yönlerine odaklanmamızı sağlar.

Sonuç:

Hem ince ayar hem de RAG, kendi güçlü ve zayıf yönleri olan değerli tekniklerdir. En uygun seçim, projenizin özel gereksinimlerine (ör. verilerinizin boyutu ve karmaşıklığı, performans ihtiyaçları ve maliyet hususları) bağlıdır.

15. DAG Görevi #1 - Airflow'da ilk adımınızı oluşturun: Veri indirme

Bu DAG birimine genel bir bakış olarak, kapsayıcı resminde barındırılan Python kodumuz Kaggle'dan en son RottenTomatoes veri kümesini indirir.

Bu kodu GCS paketine kopyalamayın. Son adım olarak, tüm DAG birimi adımlarını tek bir Python komut dosyası içinde içeren mlops-dag.py dosyasını kopyalarız.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. DAG Görevi #2 - Airflow'da ikinci adımınızı oluşturun: Veri Hazırlama

Bu DAG birimine genel bakış olarak, GCS'den bir CSV dosyası (rotten_tomatoes_movie_reviews.csv) yükleyip Pandas DataFrame'e aktarıyoruz.

Ardından, test ve kaynak verimliliği için DATASET_LIMIT kullanarak işlenen satır sayısını sınırlıyoruz ve son olarak dönüştürülen verileri Hugging Face veri kümesine dönüştürüyoruz.

Dikkatli bakarsanız modelde "DATASET_LIMIT": "1000" ile 1.000 satır eğittiğimizi görürsünüz. Bunun nedeni, bu işlemin Nvidia L4 GPU'da 20 dakika sürmesidir.

Bu kodu GCS paketine kopyalamayın. Son adımda, tüm adımları tek bir Python komut dosyası içinde içeren mlops-dag.py dosyasını kopyalıyoruz.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG Görevi #3: Airflow'da üçüncü adımınızı oluşturun: Model İnce Ayarı

Bu DAG birimine genel bir bakış olarak, burada Gemma modelini yeni veri kümemizle hassaslaştırmak için finetune.py dosyasını yürütüyoruz.

Bu kodu GCS paketine kopyalamayın. Son adımda, tüm adımları tek bir Python komut dosyası içinde içeren mlops-dag.py dosyasını kopyalıyoruz.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG Görevi #4: Airflow'da son adımınızı oluşturun: Modelin çıkarımı / sunulması

vLLM, LLM'lerin yüksek performanslı çıkarımı için özel olarak tasarlanmış güçlü bir açık kaynaklı kitaplıktır. Google Kubernetes Engine (GKE) üzerinde dağıtıldığında, LLM'leri etkili bir şekilde sunmak için Kubernetes'in ölçeklenebilirliğinden ve verimliliğinden yararlanır.

Adımların özeti:

  • "mlops-dag.py" adlı DAG'yi GCS paketine yükleyin.
  • Çıkarım ayarlamak için iki Kubernetes YAML yapılandırma dosyasını bir GCS paketine kopyalayın.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Python komut dosyanızı (DAG dosyası) ve Kubernetes manifestlerini DAGS GCS paketine yükleyin.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Airflow kullanıcı arayüzünüzde mlops-dag'ı görürsünüz.

  1. Devam ettir'i seçin.
  2. Manuel bir MLOps döngüsü gerçekleştirmek için DAG'yi Tetikle'yi seçin.

d537281b92d5e8bb.png

DAG'niz tamamlandığında Airflow kullanıcı arayüzünde aşağıdaki gibi bir çıkış görürsünüz.

3ed42abf8987384e.png

Son adımdan sonra model uç noktasını alıp modeli test etmek için bir istem gönderebilirsiniz.

Modelin çıkarımının başlayabilmesi ve yük dengeleyicinin harici IP adresi atayabilmesi için curl komutunu vermeden önce yaklaşık 5 dakika bekleyin.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Çıkış:

19. Tebrikler!

GKE'de Airflow 2 ile DAG işlem hattı kullanarak ilk yapay zeka iş akışınızı oluşturdunuz.

Dağıttığınız kaynakların provizyonunu kaldırmayı unutmayın.

20. Üretimde bu işlemi yapma

CodeLab, GKE'de Airflow 2'yi nasıl kuracağınız konusunda size harika bir fikir verse de gerçek dünyada bunu üretimde yaparken aşağıdaki konulardan bazılarını göz önünde bulundurmanız gerekir.

Gradio veya benzer araçları kullanarak bir web ön ucu uygulayın.

GKE ile iş yükleri için otomatik uygulama izlemeyi buradan yapılandırın veya Airflow'dan metrikleri buradan dışa aktarın.

Özellikle daha büyük veri kümeleriniz varsa modeli daha hızlı ince ayarlamak için daha büyük GPU'lara ihtiyacınız olabilir. Ancak modeli birden fazla GPU'da eğitmek istiyorsak veri kümesini bölmemiz ve eğitimi parçalamamız gerekir. FSDP with PyTorch (fully sharded data parallel) ile ilgili açıklamayı burada bulabilirsiniz. Bu amaçla GPU paylaşımı kullanılır. Daha fazla bilgiyi Meta'nın blog yayınında ve Pytorch kullanarak FSDP ile ilgili bu eğitimde bulabilirsiniz.

Google Cloud Composer, yönetilen bir Airflow hizmetidir. Bu nedenle, Airflow'u kendiniz korumanız gerekmez. Yalnızca DAG'nizi dağıtmanız yeterlidir.

Daha fazla bilgi

Lisans

Bu çalışma, Creative Commons Attribution 2.0 Genel Amaçlı Lisans ile lisans altına alınmıştır.