פיתוח תהליכי עבודה של MLOps באמצעות Airflow 2 ב-GKE

1. סקירה כללית

852dc8844309ffb8.png

בשיעור הזה תלמדו איך לשלב שיטות עבודה מומלצות של DevOps בלמידת מכונה (MLOps) על ידי הורדה של מערך נתונים, שיפור מודל ופריסת מודל LLM ב-Google Kubernetes Engine ‏ (GKE) באמצעות Airflow DAG עם מינימום הפשטה. לכן אנחנו משתמשים בפקודות gcloud ולא ב-terraform, כדי שתוכלו לעקוב אחרי השלבים במעבדה ולהבין בקלות כל תהליך מנקודת המבט של מהנדס הפלטפורמה ומהנדס למידת המכונה.

במדריך המעשי הזה נסביר איך להשתמש ב-Airflow כדי לייעל את תהליכי העבודה של ה-AI. נגדיר DAG כדי להדגים בצורה ברורה ופרקטית את כל מחזור החיים של MLOps.

מה תלמדו

  • פירוק של סילו נתונים ושיפור תהליכי העבודה כדי לשפר את שיתוף הפעולה וההבנה בין מהנדסי פלטפורמה ומהנדסי למידת מכונה
  • הסבר על פריסה, שימוש וניהול של Airflow 2 ב-GKE
  • הגדרת Airflow DAG מקצה לקצה
  • פיתוח בסיס למערכות למידת מכונה ברמת ייצור באמצעות GKE
  • הטמעה ותפעול של מערכות למידת מכונה
  • הסבר על האופן שבו הנדסת פלטפורמות הפכה לעמוד תמיכה קריטי ל-MLOps

מה אפשר להשיג באמצעות ה-CodeLab הזה

  • אתם יכולים לשאול שאלות על סרטים מ-LLM שעבר כוונון עדין על סמך Gemma-2-9b-it, שמופעל ב-GKE עם vLLM.

קהל היעד

  • מהנדסי למידת מכונה
  • מהנדסי פלטפורמה
  • מדעני נתונים
  • מהנדסי נתונים
  • מהנדסי DevOps
  • Platform Architect
  • Customer Engineers

ה-CodeLab הזה לא מיועד

  • כמבוא ל-GKE או לתהליכי עבודה של AI/ML
  • כסקירה של כל מערך התכונות של Airflow

2. הנדסת פלטפורמות מסייעת למהנדסי למידת מכונה ולמדענים

16635a8284b994c.png

הנדסת פלטפורמות ו-MLOps הם תחומים שקשורים זה לזה, והם פועלים יחד כדי ליצור סביבה חזקה ויעילה לפיתוח ולפריסה של ML.

היקף: הנדסת פלטפורמות היא תחום רחב יותר מ-MLOps, והיא כוללת את כל מחזור החיים של פיתוח התוכנה ומספקת את הכלים והתשתית לכך.

MLOps מגשר על הפער בין פיתוח, פריסה והסקת מסקנות של למידת מכונה.

מומחיות: מהנדסי פלטפורמה בדרך כלל מתמחים בטכנולוגיות תשתית כמו מחשוב ענן, קונטיינריזציה וניהול נתונים.

מהנדסי MLOps מתמחים בפיתוח, בפריסה ובמעקב של מודלים של למידת מכונה, ולרוב יש להם כישורים בתחום מדע הנתונים והנדסת התוכנה.

כלים: מהנדסי פלטפורמה יוצרים כלים להקצאת משאבים לתשתית, לניהול הגדרות, לתיזמור קונטיינרים ולפיגום אפליקציות. מהנדסי MLOps משתמשים בכלים לאימון, לניסוי, לפריסה, למעקב ולניהול גרסאות של מודלים של למידת מכונה.

3. הגדרה ודרישות של Google Cloud

הגדרת סביבה בקצב אישי

  1. נכנסים ל-מסוף Google Cloud ויוצרים פרויקט חדש או משתמשים בפרויקט קיים. אם עדיין אין לכם חשבון Gmail או Google Workspace, אתם צריכים ליצור חשבון.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • שם הפרויקט הוא השם המוצג של הפרויקט הזה למשתתפים. זו מחרוזת תווים שלא נמצאת בשימוש ב-Google APIs. תמיד אפשר לעדכן את המיקום.
  • מזהה הפרויקט הוא ייחודי לכל הפרויקטים ב-Google Cloud ואי אפשר לשנות אותו אחרי שהוא מוגדר. מסוף Cloud יוצר באופן אוטומטי מחרוזת ייחודית, ובדרך כלל לא צריך לדעת מה היא. ברוב ה-Codelabs, תצטרכו להפנות למזהה הפרויקט (בדרך כלל מסומן כ-PROJECT_ID). אם אתם לא אוהבים את המזהה שנוצר, אתם יכולים ליצור מזהה אקראי אחר. אפשר גם לנסות שם משתמש משלכם ולבדוק אם הוא זמין. אי אפשר לשנות את ההגדרה הזו אחרי השלב הזה, והיא תישאר לאורך הפרויקט.
  • לידיעתכם, יש ערך שלישי, מספר פרויקט, שחלק מממשקי ה-API משתמשים בו. במאמרי העזרה מפורט מידע נוסף על שלושת הערכים האלה.
  1. בשלב הבא, תצטרכו להפעיל את החיוב במסוף Cloud כדי להשתמש במשאבי Cloud או בממשקי API של Cloud. השלמת ה-codelab הזה לא תעלה לכם הרבה, אם בכלל. כדי להשבית את המשאבים ולמנוע חיובים נוספים אחרי שתסיימו את המדריך הזה, תוכלו למחוק את המשאבים שיצרתם או למחוק את הפרויקט. משתמשים חדשים ב-Google Cloud זכאים לתוכנית תקופת ניסיון בחינם בשווי 300$.

מפעילים את Cloud Shell

אפשר להפעיל את Google Cloud מרחוק מהמחשב הנייד, אבל ב-codelab הזה תשתמשו ב-Cloud Shell, סביבת שורת פקודה שפועלת בענן.

הפעלת Cloud Shell

  1. ב-Cloud Console, לוחצים על Activate Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

אם זו הפעם הראשונה שאתם מפעילים את Cloud Shell, יוצג לכם מסך ביניים עם תיאור של השירות. אם הוצג לכם מסך ביניים, לחצו על המשך.

9c92662c6a846a5c.png

הקצאת המשאבים והחיבור ל-Cloud Shell נמשכים רק כמה רגעים.

9f0e51b578fecce5.png

המכונה הווירטואלית הזו כוללת את כל הכלים הדרושים למפתחים. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר מאוד את הביצועים והאימות ברשת. אפשר לבצע את רוב העבודה ב-codelab הזה, אם לא את כולה, באמצעות דפדפן.

אחרי שמתחברים ל-Cloud Shell, אמור להופיע אימות ושהפרויקט מוגדר לפי מזהה הפרויקט.

  1. מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שעברתם אימות:
gcloud auth list

פלט הפקודה

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שפקודת gcloud מכירה את הפרויקט:
gcloud config list project

פלט הפקודה

[core]
project = <PROJECT_ID>

אם לא, אפשר להגדיר אותו באמצעות הפקודה הבאה:

gcloud config set project <PROJECT_ID>

פלט הפקודה

Updated property [core/project].

4. שלב 1: הרשמה ואימות ב-Kaggle

כדי להתחיל את ה-CodeLab, צריך ליצור חשבון ב-Kaggle, פלטפורמת קהילה אונליין למדעני נתונים ולחובבי למידת מכונה. הפלטפורמה נמצאת בבעלות Google ומארחת מאגר עצום של קבוצות נתונים שזמינות לציבור בתחומים שונים. מהאתר הזה תורידו את מערך הנתונים של RottenTomatoes, שמשמש לאימון המודל.

  • נרשמים ל-Kaggle. אפשר להשתמש ב-SSO של Google כדי להיכנס
  • אישור התנאים וההגבלות
  • עוברים להגדרות ומקבלים את שם המשתמש username
  • בקטע API, בוחרים באפשרות 'יצירת אסימון חדש מתוך' Kaggle, והקובץ kaggle.json יורד
  • אם נתקלת בבעיות, אפשר לעבור לדף התמיכה כאן

5. שלב 2 – הרשמה ואימות ב-HuggingFace

‫HuggingFace הוא מיקום מרכזי שבו כל אחד יכול להשתמש בטכנולוגיית למידת מכונה. הוא מארח 900,000 מודלים, 200,000 מערכי נתונים ו-300,000 אפליקציות הדגמה (Spaces), כולם קוד פתוח וזמינים לציבור.

  • נרשמים ל-HuggingFace – יוצרים חשבון עם שם משתמש. אי אפשר להשתמש ב-SSO של Google
  • אישור כתובת האימייל
  • עוברים לדף הזה ומאשרים את הרישיון של מודל Gemma-2-9b-it.
  • יוצרים טוקן HuggingFace כאן
  • חשוב לרשום את פרטי הכניסה של האסימון, כי תצטרכו אותם בהמשך.

6. שלב 3 – יצירת משאבי התשתית הנדרשים ב-Google Cloud

תגדירו את GKE,‏ GCE ו-Artifact Registry ותקצו תפקידי IAM באמצעות איחוד שירותי אימות הזהות של עומסי עבודה.

תהליך העבודה של ה-AI משתמש בשני מאגרי צמתים, אחד לאימון ואחד להסקת מסקנות. מאגר הצמתים לאימון משתמש ב-VM של GCE מסוג g2-standard-8, שמצויד ב-GPU אחד של Nvidia L4 Tensor Core. מאגר הצמתים של ההסקה משתמש במכונה וירטואלית מסוג g2-standard-24 שמצוידת בשתי יחידות GPU מסוג Nvidia L4 Tensor Core. כשמציינים את האזור, בוחרים אזור שבו יש תמיכה ב-GPU הנדרש ( קישור).

ב-Cloud Shell, מריצים את הפקודות הבאות:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

יצירת קובצי מניפסט של YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:v0.6.6
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

יוצרים 3 קטגוריות של Google Cloud Storage‏ (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. שלב 4 – התקנת Airflow ב-GKE באמצעות תרשים Helm

עכשיו נבצע פריסה של Airflow 2 באמצעות Helm. ‫Apache Airflow היא פלטפורמה לניהול תהליכי עבודה של צינורות נתונים, שזמינה בקוד פתוח. בהמשך נסביר על מערך התכונות של Airflow 2.

‫values.yaml לתרשים Airflow helm

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

פריסת Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. שלב 5 – הפעלת Airflow עם Connections ו-Variables

אחרי שפורסים את Airflow 2, אפשר להתחיל להגדיר אותו. אנחנו מגדירים כמה משתנים, שנקראים על ידי סקריפטים של Python.

  1. גישה לממשק המשתמש של Airflow ביציאה 8080 באמצעות הדפדפן

קבלת כתובת ה-IP החיצונית

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

פותחים דפדפן אינטרנט ועוברים לכתובת http://<EXTERNAL-IP>:8080 . הכניסה היא אדמין / אדמין

  1. יוצרים חיבור GCP שמוגדר כברירת מחדל בממשק המשתמש של Airflow. לשם כך, עוברים אל Admin (ניהול) → Connections (חיבורים) → + Add a new record (הוספת רשומה חדשה).
  • מזהה החיבור: google_cloud_default
  • סוג החיבור: Google Cloud

לוחצים על 'שמירה'.

  1. יוצרים את המשתנים הנדרשים. לשם כך, עוברים אל Admin (אדמין) ← Variables (משתנים) ← + Add a new record (הוספת רשומה חדשה).
  • מפתח: BUCKET_DATA_NAME – ערך: מעתיקים מ-echo $BUCKET_DATA_NAME
  • מפתח: GCP_PROJECT_ID – ערך: מעתיקים מ-echo $DEVSHELL_PROJECT_ID
  • מפתח: HF_TOKEN – ערך: מזינים את טוקן ה-HF
  • מפתח: KAGGLE_USERNAME – ערך: צריך להזין את שם המשתמש ב-Kaggle
  • מפתח: KAGGLE_KEY – ערך: מעתיקים את הערך הזה מקובץ kaggle.json

אחרי כל צמד מפתח/ערך, לוחצים על Save (שמירה).

ממשק המשתמש אמור להיראות כך:

771121470131b5ec.png

9. מאגר קוד האפליקציה מספר 1 – הורדת נתונים

בסקריפט Python הזה, אנחנו מבצעים אימות ב-Kaggle כדי להוריד את מערך הנתונים לקטגוריית GCS שלנו.

הסקריפט עצמו מועבר לקונטיינר כי הוא הופך ליחידת DAG מספר 1, ואנחנו רוצים שהנתונים יתעדכנו בתדירות גבוהה, ולכן אנחנו רוצים להפוך את התהליך הזה לאוטומטי.

יצירת ספרייה והעתקת הסקריפטים שלנו אליה

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

קובץ Docker

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

עכשיו יוצרים קובץ אימג' של קונטיינר להורדת מערך הנתונים ומעבירים אותו בדחיפה ל-Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. מאגר נתונים של קוד אפליקציה מספר 2 – הכנת נתונים

במהלך שלב הכנת הנתונים, אנחנו מבצעים את הפעולות הבאות:

  1. מציינים כמה נתונים מתוך מערך הנתונים רוצים להשתמש כדי לבצע כוונון עדין של המודל הבסיסי
  2. טוען את מערך הנתונים, כלומר קורא את קובץ ה-CSV לתוך Pandas dataframe, שהוא מבנה נתונים דו-ממדי לשורות ולעמודות
  3. טרנספורמציה של נתונים / עיבוד מקדים – קובעים אילו חלקים במערך הנתונים לא רלוונטיים על ידי ציון מה רוצים לשמור, ובפועל מסירים את השאר
  4. הפונקציה transform מוחלת על כל שורה ב-DataFrame
  5. שמירת הנתונים המוכנים בחזרה בקטגוריה של GCS

יצירת ספרייה והעתקת הסקריפטים שלנו אליה

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

קובץ Docker

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. מאגר תגים של קוד אפליקציה מספר 3 – כוונון עדין

בדוגמה הזו אנחנו משתמשים ב-Gemma-2-9b-it כמודל בסיסי, ואז מבצעים כוונון עדין שלו באמצעות מערך הנתונים החדש שלנו.

זהו רצף השלבים שמתרחשים במהלך שלב הכוונון העדין.

1. הגדרה: ייבוא ספריות, הגדרת פרמטרים (למודל, לנתונים ולאימון) וטעינת מערך הנתונים מ-Google Cloud Storage.

2. טעינת מודל: טוענים מודל שפה שעבר אימון מקדים עם קוונטיזציה ליעילות, וטוענים את הטוקנייזר המתאים.

3. מגדירים את LoRA: מגדירים Low-Rank Adaptation (התאמה לדרגה נמוכה, LoRA) כדי לבצע כוונון עדין של המודל בצורה יעילה על ידי הוספת מטריצות קטנות שניתנות לאימון.

4. אימון: מגדירים פרמטרים לאימון ומשתמשים ב-SFTTrainer כדי לבצע כוונון עדין של המודל בקבוצת הנתונים שנטענה באמצעות סוג הכימות FP16.

5. שמירה והעלאה: שומרים את המודל ואת ה-tokenizer שעברו כוונון עדין באופן מקומי, ואז מעלים אותם ל-bucket ב-GCS.

לאחר מכן יוצרים קובץ אימג' של קונטיינר באמצעות Cloud Build ומאחסנים אותו ב-Artifact Registry.

יצירת ספרייה והעתקת הסקריפטים שלנו אליה

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

קובץ Docker

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

עכשיו ניצור קובץ אימג' של קונטיינר לצורך כוונון עדין ונעביר אותו בדחיפה ל-Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. סקירה כללית על Airflow 2, כולל הסבר על DAG

‫Airflow היא פלטפורמה לניהול תהליכי עבודה וצינורות עיבוד נתונים. הוא משתמש ב-DAG (גרפים אציקליים מכוונים) כדי להגדיר את תהליכי העבודה האלה בקוד Python, ומציג באופן חזותי את המשימות והתלות שלהן.

‫Airflow, עם DAG סטטי והגדרות מבוססות Python, מתאים לתזמון ולניהול של תהליכי עבודה מוגדרים מראש. הארכיטקטורה שלו כוללת ממשק משתמש ידידותי למשתמש לצורך מעקב אחרי תהליכי העבודה האלה וניהול שלהם.

בעצם, Airflow מאפשר להגדיר, לתזמן ולנטר את צינורות הנתונים באמצעות Python, ולכן הוא כלי גמיש ועוצמתי לתזמור תהליכי עבודה.

13. סקירה כללית של ה-DAG שלנו

ec49964ad7d61491.png

DAG הוא קיצור של Directed Acyclic Graph (גרף אציקלי מכוון). ב-Airflow, ‏ DAG מייצג את תהליך העבודה או את הצינור כולו. הוא מגדיר את המשימות, את יחסי התלות ביניהן ואת סדר הביצוע.

יחידות תהליך העבודה ב-DAG מופעלות מ-pod באשכול GKE, שמופעל מההגדרה של Airflow.

סיכום:

Airflow: הורדת נתונים – הסקריפט הזה מבצע אוטומציה של התהליך של קבלת מערך נתונים של ביקורות על סרטים מ-Kaggle ואחסון שלו בדלי GCS, כך שהוא זמין לעיבוד או לניתוח נוספים בסביבת הענן שלכם.

Airflow: Data Preparation – הקוד לוקח את מערך הנתונים של ביקורות הסרטים הגולמיות, מסיר עמודות נתונים מיותרות שלא נדרשות לתרחיש השימוש שלנו ומוחק מערכי נתונים עם ערכים חסרים. בשלב הבא, הוא יוצר מבנה למערך הנתונים בפורמט של שאלה ותשובה שמתאים ללמידת מכונה, ומאחסן אותו בחזרה ב-GCS לשימוש מאוחר יותר.

Airflow: Model Finetuning – קוד זה מבצע כוונון עדין של מודל שפה גדול (LLM) באמצעות טכניקה שנקראת LoRA (Low-Rank Adaptation) ואז שומר את המודל המעודכן. התהליך מתחיל בטעינה של מודל LLM שעבר אימון מראש (pre-trained) ושל מערך נתונים מ-Google Cloud Storage. לאחר מכן, המערכת משתמשת ב-LoRA כדי לבצע אופטימיזציה יעילה של המודל על מערך הנתונים הזה. לבסוף, המודל שעבר כוונון עדין נשמר בחזרה ב-Google Cloud Storage לשימוש מאוחר יותר באפליקציות כמו יצירת טקסט או מענה לשאלות.

Airflow: Model Serving – הפעלת המודל שעבר כוונון עדין ב-GKE באמצעות vllm להסקת מסקנות.

Airflow: לולאת משוב – אימון מחדש של המודל כל xx זמן (שעתי, יומי, שבועי).

בתרשים הזה מוסבר איך Airflow 2 פועל כשמריצים אותו ב-GKE.

8691f41166209a5d.png

14. התאמה עדינה של מודל לעומת שימוש ב-RAG

ב-CodeLab הזה מתבצע כוונון עדין של מודל LLM במקום שימוש ב-Retrieval Augmented Generation (יצירה משולבת-אחזור, RAG).

בואו נשווה בין שתי הגישות האלה:

כוונון עדין: יצירת מודל ייעודי: כוונון עדין מתאים את ה-LLM למשימה או למערך נתונים ספציפיים, ומאפשר לו לפעול באופן עצמאי בלי להסתמך על מקורות נתונים חיצוניים.

מפשטת את ההסקה: לא צריך יותר מערכת אחזור ומסד נתונים נפרדים, ולכן התשובות מהירות וזולות יותר, במיוחד בתרחישי שימוש נפוצים.

RAG: מסתמך על ידע חיצוני: RAG מאחזר מידע רלוונטי ממאגר ידע לכל בקשה, כדי להבטיח גישה לנתונים עדכניים וספציפיים.

המורכבות גדלה: הטמעה של RAG בסביבת ייצור כמו אשכול Kubernetes לרוב כוללת כמה מיקרו-שירותים לעיבוד נתונים ולאחזור שלהם, מה שעלול להגדיל את זמן האחזור ואת עלויות החישוב.

למה בחרנו בכוונון עדין:

אף על פי ש-RAG מתאים למערך הנתונים הקטן שבו השתמשנו ב-CodeLab הזה, בחרנו בשיטת הכוונון העדין כדי להדגים תרחיש שימוש אופייני ב-Airflow. הבחירה הזו מאפשרת לנו להתמקד בהיבטים של תזמור תהליכי העבודה, במקום להתעמק בניואנסים של הגדרת תשתית נוספת ומיקרו-שירותים ל-RAG.

סיכום:

גם כוונון עדין וגם RAG הן טכניקות חשובות, ולכל אחת מהן יש יתרונות וחסרונות. הבחירה האופטימלית תלויה בדרישות הספציפיות של הפרויקט, כמו גודל הנתונים והמורכבות שלהם, צורכי הביצועים ושיקולי העלות.

15. משימה מספר 1 ב-DAG – יצירת השלב הראשון ב-Airflow: הורדת נתונים

כסקירה כללית של יחידת ה-DAG הזו, קוד ה-Python שלנו שמתארח בקובץ אימג' של קונטיינר מוריד את מערך הנתונים העדכני ביותר של RottenTomatoes מ-Kaggle.

אל תעתיקו את הקוד הזה לקטגוריית GCS. בשלב האחרון, מעתיקים את mlops-dag.py שמכיל את כל השלבים של יחידת ה-DAG בסקריפט Python אחד.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. משימה מספר 2 ב-DAG – יצירת השלב השני ב-Airflow: הכנת הנתונים

כסקירה כללית של יחידת ה-DAG הזו, אנחנו טוענים קובץ CSV‏ (rotten_tomatoes_movie_reviews.csv) מ-GCS ל-Pandas DataFrame.

לאחר מכן, אנחנו מגבילים את מספר השורות שעוברות עיבוד באמצעות DATASET_LIMIT לצורך בדיקה ויעילות משאבים, ולבסוף ממירים את הנתונים שעברו טרנספורמציה ל-Hugging Face Dataset.

אם תסתכלו היטב, תראו שאנחנו מאמנים 1,000 שורות במודל עם 'DATASET_LIMIT': '1000', כי זה לוקח 20 דקות ב-GPU של Nvidia L4.

אל תעתיקו את הקוד הזה לקטגוריית GCS. בשלב האחרון, מעתיקים את mlops-dag.py, שמכיל את כל השלבים בסקריפט Python אחד.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. משימה מספר 3 ב-DAG – יצירת השלב השלישי ב-Airflow: כוונון עדין של המודל

כאן אנחנו מריצים את finetune.py כדי לדייק את מודל Gemma באמצעות מערך הנתונים החדש שלנו.

אל תעתיקו את הקוד הזה לקטגוריית GCS. בשלב האחרון, מעתיקים את mlops-dag.py, שמכיל את כל השלבים בסקריפט Python אחד.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. משימה מספר 4 ב-DAG – יצירת השלב האחרון ב-Airflow: הסקת מסקנות / הצגת המודל

vLLM היא ספרייה עוצמתית עם קוד פתוח שנועדה במיוחד להסקת מסקנות (inference) של מודלים גדולים של שפה (LLM) עם ביצועים גבוהים. כשמפעילים אותו ב-Google Kubernetes Engine ‏ (GKE), הוא משתמש ביכולות ההתאמה והיעילות של Kubernetes כדי להפעיל מודלים של שפה גדולה בצורה יעילה.

סיכום השלבים:

  • מעלים את ה-DAG ‏mlops-dag.py לקטגוריית GCS.
  • מעתיקים שני קובצי תצורה של Kubernetes YAML להגדרת הסקה, לקטגוריה ב-GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

מעלים את סקריפט Python (קובץ DAG) ואת מניפסטים של Kubernetes לקטגוריית DAGS GCS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

בממשק המשתמש של Airflow, יופיע mlops-dag.

  1. בוחרים באפשרות 'ביטול ההשהיה'.
  2. בוחרים באפשרות Trigger DAG (הפעלת DAG) כדי לבצע מחזור MLOps ידני.

d537281b92d5e8bb.png

אחרי ש-DAG יסתיים, תראו פלט כמו זה בממשק המשתמש של Airflow.

3ed42abf8987384e.png

אחרי השלב האחרון, אפשר להעתיק את נקודת הקצה של המודל ולשלוח הנחיה כדי לבדוק את המודל.

מחכים כ-5 דקות לפני שמריצים את פקודת curl, כדי שהסקת המסקנות של המודל תוכל להתחיל ומאזן העומסים יוכל להקצות כתובת IP חיצונית.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

פלט:

19. מעולה!

יצרתם את תהליך העבודה הראשון שלכם עם AI באמצעות צינור עיבוד נתונים מסוג DAG עם Airflow 2 ב-GKE.

אל תשכחו לבטל את הקצאת המשאבים שפרסתם.

20. ביצוע הפעולה הזו בסביבת הייצור

ה-CodeLab סיפק לכם תובנות חשובות לגבי הגדרת Airflow 2 ב-GKE, אבל בעולם האמיתי כדאי לשקול את הנושאים הבאים כשמבצעים את הפעולה הזו בסביבת ייצור.

מטמיעים חזית אתר באמצעות Gradio או כלי דומה.

אפשר להגדיר מעקב אוטומטי אחרי אפליקציות עבור עומסי עבודה באמצעות GKE כאן, או לייצא מדדים מ-Airflow כאן.

יכול להיות שתצטרכו יחידות GPU גדולות יותר כדי לבצע כוונון עדין של המודל מהר יותר, במיוחד אם יש לכם מערכי נתונים גדולים יותר. עם זאת, אם רוצים לאמן את המודל בכמה יחידות GPU, צריך לפצל את מערך הנתונים ולחלק את האימון. הנה הסבר על FSDP עם PyTorch (מקביליות נתונים עם חלוקה מלאה, באמצעות שיתוף GPU כדי להשיג את המטרה הזו). מידע נוסף זמין בפוסט בבלוג של Meta ובמדריך הזה בנושא FSDP באמצעות Pytorch.

Google Cloud Composer הוא שירות Airflow מנוהל, כך שלא צריך לתחזק את Airflow עצמו, אלא רק לפרוס את ה-DAG ולהתחיל לעבוד.

מידע נוסף

רישיון

עבודה זו מורשית תחת רישיון Creative Commons שמותנה בייחוס 2.0 כללי.