إنشاء سير عمل MLOps باستخدام Airflow 2 على GKE

1. نظرة عامة

852dc8844309ffb8.png

يوضّح هذا الدرس العملي كيفية دمج ممارسات DevOps في تعلُّم الآلة (MLOps) من خلال تنزيل مجموعة بيانات وتحسين نموذج ونشر النموذج اللغوي الكبير على Google Kubernetes Engine (GKE) باستخدام مخطّط موجّه غير دوري (DAG) في Airflow مع أقل قدر من التجريد. نتيجةً لذلك، نستخدم أوامر gcloud بدلاً من Terraform حتى تتمكّن من اتّباع خطوات المختبر بالتفصيل وفهم كل عملية بسهولة من منظور كل من مهندس المنصة ومهندس تعلُّم الآلة.

سيرشدك هذا الدليل العملي إلى كيفية الاستفادة من Airflow لتبسيط سير عمل الذكاء الاصطناعي، مع تقديم عرض توضيحي واضح وعملي لدورة حياة MLOps بالكامل من خلال إعداد رسم بياني موجّه غير دوري (DAG).

أهداف الدورة التعليمية

  • تعزيز التعاون والتفاهم بين مهندسي المنصات ومهندسي تعلُّم الآلة من خلال تحليل مستودعات هائلة من البيانات وتحسين سير العمل
  • التعرّف على كيفية نشر Airflow 2 واستخدامه وإدارته على GKE
  • إعداد DAG في Airflow من البداية إلى النهاية
  • إنشاء أساس لأنظمة تعلُّم الآلة ذات مستوى الإنتاج باستخدام GKE
  • تجهيز أنظمة تعلُّم الآلة وتشغيلها
  • التعرّف على كيف أصبحت هندسة المنصات من الركائز الأساسية لدعم عمليات تعلّم الآلة

ما يحقّقه هذا الدرس التطبيقي حول الترميز

  • يمكنك طرح أسئلة حول الأفلام من نموذج لغوي كبير تم ضبطه بدقة استنادًا إلى Gemma-2-9b-it، ويتم عرضه في GKE باستخدام vLLM.

Target Audience

  • مهندسو تعلُّم الآلة
  • مهندسو المنصات
  • علماء البيانات
  • مهندسو البيانات
  • مهندسو DevOps
  • مهندس منصات
  • مهندسو العملاء

هذا الدرس التطبيقي حول الترميز غير مخصّص

  • مقدمة عن مهام سير عمل الذكاء الاصطناعي/تعلُّم الآلة أو GKE
  • نظرة عامة على مجموعة ميزات Airflow بأكملها

2. تساعد هندسة المنصات مهندسي/علماء تعلُّم الآلة

16635a8284b994c.png

هندسة المنصات وMLOps هما مجالان مترابطان يتعاونان لإنشاء بيئة قوية وفعّالة لتطوير نماذج تعلُّم الآلة ونشرها.

النطاق: يمتد نطاق هندسة المنصات إلى ما هو أبعد من عمليات تعلُّم الآلة، إذ يشمل دورة حياة تطوير البرامج بالكامل ويوفر الأدوات والبنية الأساسية اللازمة لها.

تساعد عمليات تعلُّم الآلة في سد الفجوة بين تطوير نماذج تعلُّم الآلة ونشرها واستنتاجها.

الخبرة: يتمتع مهندسو المنصات عادةً بخبرة كبيرة في تكنولوجيات البنية الأساسية، مثل الحوسبة السحابية وتجميع الحاويات وإدارة البيانات.

يتخصّص مهندسو MLOps في تطوير نماذج تعلُّم الآلة وتفعيلها ورصدها، وغالبًا ما يمتلكون مهارات في عِلم البيانات وهندسة البرمجيات.

الأدوات: ينشئ مهندسو المنصات أدوات لتوفير البنية الأساسية وإدارة الإعدادات وتنظيم الحاويات وتوفير بنية التطبيقات. يستخدم مهندسو عمليات تعلُّم الآلة أدوات لتدريب نماذج تعلُّم الآلة وتجربتها ونشرها ومراقبتها وتحديد إصداراتها.

3- إعداد Google Cloud ومتطلباته

إعداد البيئة بالسرعة التي تناسبك

  1. سجِّل الدخول إلى Google Cloud Console وأنشِئ مشروعًا جديدًا أو أعِد استخدام مشروع حالي. إذا لم يكن لديك حساب على Gmail أو Google Workspace، عليك إنشاء حساب.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • اسم المشروع هو الاسم المعروض للمشاركين في هذا المشروع. وهي سلسلة أحرف لا تستخدمها Google APIs. ويمكنك تعديلها في أي وقت.
  • رقم تعريف المشروع هو معرّف فريد في جميع مشاريع Google Cloud ولا يمكن تغييره بعد ضبطه. تنشئ Cloud Console تلقائيًا سلسلة فريدة، ولا يهمّك عادةً ما هي. في معظم دروس البرمجة، عليك الرجوع إلى رقم تعريف مشروعك (يُشار إليه عادةً باسم PROJECT_ID). إذا لم يعجبك رقم التعريف الذي تم إنشاؤه، يمكنك إنشاء رقم تعريف عشوائي آخر. يمكنك بدلاً من ذلك تجربة اسم مستخدم من اختيارك ومعرفة ما إذا كان متاحًا. لا يمكن تغيير هذا الخيار بعد هذه الخطوة وسيظل ساريًا طوال مدة المشروع.
  • للعلم، هناك قيمة ثالثة، وهي رقم المشروع، تستخدمها بعض واجهات برمجة التطبيقات. يمكنك الاطّلاع على مزيد من المعلومات عن كل هذه القيم الثلاث في المستندات.
  1. بعد ذلك، عليك تفعيل الفوترة في Cloud Console لاستخدام موارد/واجهات برمجة تطبيقات Cloud. لن تكلفك تجربة هذا الدرس التطبيقي حول الترميز الكثير، إن وُجدت أي تكلفة على الإطلاق. لإيقاف الموارد وتجنُّب تحمّل تكاليف تتجاوز هذا البرنامج التعليمي، يمكنك حذف الموارد التي أنشأتها أو حذف المشروع. يمكن لمستخدمي Google Cloud الجدد الاستفادة من برنامج الفترة التجريبية المجانية بقيمة 300 دولار أمريكي.

بدء Cloud Shell

على الرغم من إمكانية تشغيل Google Cloud عن بُعد من الكمبيوتر المحمول، ستستخدم في هذا الدرس العملي Cloud Shell، وهي بيئة سطر أوامر تعمل في السحابة الإلكترونية.

تفعيل Cloud Shell

  1. من Cloud Console، انقر على تفعيل Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

إذا كانت هذه هي المرة الأولى التي تبدأ فيها Cloud Shell، ستظهر لك شاشة وسيطة توضّح ماهية هذه الخدمة. إذا ظهرت لك شاشة وسيطة، انقر على متابعة.

9c92662c6a846a5c.png

يستغرق توفير Cloud Shell والاتصال به بضع لحظات فقط.

9f0e51b578fecce5.png

يتم تحميل هذا الجهاز الافتراضي بجميع أدوات التطوير اللازمة. توفّر هذه الخدمة دليلًا رئيسيًا دائمًا بسعة 5 غيغابايت وتعمل في Google Cloud، ما يؤدي إلى تحسين أداء الشبكة والمصادقة بشكل كبير. يمكن إنجاز معظم عملك في هذا الدرس التطبيقي حول الترميز، إن لم يكن كله، باستخدام متصفح.

بعد الاتصال بـ Cloud Shell، من المفترض أن يظهر لك أنّه تم إثبات هويتك وأنّه تم ضبط المشروع على رقم تعريف مشروعك.

  1. نفِّذ الأمر التالي في Cloud Shell للتأكّد من إكمال عملية المصادقة:
gcloud auth list

ناتج الأمر

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. نفِّذ الأمر التالي في Cloud Shell للتأكّد من أنّ أمر gcloud يعرف مشروعك:
gcloud config list project

ناتج الأمر

[core]
project = <PROJECT_ID>

إذا لم يكن كذلك، يمكنك تعيينه من خلال هذا الأمر:

gcloud config set project <PROJECT_ID>

ناتج الأمر

Updated property [core/project].

4. الخطوة 1: الاشتراك والمصادقة على Kaggle

لبدء CodeLab، عليك إنشاء حساب على Kaggle، وهي منصة مجتمع على الإنترنت لعلماء البيانات وعشاق تعلُّم الآلة تملكها Google وتستضيف مستودعًا كبيرًا من مجموعات البيانات المتاحة للجميع في مجالات مختلفة. من هذا الموقع الإلكتروني ستنزّل مجموعة بيانات RottenTomatoes المستخدَمة لتدريب النموذج.

  • يمكنك الاشتراك في Kaggle باستخدام ميزة "الدخول المُوحَّد" من Google لتسجيل الدخول.
  • قبول الأحكام والشروط
  • انتقِل إلى "الإعدادات" واحصل على اسم المستخدم username
  • ضمن قسم واجهة برمجة التطبيقات، اختَر "إنشاء رمز مميّز جديد من" Kaggle، ما سيؤدي إلى تنزيل ملف kaggle.json.
  • إذا واجهت أي مشاكل، انتقِل إلى صفحة الدعم هنا.

5- الخطوة 2: الاشتراك وإثبات الملكية على HuggingFace

‫HuggingFace هي منصة مركزية تتيح لأي شخص التفاعل مع تكنولوجيا تعلُّم الآلة. تستضيف المنصة 900 ألف نموذج و200 ألف مجموعة بيانات و300 ألف تطبيق تجريبي (Spaces)، وكلها مفتوحة المصدر ومتاحة للجميع.

  • الاشتراك في HuggingFace: أنشئ حسابًا باستخدام اسم مستخدم، ولا يمكنك استخدام ميزة "الدخول المُوحَّد" من Google
  • تأكيد عنوان بريدك الإلكتروني
  • انتقِل إلى هنا واقبل ترخيص نموذج Gemma-2-9b-it
  • يمكنك إنشاء رمز مميّز على HuggingFace من هنا
  • سجِّل بيانات اعتماد الرمز المميز، ستحتاج إليها لاحقًا.

6. الخطوة 3: إنشاء موارد البنية الأساسية المطلوبة في Google Cloud

ستعمل على إعداد GKE وGCE وArtifact Registry وتطبيق أدوار إدارة الهوية وإمكانية الوصول (IAM) باستخدام اتحاد الهوية الخاص بأحمال العمل.

يستخدم سير عمل الذكاء الاصطناعي مجموعتَي عقد، إحداهما للتدريب والأخرى للاستنتاج. تستخدم مجموعة العُقد التدريبية جهازًا افتراضيًا من Google Compute Engine (GCE) من النوع g2-standard-8 مزوّدًا بوحدة معالجة رسومات Nvidia L4 Tensor Core واحدة. تستخدم مجموعة عقد الاستنتاج جهازًا افتراضيًا من النوع g2-standard-24 مزوّدًا بوحدتَي معالجة رسومات Nvidia L4 Tensor Core. أثناء تحديد المنطقة، اختَر منطقة تتوفّر فيها وحدة معالجة الرسومات المطلوبة ( رابط).

في Cloud Shell، نفِّذ الأوامر التالية:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

إنشاء ملفات بيان YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:v0.6.6
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

إنشاء 3 حِزم في Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. الخطوة 4: تثبيت Airflow على GKE من خلال مخطط Helm

الآن، سنفعّل Airflow 2 باستخدام Helm. ‫Apache Airflow هي منصة مفتوحة المصدر لإدارة مهام سير العمل في مسارات هندسة البيانات. سنتناول مجموعة خصائص Airflow 2 لاحقًا.

ملف values.yaml لمخطط Airflow Helm

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

نشر Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. الخطوة 5: إعداد Airflow باستخدام عمليات الربط والمتغيّرات

بعد نشر Airflow 2، يمكننا البدء في إعداده. نحدد بعض المتغيرات التي تقرأها نصوص Python البرمجية.

  1. الوصول إلى واجهة مستخدم Airflow على المنفذ 8080 باستخدام المتصفّح

الحصول على عنوان IP الخارجي

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

افتح متصفّح ويب وانتقِل إلى http://<EXTERNAL-IP>:8080 . اسم المستخدم هو admin وكلمة المرور هي admin

  1. أنشئ عملية ربط تلقائية بمنصة Google Cloud Platform ضِمن واجهة مستخدم Airflow، وذلك من خلال الانتقال إلى "المشرف" (Admin) → "عمليات الربط" (Connections) → "إضافة سجل جديد" (+ Add a new record).
  • معرّف الاتصال: google_cloud_default
  • نوع الاتصال: Google Cloud

انقر على "حفظ".

  1. أنشئ المتغيّرات المطلوبة، وذلك بالانتقال إلى "المشرف" → "المتغيّرات" → "+ إضافة سجل جديد"
  • المفتاح: BUCKET_DATA_NAME - القيمة: Copy from echo $BUCKET_DATA_NAME
  • المفتاح: GCP_PROJECT_ID - القيمة: انسخها من echo $DEVSHELL_PROJECT_ID
  • المفتاح: HF_TOKEN - القيمة: أدخِل رمز HF
  • المفتاح: KAGGLE_USERNAME - القيمة: أدخِل اسم المستخدم على Kaggle
  • المفتاح: KAGGLE_KEY - القيمة: انسخ هذه القيمة من ملف kaggle.json

انقر على "حفظ" بعد كل زوج مفتاح/قيمة.

يجب أن تبدو واجهة المستخدم على النحو التالي:

771121470131b5ec.png

9- حاوية الرمز البرمجي للتطبيق 1 - تنزيل البيانات

في نص Python البرمجي هذا، نُجري المصادقة باستخدام Kaggle لتنزيل مجموعة البيانات إلى حزمة GCS.

يتم وضع النص البرمجي نفسه في حاوية لأنّ هذا يصبح وحدة DAG رقم 1، ونتوقّع تعديل مجموعة البيانات بشكل متكرّر، لذا نريد تنفيذ هذه العملية تلقائيًا.

إنشاء دليل ونسخ النصوص البرمجية هنا

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

ملف شامل

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

الآن، سننشئ صورة حاوية لتنزيل مجموعة البيانات ونحمّلها إلى Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. حاوية الرمز البرمجي للتطبيق رقم 2 - إعداد البيانات

خلال خطوة إعداد البيانات، نحقّق ما يلي:

  1. تحديد مقدار مجموعة البيانات التي نريد استخدامها لضبط النموذج الأساسي
  2. تحميل مجموعة البيانات، أي قراءة ملف CSV في إطار بيانات Pandas وهو بنية بيانات ثنائية الأبعاد للصفوف والأعمدة
  3. تحويل البيانات / المعالجة المسبقة: تحديد الأجزاء غير ذات الصلة من مجموعة البيانات من خلال تحديد ما نريد الاحتفاظ به، ما يؤدي في الواقع إلى إزالة الباقي
  4. تطبيق الدالة transform على كل صف من DataFrame
  5. حفظ البيانات المُعدّة مرة أخرى في حزمة GCS

إنشاء دليل ونسخ النصوص البرمجية هنا

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

ملف شامل

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. حاوية الرمز البرمجي للتطبيق رقم 3 - الضبط الدقيق

نستخدم هنا Gemma-2-9b-it كنموذج أساسي، ثم نضبطه بدقة باستخدام مجموعة البيانات الجديدة.

في ما يلي تسلسل الخطوات التي تحدث أثناء خطوة الضبط الدقيق.

1. الإعداد: استيراد المكتبات وتحديد المَعلمات (للنموذج والبيانات والتدريب) وتحميل مجموعة البيانات من Google Cloud Storage

2. تحميل النموذج: حمِّل نموذجًا لغويًا مدرَّبًا مسبقًا مع تحديد الكمية لتحقيق الكفاءة، وحمِّل أداة الترميز المقابلة.

3. ضبط إعدادات LoRA: يمكنك إعداد Low-Rank Adaptation (LoRA) لضبط النموذج بدقة بشكل فعّال من خلال إضافة مصفوفات صغيرة قابلة للتدريب.

4. التدريب: حدِّد مَعلمات التدريب واستخدِم SFTTrainer لضبط النموذج بدقة على مجموعة البيانات التي تم تحميلها باستخدام نوع التكميم FP16.

5. الحفظ والتحميل: احفظ النموذج والرمز المميز المضبوطَين بدقة على جهازك، ثم حمِّلهما إلى حزمة GCS.

بعد ذلك، ننشئ صورة حاوية باستخدام Cloud Build ونخزّنها في Artifact Registry.

إنشاء دليل ونسخ النصوص البرمجية هنا

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

ملف شامل

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

الآن، سننشئ صور حاويات للضبط الدقيق ونحمّلها إلى Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. نظرة عامة على Airflow 2، بما في ذلك ما هو DAG

‫Airflow هي منصة لتنظيم سير العمل ومسارات البيانات. تستخدم هذه الأداة الرسوم البيانية الموجّهة غير الدورية (DAG) لتحديد مسارات العمل هذه في رمز Python، وتمثّل المهام وتبعياتها بشكل مرئي.

تتلاءم أداة Airflow، مع مخططات DAG الثابتة والتعريفات المستندة إلى Python، بشكل جيد مع جدولة مهام سير العمل المحدّدة مسبقًا وإدارتها. تتضمّن بنيتها واجهة مستخدم سهلة الاستخدام لتتبُّع إجراءات سير العمل هذه وإدارتها.

بشكل أساسي، تتيح لك Airflow تحديد خطوط نقل البيانات وجدولتها ومراقبتها باستخدام Python، ما يجعلها أداة مرنة وفعالة لتنظيم سير العمل.

13. نظرة عامة على DAG

ec49964ad7d61491.png

يشير DAG إلى "الرسم البياني الموجّه غير الدوري"، وفي Airflow، يمثّل DAG سير العمل أو خط الأنابيب بالكامل. ويحدّد المهام والتبعيات وترتيب التنفيذ.

يتم تنفيذ وحدات سير العمل داخل الرسم البياني الموجّه غير الدوري من وحدة على مجموعة GKE، ويتم بدء التنفيذ من إعدادات Airflow.

الملخص:

‫Airflow: تنزيل البيانات: يعمل هذا النص البرمجي على أتمتة عملية الحصول على مجموعة بيانات مراجعات الأفلام من Kaggle وتخزينها في حزمة GCS، ما يتيح استخدامها بسهولة في المزيد من المعالجة أو التحليل في بيئتك السحابية.

Airflow: إعداد البيانات: يأخذ الرمز مجموعة بيانات مراجعات الأفلام الأولية، ويزيل أعمدة البيانات غير الضرورية لحالة الاستخدام، ويحذف مجموعات البيانات التي تتضمّن قيمًا ناقصة. بعد ذلك، يتم تنظيم مجموعة البيانات بتنسيق مناسب لتعلُّم الآلة، ثم يتم تخزينها مرة أخرى في GCS لاستخدامها لاحقًا.

‫Airflow: ضبط النموذج بدقة: يضبط هذا الرمز نموذجًا لغويًا كبيرًا (LLM) بدقة باستخدام تقنية تُعرف باسم LoRA (التكيّف المنخفض الترتيب)، ثم يحفظ النموذج المعدَّل. يبدأ بتحميل نموذج لغوي كبير مدرَّب مسبقًا ومجموعة بيانات من Google Cloud Storage. بعد ذلك، يتم تطبيق LoRA لضبط النموذج بدقة وفعالية على مجموعة البيانات هذه. وأخيرًا، يتم حفظ النموذج المعدَّل بدقة في Google Cloud Storage لاستخدامه لاحقًا في تطبيقات مثل إنشاء النصوص أو الإجابة عن الأسئلة.

Airflow: Model Serving: عرض النموذج الذي تم ضبطه بدقة على GKE باستخدام vllm للاستدلال

Airflow: حلقة الملاحظات - إعادة تدريب النموذج كل xx مرة (كل ساعة أو يوميًا أو أسبوعيًا)

يوضّح هذا المخطّط البياني طريقة عمل Airflow 2 عند تشغيله على GKE.

8691f41166209a5d.png

14. ضبط النموذج بدقة مقابل استخدام التوليد المعزّز بالاسترجاع

يضبط هذا الدرس التطبيقي نموذج لغة كبيرًا بدلاً من استخدام التوليد المعزّز بالاسترجاع (RAG).

لنقارن بين هذين النهجين:

التحسين: إنشاء نموذج متخصص: يعمل التحسين على تكييف النموذج اللغوي الكبير مع مهمة أو مجموعة بيانات محدّدة، ما يسمح له بالعمل بشكل مستقل بدون الاعتماد على مصادر بيانات خارجية.

تبسيط الاستنتاج: يغني هذا عن الحاجة إلى نظام استرداد وقاعدة بيانات منفصلَين، ما يؤدي إلى الحصول على ردود أسرع وأرخص، لا سيما في حالات الاستخدام المتكررة.

التوليد المعزّز بالاسترجاع: يعتمد على المعرفة الخارجية: يسترجع التوليد المعزّز بالاسترجاع المعلومات ذات الصلة من قاعدة معلومات لكل طلب، ما يضمن الوصول إلى بيانات حديثة ومحدّدة.

زيادة التعقيد: غالبًا ما يتطلّب تنفيذ RAG في بيئة إنتاج مثل مجموعة Kubernetes خدمات مصغّرة متعدّدة لمعالجة البيانات واسترجاعها، ما قد يؤدي إلى زيادة وقت الاستجابة وتكاليف الحوسبة.

سبب اختيار الضبط الدقيق:

على الرغم من أنّ التوليد المعزَّز بالاسترجاع سيكون مناسبًا لمجموعة البيانات الصغيرة المستخدَمة في هذا الدرس العملي، اخترنا الضبط الدقيق لتوضيح حالة استخدام نموذجية لـ Airflow. يتيح لنا هذا الخيار التركيز على جوانب تنظيم سير العمل بدلاً من الخوض في تفاصيل إعداد بنية تحتية وخدمات مصغّرة إضافية لعملية RAG.

الخلاصة:

كلا الأسلوبين مفيدان ولهما نقاط قوة وضعف. يعتمد الخيار الأمثل على المتطلبات المحدّدة لمشروعك، مثل حجم بياناتك ومدى تعقيدها واحتياجات الأداء واعتبارات التكلفة.

15. مهمة DAG رقم 1: إنشاء خطوتك الأولى على Airflow: تنزيل البيانات

بشكل عام، في وحدة DAG هذه، ينزّل رمز Python المستضاف في صورة حاوية أحدث مجموعة بيانات RottenTomatoes من Kaggle.

لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ mlops-dag.py كخطوة أخيرة، وهو يحتوي على جميع خطوات وحدة DAG ضمن نص برمجي واحد بلغة Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. مهمة DAG رقم 2 - إنشاء الخطوة الثانية على Airflow: إعداد البيانات

كنظرة عامة على وحدة DAG هذه، نحمّل ملف CSV (rotten_tomatoes_movie_reviews.csv) من GCS إلى Pandas DataFrame.

بعد ذلك، نحدّ من عدد الصفوف التي تتم معالجتها باستخدام DATASET_LIMIT للاختبار وكفاءة الموارد، وأخيرًا نحول البيانات المحوّلة إلى مجموعة بيانات Hugging Face.

إذا دقّقت جيدًا، ستلاحظ أنّنا ندرّب 1000 صف في النموذج باستخدام "DATASET_LIMIT": "1000"، والسبب في ذلك هو أنّ الأمر يستغرق 20 دقيقة على وحدة معالجة الرسومات Nvidia L4.

لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ الملف mlops-dag.py في الخطوة الأخيرة، والذي يحتوي على جميع الخطوات ضمن نص برمجي واحد بلغة Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG Task #3 - Create your third step on Airflow: Model Finetuning

كنظرة عامة على وحدة DAG هذه، ننفّذ هنا finetune.py لتحسين نموذج Gemma باستخدام مجموعة البيانات الجديدة.

لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ الملف mlops-dag.py في الخطوة الأخيرة، والذي يحتوي على جميع الخطوات ضمن نص برمجي واحد بلغة Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18 مهمة DAG رقم 4 - إنشاء خطوتك الأخيرة على Airflow: الاستدلال / عرض النموذج

vLLM هي مكتبة قوية مفتوحة المصدر مصمَّمة خصيصًا للاستدلال عالي الأداء في النماذج اللغوية الكبيرة. عند نشرها على Google Kubernetes Engine (GKE)، تستفيد من قابلية التوسّع وكفاءة Kubernetes لتقديم خدمات نماذج اللغات الكبيرة بفعالية.

ملخّص الخطوات:

  • حمِّل DAG "mlops-dag.py" إلى حزمة GCS.
  • انسخ ملفَي إعداد بتنسيق YAML في Kubernetes لإعداد الاستنتاج، وذلك في حزمة GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

حمِّل نص Python البرمجي (ملف DAG)، بالإضافة إلى بيانات Kubernetes إلى حزمة DAGS GCS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

في واجهة مستخدم Airflow، سيظهر لك mlops-dag.

  1. انقر على "إلغاء الإيقاف المؤقت".
  2. اختَر "تشغيل DAG" لتنفيذ دورة MLOps يدوية.

d537281b92d5e8bb.png

بعد اكتمال DAG، سيظهر لك ناتج مشابه لما يلي في واجهة مستخدم Airflow.

3ed42abf8987384e.png

بعد الخطوة الأخيرة، يمكنك الحصول على نقطة نهاية النموذج وإرسال طلب لاختبار النموذج.

انتظِر حوالي 5 دقائق قبل إصدار أمر curl، حتى يمكن بدء استنتاج النموذج ويمكن لموازنة التحميل تعيين عنوان IP خارجي.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

إخراج:

19. تهانينا!

لقد أنشأت سير عمل الذكاء الاصطناعي الأول باستخدام مسار DAG مع Airflow 2 على GKE.

لا تنسَ إلغاء توفير الموارد التي نشرتها.

20. إجراء ذلك في مرحلة الإنتاج

على الرغم من أنّ CodeLab قد قدّم لك نظرة رائعة حول كيفية إعداد Airflow 2 على GKE، إلا أنّك ستحتاج إلى مراعاة بعض المواضيع التالية عند إجراء ذلك في مرحلة الإنتاج.

استخدِم واجهة أمامية على الويب باستخدام Gradio أو أدوات مشابهة.

يمكنك إعداد ميزة المراقبة التلقائية للتطبيقات لأحمال العمل باستخدام GKE هنا أو تصدير المقاييس من Airflow هنا.

قد تحتاج إلى وحدات معالجة رسومات أكبر لضبط النموذج بدقة بشكل أسرع، خاصةً إذا كانت لديك مجموعات بيانات أكبر. ومع ذلك، إذا أردنا تدريب النموذج على عدة وحدات معالجة رسومات، علينا تقسيم مجموعة البيانات وتجزئة التدريب. في ما يلي شرح للتوازي الكامل للبيانات المجزأة باستخدام PyTorch (التوازي الكامل للبيانات المجزأة، باستخدام مشاركة وحدة معالجة الرسومات لتحقيق هذا الهدف). يمكنك الاطّلاع على مزيد من المعلومات هنا في منشور مدوّنة من Meta وفي هذا البرنامج التعليمي حول FSDP باستخدام Pytorch.

Google Cloud Composer هي خدمة Airflow مُدارة، لذا ليس عليك صيانة Airflow نفسها، بل عليك فقط نشر DAG والبدء في استخدامها.

مزيد من المعلومات

الترخيص

يخضع هذا العمل لترخيص المشاع الإبداعي مع نسب العمل إلى مؤلفه 2.0 Generic License.