ایجاد گردش کار MLOps با Airflow 2 در GKE

۱. مرور کلی

۸۵۲dc۸۸۴۴۳۰۹ffb۸.png

این CodeLab نحوه ادغام شیوه‌های DevOps در یادگیری ماشینی (MLops) را با دانلود یک مجموعه داده، اصلاح یک مدل و استقرار LLM در موتور Google Kubernetes (GKE) با استفاده از Airflow DAG با کمترین میزان انتزاع نشان می‌دهد. در نتیجه، ما از دستورات gcloud استفاده می‌کنیم و نه terraform تا بتوانید آزمایشگاه را گام به گام دنبال کنید و به راحتی هر فرآیند را از دیدگاه مهندس پلتفرم و مهندس یادگیری ماشین درک کنید.

این راهنمای عملی، شما را در استفاده از Airflow برای ساده‌سازی گردش‌های کاری هوش مصنوعی راهنمایی می‌کند و با پیکربندی یک DAG، نمایشی واضح و عملی از کل چرخه حیات MLOps ارائه می‌دهد.

آنچه یاد خواهید گرفت

  • با تجزیه سیلوهای دانش و بهبود گردش کار، همکاری و درک بیشتری را بین مهندسان پلتفرم و یادگیری ماشین تقویت کنید.
  • درک نحوه استقرار، استفاده و مدیریت Airflow 2 در GKE
  • پیکربندی DAG جریان هوا از ابتدا تا انتها
  • پایه و اساس سیستم‌های یادگیری ماشینی در سطح تولید را با GKE بسازید
  • ابزار دقیق و عملیاتی کردن سیستم‌های یادگیری ماشینی
  • درک کنید که چگونه مهندسی پلتفرم به یک ستون پشتیبانی حیاتی برای MLOps تبدیل شده است

دستاوردهای این CodeLab

  • شما می‌توانید در مورد فیلم‌هایی که توسط یک LLM که بر اساس Gemma-2-9b-it تنظیم شده و در GKE با vLLM ارائه می‌شود، تنظیم شده‌اند، سؤال بپرسید.

مخاطب هدف

  • مهندسان یادگیری ماشین
  • مهندسان پلتفرم
  • دانشمندان داده
  • مهندسان داده
  • مهندسان DevOps
  • معمار پلتفرم
  • مهندسان مشتری

این CodeLab در نظر گرفته نشده است

  • به عنوان مقدمه‌ای بر گردش‌های کاری GKE یا AI/ML
  • به عنوان مروری بر کل مجموعه ویژگی‌های Airflow

۲. مهندسی پلتفرم به مهندسان/دانشمندان یادگیری ماشینی کمک می‌کند

۱۶۶۳۵a۸۲۸۴b۹۹۴c.png

مهندسی پلتفرم و MLOps رشته‌های وابسته به هم هستند که برای ایجاد یک محیط قوی و کارآمد برای توسعه و استقرار ML با یکدیگر همکاری می‌کنند.

محدوده: مهندسی پلتفرم محدوده وسیع‌تری نسبت به MLOps دارد و کل چرخه حیات توسعه نرم‌افزار را در بر می‌گیرد و ابزارها و زیرساخت‌های لازم را برای آن فراهم می‌کند.

MLOps شکاف بین توسعه، استقرار و استنتاج یادگیری ماشینی را پر می‌کند.

تخصص: مهندسان پلتفرم معمولاً تخصص بالایی در فناوری‌های زیرساختی مانند رایانش ابری، کانتینرسازی و مدیریت داده‌ها دارند.

مهندسان MLOps در توسعه، استقرار و نظارت بر مدل یادگیری ماشین تخصص دارند و اغلب دارای مهارت‌های علوم داده و مهندسی نرم‌افزار هستند.

ابزارها: مهندسان پلتفرم ابزارهایی را برای تأمین زیرساخت، مدیریت پیکربندی و تنظیم کانتینر و داربست‌بندی برنامه ایجاد می‌کنند. مهندسان MLOps از ابزارهایی برای آموزش مدل ML، آزمایش، استقرار، نظارت و نسخه‌بندی استفاده می‌کنند.

۳. تنظیمات و الزامات گوگل کلود

تنظیم محیط خودتنظیم

  1. وارد کنسول گوگل کلود شوید و یک پروژه جدید ایجاد کنید یا از یک پروژه موجود دوباره استفاده کنید. اگر از قبل حساب جیمیل یا گوگل ورک اسپیس ندارید، باید یکی ایجاد کنید .

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • نام پروژه، نام نمایشی برای شرکت‌کنندگان این پروژه است. این یک رشته کاراکتری است که توسط APIهای گوگل استفاده نمی‌شود. شما همیشه می‌توانید آن را به‌روزرسانی کنید.
  • شناسه پروژه در تمام پروژه‌های گوگل کلود منحصر به فرد است و تغییرناپذیر است (پس از تنظیم، قابل تغییر نیست). کنسول کلود به طور خودکار یک رشته منحصر به فرد تولید می‌کند؛ معمولاً برای شما مهم نیست که چیست. در اکثر آزمایشگاه‌های کد، باید شناسه پروژه خود را (که معمولاً با عنوان PROJECT_ID شناخته می‌شود) ارجاع دهید. اگر شناسه تولید شده را دوست ندارید، می‌توانید یک شناسه تصادفی دیگر ایجاد کنید. به عنوان یک جایگزین، می‌توانید شناسه خودتان را امتحان کنید و ببینید که آیا در دسترس است یا خیر. پس از این مرحله قابل تغییر نیست و در طول پروژه باقی می‌ماند.
  • برای اطلاع شما، یک مقدار سوم، شماره پروژه ، وجود دارد که برخی از APIها از آن استفاده می‌کنند. برای کسب اطلاعات بیشتر در مورد هر سه این مقادیر، به مستندات مراجعه کنید.
  1. در مرحله بعد، برای استفاده از منابع/API های ابری، باید پرداخت صورتحساب را در کنسول ابری فعال کنید . اجرای این آزمایشگاه کد هزینه زیادی نخواهد داشت، اگر اصلاً هزینه‌ای داشته باشد. برای خاموش کردن منابع به منظور جلوگیری از پرداخت صورتحساب پس از این آموزش، می‌توانید منابعی را که ایجاد کرده‌اید یا پروژه را حذف کنید. کاربران جدید Google Cloud واجد شرایط برنامه آزمایشی رایگان ۳۰۰ دلاری هستند.

شروع پوسته ابری

اگرچه می‌توان گوگل کلود را از راه دور و از طریق لپ‌تاپ شما مدیریت کرد، اما در این آزمایشگاه کد از Cloud Shell ، یک محیط خط فرمان که در فضای ابری اجرا می‌شود، استفاده خواهید کرد.

فعال کردن پوسته ابری

  1. از کنسول ابری، روی فعال کردن پوسته ابری کلیک کنید 853e55310c205094.png .

3c1dabeca90e44e5.png

اگر این اولین باری است که Cloud Shell را اجرا می‌کنید، یک صفحه میانی برای توضیح آن به شما نمایش داده می‌شود. اگر با یک صفحه میانی مواجه شدید، روی ادامه کلیک کنید.

9c92662c6a846a5c.png

آماده‌سازی و اتصال به Cloud Shell فقط چند لحظه طول می‌کشد.

9f0e51b578fecce5.png

این ماشین مجازی مجهز به تمام ابزارهای توسعه مورد نیاز است. این ماشین یک دایرکتوری خانگی پایدار ۵ گیگابایتی ارائه می‌دهد و در فضای ابری گوگل اجرا می‌شود که عملکرد شبکه و احراز هویت را تا حد زیادی افزایش می‌دهد. بخش عمده‌ای از کار شما در این آزمایشگاه کد، اگر نگوییم همه، را می‌توان با یک مرورگر انجام داد.

پس از اتصال به Cloud Shell، باید ببینید که احراز هویت شده‌اید و پروژه روی شناسه پروژه شما تنظیم شده است.

  1. برای تأیید احراز هویت، دستور زیر را در Cloud Shell اجرا کنید:
gcloud auth list

خروجی دستور

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد:
gcloud config list project

خروجی دستور

[core]
project = <PROJECT_ID>

اگر اینطور نیست، می‌توانید با این دستور آن را تنظیم کنید:

gcloud config set project <PROJECT_ID>

خروجی دستور

Updated property [core/project].

۴. مرحله ۱ - ثبت نام و احراز هویت در Kaggle

برای شروع CodeLab، باید در Kaggle یک حساب کاربری ایجاد کنید. Kaggle یک پلتفرم جامعه آنلاین برای دانشمندان داده و علاقه‌مندان به یادگیری ماشین است که متعلق به گوگل بوده و میزبان مخزن گسترده‌ای از مجموعه داده‌های عمومی برای حوزه‌های مختلف است. از این سایت می‌توانید مجموعه داده‌های RottenTomatoes را که برای آموزش مدل شما استفاده می‌شود، دانلود کنید.

  • در Kaggle ثبت نام کنید، می‌توانید از Google SSO برای ورود استفاده کنید.
  • شرایط و ضوابط را بپذیرید
  • به تنظیمات بروید و نام کاربری خود را وارد کنید
  • در بخش API، گزینه «ایجاد توکن جدید از» Kaggle را انتخاب کنید که kaggle.json را دانلود خواهد کرد.
  • اگر مشکلی دارید، به صفحه پشتیبانی اینجا مراجعه کنید

۵. مرحله ۲ - ثبت نام و احراز هویت در HuggingFace

HuggingFace یک مکان مرکزی برای هر کسی است که می‌خواهد با فناوری یادگیری ماشینی درگیر شود. این مکان میزبان ۹۰۰ هزار مدل، ۲۰۰ هزار مجموعه داده و ۳۰۰ هزار برنامه آزمایشی (Spaces) است که همگی متن‌باز و در دسترس عموم هستند.

  • ثبت نام در HuggingFace - یک حساب کاربری با نام کاربری ایجاد کنید، نمی‌توانید از Google SSO استفاده کنید
  • آدرس ایمیل خود را تأیید کنید
  • به اینجا بروید و مجوز مدل Gemma-2-9b-it را بپذیرید
  • یک توکن HuggingFace اینجا ایجاد کنید
  • اطلاعات توکن را ثبت کنید، بعداً به آن نیاز خواهید داشت

۶. مرحله ۳ - منابع زیرساختی مورد نیاز Google Cloud را ایجاد کنید

شما GKE، GCE و رجیستری Artifact را راه‌اندازی خواهید کرد و نقش‌های IAM را با استفاده از فدراسیون هویت بار کاری اعمال خواهید کرد.

گردش کار هوش مصنوعی شما از دو گره (nodepool) استفاده می‌کند، یکی برای آموزش و دیگری برای استنتاج. گره آموزش از یک ماشین مجازی GCE با استاندارد g2-standard-8 مجهز به یک پردازنده گرافیکی Nvidia L4 Tensor Core استفاده می‌کند. گره استنتاج از یک ماشین مجازی g2-standard-24 مجهز به دو پردازنده گرافیکی Nvidia L4 Tensor Core استفاده می‌کند. هنگام مشخص کردن منطقه، یکی را انتخاب کنید که پردازنده گرافیکی مورد نیاز در آن پشتیبانی می‌شود ( لینک ).

در Cloud Shell خود، دستورات زیر را اجرا کنید:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

مانیفست‌های YAML خود را ایجاد کنید

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

فضای نام.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

استنتاج.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:v0.6.6
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

سرویس استنتاج.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

سه باکت (bact) فضای ذخیره‌سازی ابری گوگل (GCS) ایجاد کنید

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

۷. مرحله ۴ - نصب Airflow روی GKE از طریق نمودار فرمان

حالا ما Airflow 2 را با استفاده از Helm مستقر می‌کنیم. Apache Airflow یک پلتفرم مدیریت گردش کار متن‌باز برای خطوط لوله مهندسی داده است. بعداً به مجموعه ویژگی‌های Airflow 2 خواهیم پرداخت.

values.yaml برای نمودار جریان هوا

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

جریان هوای ۲ را مستقر کنید

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

۸. مرحله ۵ - مقداردهی اولیه جریان هوا با اتصالات و متغیرها

پس از استقرار Airflow 2، می‌توانیم پیکربندی آن را شروع کنیم. ما چند متغیر تعریف می‌کنیم که توسط اسکریپت‌های پایتون ما خوانده می‌شوند.

  1. با مرورگر خود به رابط کاربری Airflow روی پورت ۸۰۸۰ دسترسی پیدا کنید.

دریافت آی پی خارجی

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

یک مرورگر وب باز کنید و به آدرس http:// <EXTERNAL-IP> :8080 بروید. نام کاربری admin / admin است.

  1. یک اتصال پیش‌فرض GCP در رابط کاربری Airflow ایجاد کنید، بنابراین به مسیر Admin → Connections → + Add a new record بروید.
  • شناسه اتصال: google_cloud_default
  • نوع اتصال: گوگل کلود

روی ذخیره کلیک کنید.

  1. متغیرهای مورد نیاز را ایجاد کنید، بنابراین به Admin → Variables → + Add a new record بروید.
  • کلید: BUCKET_DATA_NAME - مقدار: کپی از echo $BUCKET_DATA_NAME
  • کلید: GCP_PROJECT_ID - مقدار: کپی از echo $DEVSHELL_PROJECT_ID
  • کلید: HF_TOKEN - مقدار: توکن HF خود را وارد کنید
  • کلید: KAGGLE_USERNAME - مقدار: نام کاربری kaggle خود را وارد کنید
  • کلید: KAGGLE_KEY - مقدار: این را از kaggle.json کپی کنید

بعد از هر جفت کلید-مقدار، روی ذخیره کلیک کنید.

رابط کاربری شما باید به این شکل باشد:

771121470131b5ec.png

۹. محفظه کد برنامه شماره ۱ - دانلود داده‌ها

در این اسکریپت پایتون، ما با Kaggle احراز هویت می‌کنیم تا مجموعه داده‌ها را در سطل GCS خود دانلود کنیم.

خود اسکریپت به صورت کانتینری درمی‌آید زیرا این واحد DAG شماره ۱ می‌شود و ما انتظار داریم که مجموعه داده‌ها مرتباً به‌روزرسانی شوند، بنابراین می‌خواهیم این کار را خودکار کنیم.

یک دایرکتوری ایجاد کنید و اسکریپت‌های ما را اینجا کپی کنید

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

دانلود مجموعه داده.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

داکرفایل

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

الزامات.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

حالا یک تصویر کانتینر برای دانلود مجموعه داده ایجاد می‌کنیم و آن را به رجیستری مصنوعات ارسال می‌کنیم.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

۱۰. محفظه کد برنامه شماره ۲ - آماده‌سازی داده‌ها

در طول مرحله آماده‌سازی داده‌ها، به این موارد دست می‌یابیم:

  1. مشخص کنید که چه مقدار از مجموعه داده را می‌خواهیم برای تنظیم دقیق مدل پایه خود استفاده کنیم
  2. مجموعه داده‌ها را بارگذاری می‌کند، یعنی فایل CSV را در یک فریم داده Pandas می‌خواند که یک ساختار داده دو بعدی برای سطرها و ستون‌ها است.
  3. تبدیل/پیش‌پردازش داده‌ها - با مشخص کردن بخش‌هایی از مجموعه داده‌ها که می‌خواهیم نگه داریم، مشخص می‌کنیم که کدام بخش‌ها نامربوط هستند، که در واقع حذف بقیه است.
  4. تابع transform را روی هر سطر از DataFrame اعمال می‌کند.
  5. داده‌های آماده‌شده را دوباره در سطل GCS ذخیره کنید.

یک دایرکتوری ایجاد کنید و اسکریپت‌های ما را اینجا کپی کنید

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

آماده‌سازی داده‌ها.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

داکرفایل

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

الزامات.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. محفظه کد برنامه شماره 3 - تنظیم دقیق

در اینجا ما از Gemma-2-9b-it به عنوان مدل پایه استفاده می‌کنیم و سپس آن را با مجموعه داده جدید خود تنظیم دقیق می‌کنیم.

اینها توالی مراحلی هستند که در طول مرحله تنظیم دقیق اتفاق می‌افتند.

۱. راه‌اندازی: وارد کردن کتابخانه‌ها، تعریف پارامترها (برای مدل، داده‌ها و آموزش) و بارگذاری مجموعه داده‌ها از فضای ذخیره‌سازی ابری گوگل.

۲. بارگذاری مدل: یک مدل زبانی از پیش آموزش‌دیده با قابلیت کوانتیزاسیون برای افزایش کارایی بارگذاری کنید و توکن‌ساز مربوطه را بارگذاری کنید.

۳. پیکربندی LoRA: تطبیق رتبه پایین (LoRA) را تنظیم کنید تا مدل را با افزودن ماتریس‌های کوچک قابل آموزش، به طور کارآمد تنظیم کنید.

۴. آموزش: پارامترهای آموزشی را تعریف کنید و از SFTTrainer برای تنظیم دقیق مدل روی مجموعه داده بارگذاری شده با استفاده از نوع کوانتیزاسیون FP16 استفاده کنید.

۵. ذخیره و آپلود: مدل تنظیم‌شده و توکن‌ساز را به‌صورت محلی ذخیره کنید، سپس آن‌ها را در مخزن GCS ما آپلود کنید.

سپس با استفاده از Cloud Build یک تصویر کانتینر ایجاد می‌کنیم و آن را در Artifact Registry ذخیره می‌کنیم.

یک دایرکتوری ایجاد کنید و اسکریپت‌های ما را اینجا کپی کنید

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

داکرفایل

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

الزامات.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

حالا یک تصویر کانتینر برای تنظیم دقیق ایجاد می‌کنیم و آن را به رجیستری مصنوعات (Artifact Registry) ارسال می‌کنیم.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

۱۲. بررسی اجمالی جریان هوا ۲ شامل DAG چیست؟

Airflow پلتفرمی برای هماهنگ‌سازی گردش‌های کاری و خطوط لوله داده است. این پلتفرم از DAGها (گراف‌های جهت‌دار غیرمدور) برای تعریف این گردش‌های کاری در کد پایتون استفاده می‌کند و وظایف و وابستگی‌های آنها را به صورت بصری نمایش می‌دهد.

Airflow با DAGهای استاتیک و تعاریف مبتنی بر پایتون، برای برنامه‌ریزی و مدیریت گردش‌های کاری از پیش تعریف‌شده بسیار مناسب است. معماری آن شامل یک رابط کاربری کاربرپسند برای نظارت و مدیریت این گردش‌های کاری است.

اساساً، Airflow به شما امکان می‌دهد تا با استفاده از پایتون، خطوط لوله داده خود را تعریف، برنامه‌ریزی و نظارت کنید، و این آن را به ابزاری انعطاف‌پذیر و قدرتمند برای تنظیم گردش کار تبدیل می‌کند.

۱۳. بررسی اجمالی DAG ما

ec49964ad7d61491.png

DAG مخفف عبارت Directed Acyclic Graph است، در Airflow خود DAG نمایانگر کل گردش کار یا خط لوله است. وظایف، وابستگی‌های آنها و ترتیب اجرا را تعریف می‌کند.

واحدهای گردش کار درون DAG از یک پاد (pod) روی کلاستر GKE اجرا می‌شوند که از پیکربندی Airflow آغاز می‌شود.

خلاصه:

جریان هوا: دانلود داده‌ها - این اسکریپت فرآیند دریافت مجموعه داده‌های نقد فیلم از Kaggle و ذخیره آن در سطل GCS شما را خودکار می‌کند و آن را برای پردازش یا تجزیه و تحلیل بیشتر در محیط ابری شما به راحتی در دسترس قرار می‌دهد.

جریان هوا: آماده‌سازی داده‌ها - کد، مجموعه داده‌های خام نقد فیلم را دریافت می‌کند، ستون‌های داده اضافی که برای مورد استفاده ما مورد نیاز نیستند و مجموعه داده‌های دارای مقادیر گمشده را حذف می‌کند. در مرحله بعد، مجموعه داده‌ها را به قالبی برای پاسخ به سوالات مناسب برای یادگیری ماشین تبدیل می‌کند و آن را برای استفاده بعدی در GCS ذخیره می‌کند.

جریان هوا: تنظیم دقیق مدل - این کد با استفاده از تکنیکی به نام LoRA (تطبیق رتبه پایین) یک مدل زبان بزرگ (LLM) را تنظیم دقیق می‌کند و سپس مدل به‌روزرسانی‌شده را ذخیره می‌کند. این کار با بارگذاری یک LLM از پیش آموزش‌دیده و یک مجموعه داده از Google Cloud Storage شروع می‌شود. سپس، LoRA را برای تنظیم دقیق مدل روی این مجموعه داده اعمال می‌کند. در نهایت، مدل تنظیم‌شده را برای استفاده بعدی در برنامه‌هایی مانند تولید متن یا پاسخ به پرسش، دوباره در Google Cloud Storage ذخیره می‌کند.

جریان هوا: سرویس مدل - سرویس مدل تنظیم‌شده روی GKE با vllm برای استنتاج.

جریان هوا: حلقه بازخورد - آموزش مجدد مدل هر xx بار (ساعتی، روزانه، هفتگی).

این نمودار نحوه عملکرد Airflow 2 را هنگام اجرا روی GKE توضیح می‌دهد.

8691f41166209a5d.png

14. تنظیم دقیق یک مدل در مقابل استفاده از RAG

این CodeLab به جای استفاده از بازیابی افزوده نسل (RAG)، یک LLM را به طور دقیق تنظیم می‌کند.

بیایید این دو رویکرد را با هم مقایسه کنیم:

تنظیم دقیق: یک مدل تخصصی ایجاد می‌کند: تنظیم دقیق، LLM را با یک کار یا مجموعه داده خاص تطبیق می‌دهد و به آن اجازه می‌دهد بدون تکیه بر منابع داده خارجی، مستقل عمل کند.

استنتاج را ساده می‌کند: این امر نیاز به یک سیستم بازیابی و پایگاه داده جداگانه را از بین می‌برد و منجر به پاسخ‌های سریع‌تر و ارزان‌تر، به ویژه برای موارد استفاده مکرر، می‌شود.

RAG: متکی بر دانش خارجی: RAG اطلاعات مرتبط را از یک پایگاه دانش برای هر درخواست بازیابی می‌کند و دسترسی به داده‌های به‌روز و خاص را تضمین می‌کند.

افزایش پیچیدگی: پیاده‌سازی RAG در یک محیط عملیاتی مانند کلاستر Kubernetes اغلب شامل چندین میکروسرویس برای پردازش و بازیابی داده‌ها است که به طور بالقوه باعث افزایش تأخیر و هزینه‌های محاسباتی می‌شود.

چرا تنظیم دقیق انتخاب شد:

اگرچه RAG برای مجموعه داده‌های کوچک مورد استفاده در این CodeLab مناسب است، ما برای نشان دادن یک مورد استفاده معمول برای Airflow، تنظیم دقیق را انتخاب کردیم. این انتخاب به ما امکان می‌دهد تا به جای پرداختن به جزئیات راه‌اندازی زیرساخت‌های اضافی و میکروسرویس‌ها برای RAG، بر جنبه‌های هماهنگ‌سازی گردش کار تمرکز کنیم.

نتیجه‌گیری:

هم تنظیم دقیق و هم RAG تکنیک‌های ارزشمندی با نقاط قوت و ضعف خاص خود هستند. انتخاب بهینه به الزامات خاص پروژه شما، مانند اندازه و پیچیدگی داده‌های شما، نیازهای عملکردی و ملاحظات هزینه بستگی دارد.

۱۵. وظیفه شماره ۱ DAG - اولین گام خود را در Airflow ایجاد کنید: دانلود داده‌ها

به عنوان مروری بر این واحد DAG، کد پایتون ما که در یک تصویر کانتینر میزبانی می‌شود، آخرین مجموعه داده RottenTomatoes را از Kaggle دانلود می‌کند.

این کد را در سطل GCS کپی نکنید. ما فایل mlops-dag.py را به عنوان آخرین مرحله کپی می‌کنیم که شامل تمام مراحل واحد DAG در یک اسکریپت پایتون است.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

۱۶. وظیفه شماره ۲ DAG - مرحله دوم خود را در Airflow ایجاد کنید: آماده‌سازی داده‌ها

به عنوان مروری بر این واحد DAG، یک فایل CSV (rotten_tomatoes_movie_reviews.csv) را از GCS در یک DataFrame از Pandas بارگذاری می‌کنیم.

در مرحله بعد، تعداد ردیف‌های پردازش شده را با استفاده از DATASET_LIMIT برای آزمایش و بهره‌وری منابع محدود می‌کنیم و در نهایت داده‌های تبدیل‌شده را به یک مجموعه داده چهره در آغوش گرفته تبدیل می‌کنیم.

اگر با دقت نگاه کنید، خواهید دید که ما در مدل با "DATASET_LIMIT": "1000" در حال آموزش ۱۰۰۰ ردیف هستیم، دلیل این امر این است که انجام این کار روی یک پردازنده گرافیکی Nvidia L4 بیست دقیقه طول می‌کشد.

این کد را در سطل GCS کپی نکنید. ما mlops-dag.py را در آخرین مرحله کپی می‌کنیم که شامل تمام مراحل درون یک اسکریپت پایتون است.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

۱۷. وظیفه شماره ۳ DAG - مرحله سوم خود را در مورد جریان هوا ایجاد کنید: تنظیم دقیق مدل

به عنوان مروری بر این واحد DAG، در اینجا finetune.py را اجرا می‌کنیم تا مدل Gemma را با مجموعه داده جدید خود اصلاح کنیم.

این کد را در سطل GCS کپی نکنید. ما mlops-dag.py را در آخرین مرحله کپی می‌کنیم که شامل تمام مراحل درون یک اسکریپت پایتون است.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

۱۸. وظیفه شماره ۴ DAG - ایجاد گام نهایی در مورد جریان هوا: استنتاج / ارائه مدل

vLLM یک کتابخانه متن‌باز قدرتمند است که به‌طور خاص برای استنتاج با عملکرد بالا از LLMها طراحی شده است. هنگامی که بر روی Google Kubernetes Engine (GKE) مستقر می‌شود، از مقیاس‌پذیری و کارایی Kubernetes برای ارائه خدمات مؤثر به LLMها بهره می‌برد.

خلاصه مراحل:

  • فایل DAG با نام "mlops-dag.py" را در مخزن GCS آپلود کنید.
  • دو فایل پیکربندی Kubernetes YAML را برای تنظیم استنتاج، در یک سطل GCS کپی کنید.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

اسکریپت پایتون (فایل DAG) و همچنین مانیفست‌های Kubernetes را در DAGS GCS آپلود کنید.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

در رابط کاربری Airflow، عبارت mlops-dag را مشاهده خواهید کرد.

  1. لغو توقف موقت را انتخاب کنید.
  2. برای انجام یک چرخه دستی MLOps، گزینه Trigger DAG را انتخاب کنید.

d537281b92d5e8bb.png

پس از تکمیل DAG، خروجی مانند این را در رابط کاربری Airflow مشاهده خواهید کرد.

3ed42abf8987384e.png

پس از مرحله نهایی، می‌توانید نقطه پایانی مدل را بگیرید و یک اعلان برای آزمایش مدل ارسال کنید.

قبل از صدور دستور curl تقریباً ۵ دقیقه صبر کنید تا استنتاج مدل آغاز شود و متعادل‌کننده بار بتواند یک آدرس IP خارجی اختصاص دهد.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

خروجی:

۱۹. تبریک می‌گویم!

شما اولین گردش کار هوش مصنوعی خود را با استفاده از یک خط لوله DAG با Airflow 2 در GKE ایجاد کرده‌اید.

فراموش نکنید که منابعی را که مستقر کرده‌اید، از حالت آماده‌باش خارج کنید.

۲۰. انجام این کار در محیط تولید

اگرچه CodeLab بینش فوق‌العاده‌ای در مورد نحوه راه‌اندازی Airflow 2 در GKE در اختیار شما قرار داده است، اما در دنیای واقعی، هنگام انجام این کار در محیط تولید، باید برخی از مباحث زیر را در نظر بگیرید.

با استفاده از Gradio یا ابزارهای مشابه، یک رابط کاربری وب پیاده‌سازی کنید.

یا می‌توانید نظارت خودکار بر برنامه‌های کاربردی برای بارهای کاری با GKE را از اینجا پیکربندی کنید یا معیارها را از Airflow از اینجا استخراج کنید.

ممکن است برای تنظیم دقیق‌تر مدل، به پردازنده‌های گرافیکی (GPU) بزرگ‌تری نیاز داشته باشید، به‌خصوص اگر مجموعه داده‌های بزرگ‌تری داشته باشید. با این حال، اگر بخواهیم مدل را در چندین پردازنده گرافیکی آموزش دهیم، باید مجموعه داده‌ها را تقسیم کرده و آموزش را به قطعات کوچک تقسیم کنیم. در اینجا توضیحی در مورد FSDP با PyTorch (داده‌های کاملاً خرد شده موازی، با استفاده از اشتراک‌گذاری GPU برای دستیابی به این هدف) ارائه شده است. برای مطالعه بیشتر می‌توانید به یک پست وبلاگ از Meta و پست دیگری در این آموزش در مورد FSDP با استفاده از Pytorch مراجعه کنید.

Google Cloud Composer یک سرویس مدیریت‌شده‌ی Airflow است، بنابراین نیازی به نگهداری خود Airflow ندارید، فقط DAG خود را مستقر کنید و کار خود را انجام دهید.

بیشتر بدانید

مجوز

این اثر تحت مجوز عمومی Creative Commons Attribution 2.0 منتشر شده است.