۱. مرور کلی

این CodeLab نحوه ادغام شیوههای DevOps در یادگیری ماشینی (MLops) را با دانلود یک مجموعه داده، اصلاح یک مدل و استقرار LLM در موتور Google Kubernetes (GKE) با استفاده از Airflow DAG با کمترین میزان انتزاع نشان میدهد. در نتیجه، ما از دستورات gcloud استفاده میکنیم و نه terraform تا بتوانید آزمایشگاه را گام به گام دنبال کنید و به راحتی هر فرآیند را از دیدگاه مهندس پلتفرم و مهندس یادگیری ماشین درک کنید.
این راهنمای عملی، شما را در استفاده از Airflow برای سادهسازی گردشهای کاری هوش مصنوعی راهنمایی میکند و با پیکربندی یک DAG، نمایشی واضح و عملی از کل چرخه حیات MLOps ارائه میدهد.
آنچه یاد خواهید گرفت
- با تجزیه سیلوهای دانش و بهبود گردش کار، همکاری و درک بیشتری را بین مهندسان پلتفرم و یادگیری ماشین تقویت کنید.
- درک نحوه استقرار، استفاده و مدیریت Airflow 2 در GKE
- پیکربندی DAG جریان هوا از ابتدا تا انتها
- پایه و اساس سیستمهای یادگیری ماشینی در سطح تولید را با GKE بسازید
- ابزار دقیق و عملیاتی کردن سیستمهای یادگیری ماشینی
- درک کنید که چگونه مهندسی پلتفرم به یک ستون پشتیبانی حیاتی برای MLOps تبدیل شده است
دستاوردهای این CodeLab
- شما میتوانید در مورد فیلمهایی که توسط یک LLM که بر اساس Gemma-2-9b-it تنظیم شده و در GKE با vLLM ارائه میشود، تنظیم شدهاند، سؤال بپرسید.
مخاطب هدف
- مهندسان یادگیری ماشین
- مهندسان پلتفرم
- دانشمندان داده
- مهندسان داده
- مهندسان DevOps
- معمار پلتفرم
- مهندسان مشتری
این CodeLab در نظر گرفته نشده است
- به عنوان مقدمهای بر گردشهای کاری GKE یا AI/ML
- به عنوان مروری بر کل مجموعه ویژگیهای Airflow
۲. مهندسی پلتفرم به مهندسان/دانشمندان یادگیری ماشینی کمک میکند

مهندسی پلتفرم و MLOps رشتههای وابسته به هم هستند که برای ایجاد یک محیط قوی و کارآمد برای توسعه و استقرار ML با یکدیگر همکاری میکنند.
محدوده: مهندسی پلتفرم محدوده وسیعتری نسبت به MLOps دارد و کل چرخه حیات توسعه نرمافزار را در بر میگیرد و ابزارها و زیرساختهای لازم را برای آن فراهم میکند.
MLOps شکاف بین توسعه، استقرار و استنتاج یادگیری ماشینی را پر میکند.
تخصص: مهندسان پلتفرم معمولاً تخصص بالایی در فناوریهای زیرساختی مانند رایانش ابری، کانتینرسازی و مدیریت دادهها دارند.
مهندسان MLOps در توسعه، استقرار و نظارت بر مدل یادگیری ماشین تخصص دارند و اغلب دارای مهارتهای علوم داده و مهندسی نرمافزار هستند.
ابزارها: مهندسان پلتفرم ابزارهایی را برای تأمین زیرساخت، مدیریت پیکربندی و تنظیم کانتینر و داربستبندی برنامه ایجاد میکنند. مهندسان MLOps از ابزارهایی برای آموزش مدل ML، آزمایش، استقرار، نظارت و نسخهبندی استفاده میکنند.
۳. تنظیمات و الزامات گوگل کلود
تنظیم محیط خودتنظیم
- وارد کنسول گوگل کلود شوید و یک پروژه جدید ایجاد کنید یا از یک پروژه موجود دوباره استفاده کنید. اگر از قبل حساب جیمیل یا گوگل ورک اسپیس ندارید، باید یکی ایجاد کنید .



- نام پروژه، نام نمایشی برای شرکتکنندگان این پروژه است. این یک رشته کاراکتری است که توسط APIهای گوگل استفاده نمیشود. شما همیشه میتوانید آن را بهروزرسانی کنید.
- شناسه پروژه در تمام پروژههای گوگل کلود منحصر به فرد است و تغییرناپذیر است (پس از تنظیم، قابل تغییر نیست). کنسول کلود به طور خودکار یک رشته منحصر به فرد تولید میکند؛ معمولاً برای شما مهم نیست که چیست. در اکثر آزمایشگاههای کد، باید شناسه پروژه خود را (که معمولاً با عنوان
PROJECT_IDشناخته میشود) ارجاع دهید. اگر شناسه تولید شده را دوست ندارید، میتوانید یک شناسه تصادفی دیگر ایجاد کنید. به عنوان یک جایگزین، میتوانید شناسه خودتان را امتحان کنید و ببینید که آیا در دسترس است یا خیر. پس از این مرحله قابل تغییر نیست و در طول پروژه باقی میماند. - برای اطلاع شما، یک مقدار سوم، شماره پروژه ، وجود دارد که برخی از APIها از آن استفاده میکنند. برای کسب اطلاعات بیشتر در مورد هر سه این مقادیر، به مستندات مراجعه کنید.
- در مرحله بعد، برای استفاده از منابع/API های ابری، باید پرداخت صورتحساب را در کنسول ابری فعال کنید . اجرای این آزمایشگاه کد هزینه زیادی نخواهد داشت، اگر اصلاً هزینهای داشته باشد. برای خاموش کردن منابع به منظور جلوگیری از پرداخت صورتحساب پس از این آموزش، میتوانید منابعی را که ایجاد کردهاید یا پروژه را حذف کنید. کاربران جدید Google Cloud واجد شرایط برنامه آزمایشی رایگان ۳۰۰ دلاری هستند.
شروع پوسته ابری
اگرچه میتوان گوگل کلود را از راه دور و از طریق لپتاپ شما مدیریت کرد، اما در این آزمایشگاه کد از Cloud Shell ، یک محیط خط فرمان که در فضای ابری اجرا میشود، استفاده خواهید کرد.
فعال کردن پوسته ابری
- از کنسول ابری، روی فعال کردن پوسته ابری کلیک کنید
.

اگر این اولین باری است که Cloud Shell را اجرا میکنید، یک صفحه میانی برای توضیح آن به شما نمایش داده میشود. اگر با یک صفحه میانی مواجه شدید، روی ادامه کلیک کنید.

آمادهسازی و اتصال به Cloud Shell فقط چند لحظه طول میکشد.

این ماشین مجازی مجهز به تمام ابزارهای توسعه مورد نیاز است. این ماشین یک دایرکتوری خانگی پایدار ۵ گیگابایتی ارائه میدهد و در فضای ابری گوگل اجرا میشود که عملکرد شبکه و احراز هویت را تا حد زیادی افزایش میدهد. بخش عمدهای از کار شما در این آزمایشگاه کد، اگر نگوییم همه، را میتوان با یک مرورگر انجام داد.
پس از اتصال به Cloud Shell، باید ببینید که احراز هویت شدهاید و پروژه روی شناسه پروژه شما تنظیم شده است.
- برای تأیید احراز هویت، دستور زیر را در Cloud Shell اجرا کنید:
gcloud auth list
خروجی دستور
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد:
gcloud config list project
خروجی دستور
[core] project = <PROJECT_ID>
اگر اینطور نیست، میتوانید با این دستور آن را تنظیم کنید:
gcloud config set project <PROJECT_ID>
خروجی دستور
Updated property [core/project].
۴. مرحله ۱ - ثبت نام و احراز هویت در Kaggle
برای شروع CodeLab، باید در Kaggle یک حساب کاربری ایجاد کنید. Kaggle یک پلتفرم جامعه آنلاین برای دانشمندان داده و علاقهمندان به یادگیری ماشین است که متعلق به گوگل بوده و میزبان مخزن گستردهای از مجموعه دادههای عمومی برای حوزههای مختلف است. از این سایت میتوانید مجموعه دادههای RottenTomatoes را که برای آموزش مدل شما استفاده میشود، دانلود کنید.
۵. مرحله ۲ - ثبت نام و احراز هویت در HuggingFace
HuggingFace یک مکان مرکزی برای هر کسی است که میخواهد با فناوری یادگیری ماشینی درگیر شود. این مکان میزبان ۹۰۰ هزار مدل، ۲۰۰ هزار مجموعه داده و ۳۰۰ هزار برنامه آزمایشی (Spaces) است که همگی متنباز و در دسترس عموم هستند.
- ثبت نام در HuggingFace - یک حساب کاربری با نام کاربری ایجاد کنید، نمیتوانید از Google SSO استفاده کنید
- آدرس ایمیل خود را تأیید کنید
- به اینجا بروید و مجوز مدل Gemma-2-9b-it را بپذیرید
- یک توکن HuggingFace اینجا ایجاد کنید
- اطلاعات توکن را ثبت کنید، بعداً به آن نیاز خواهید داشت
۶. مرحله ۳ - منابع زیرساختی مورد نیاز Google Cloud را ایجاد کنید
شما GKE، GCE و رجیستری Artifact را راهاندازی خواهید کرد و نقشهای IAM را با استفاده از فدراسیون هویت بار کاری اعمال خواهید کرد.
گردش کار هوش مصنوعی شما از دو گره (nodepool) استفاده میکند، یکی برای آموزش و دیگری برای استنتاج. گره آموزش از یک ماشین مجازی GCE با استاندارد g2-standard-8 مجهز به یک پردازنده گرافیکی Nvidia L4 Tensor Core استفاده میکند. گره استنتاج از یک ماشین مجازی g2-standard-24 مجهز به دو پردازنده گرافیکی Nvidia L4 Tensor Core استفاده میکند. هنگام مشخص کردن منطقه، یکی را انتخاب کنید که پردازنده گرافیکی مورد نیاز در آن پشتیبانی میشود ( لینک ).
در Cloud Shell خود، دستورات زیر را اجرا کنید:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
مانیفستهای YAML خود را ایجاد کنید
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
فضای نام.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
استنتاج.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:v0.6.6
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
سرویس استنتاج.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
سه باکت (bact) فضای ذخیرهسازی ابری گوگل (GCS) ایجاد کنید
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
۷. مرحله ۴ - نصب Airflow روی GKE از طریق نمودار فرمان
حالا ما Airflow 2 را با استفاده از Helm مستقر میکنیم. Apache Airflow یک پلتفرم مدیریت گردش کار متنباز برای خطوط لوله مهندسی داده است. بعداً به مجموعه ویژگیهای Airflow 2 خواهیم پرداخت.
values.yaml برای نمودار جریان هوا
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
جریان هوای ۲ را مستقر کنید
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
۸. مرحله ۵ - مقداردهی اولیه جریان هوا با اتصالات و متغیرها
پس از استقرار Airflow 2، میتوانیم پیکربندی آن را شروع کنیم. ما چند متغیر تعریف میکنیم که توسط اسکریپتهای پایتون ما خوانده میشوند.
- با مرورگر خود به رابط کاربری Airflow روی پورت ۸۰۸۰ دسترسی پیدا کنید.
دریافت آی پی خارجی
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
یک مرورگر وب باز کنید و به آدرس http:// <EXTERNAL-IP> :8080 بروید. نام کاربری admin / admin است.
- یک اتصال پیشفرض GCP در رابط کاربری Airflow ایجاد کنید، بنابراین به مسیر Admin → Connections → + Add a new record بروید.
- شناسه اتصال: google_cloud_default
- نوع اتصال: گوگل کلود
روی ذخیره کلیک کنید.
- متغیرهای مورد نیاز را ایجاد کنید، بنابراین به Admin → Variables → + Add a new record بروید.
- کلید: BUCKET_DATA_NAME - مقدار: کپی از echo $BUCKET_DATA_NAME
- کلید: GCP_PROJECT_ID - مقدار: کپی از echo $DEVSHELL_PROJECT_ID
- کلید: HF_TOKEN - مقدار: توکن HF خود را وارد کنید
- کلید: KAGGLE_USERNAME - مقدار: نام کاربری kaggle خود را وارد کنید
- کلید: KAGGLE_KEY - مقدار: این را از kaggle.json کپی کنید
بعد از هر جفت کلید-مقدار، روی ذخیره کلیک کنید.
رابط کاربری شما باید به این شکل باشد:

۹. محفظه کد برنامه شماره ۱ - دانلود دادهها
در این اسکریپت پایتون، ما با Kaggle احراز هویت میکنیم تا مجموعه دادهها را در سطل GCS خود دانلود کنیم.
خود اسکریپت به صورت کانتینری درمیآید زیرا این واحد DAG شماره ۱ میشود و ما انتظار داریم که مجموعه دادهها مرتباً بهروزرسانی شوند، بنابراین میخواهیم این کار را خودکار کنیم.
یک دایرکتوری ایجاد کنید و اسکریپتهای ما را اینجا کپی کنید
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
دانلود مجموعه داده.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
داکرفایل
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
الزامات.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
حالا یک تصویر کانتینر برای دانلود مجموعه داده ایجاد میکنیم و آن را به رجیستری مصنوعات ارسال میکنیم.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
۱۰. محفظه کد برنامه شماره ۲ - آمادهسازی دادهها
در طول مرحله آمادهسازی دادهها، به این موارد دست مییابیم:
- مشخص کنید که چه مقدار از مجموعه داده را میخواهیم برای تنظیم دقیق مدل پایه خود استفاده کنیم
- مجموعه دادهها را بارگذاری میکند، یعنی فایل CSV را در یک فریم داده Pandas میخواند که یک ساختار داده دو بعدی برای سطرها و ستونها است.
- تبدیل/پیشپردازش دادهها - با مشخص کردن بخشهایی از مجموعه دادهها که میخواهیم نگه داریم، مشخص میکنیم که کدام بخشها نامربوط هستند، که در واقع حذف بقیه است.
- تابع
transformرا روی هر سطر از DataFrame اعمال میکند. - دادههای آمادهشده را دوباره در سطل GCS ذخیره کنید.
یک دایرکتوری ایجاد کنید و اسکریپتهای ما را اینجا کپی کنید
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
آمادهسازی دادهها.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
داکرفایل
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
الزامات.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. محفظه کد برنامه شماره 3 - تنظیم دقیق
در اینجا ما از Gemma-2-9b-it به عنوان مدل پایه استفاده میکنیم و سپس آن را با مجموعه داده جدید خود تنظیم دقیق میکنیم.
اینها توالی مراحلی هستند که در طول مرحله تنظیم دقیق اتفاق میافتند.
۱. راهاندازی: وارد کردن کتابخانهها، تعریف پارامترها (برای مدل، دادهها و آموزش) و بارگذاری مجموعه دادهها از فضای ذخیرهسازی ابری گوگل.
۲. بارگذاری مدل: یک مدل زبانی از پیش آموزشدیده با قابلیت کوانتیزاسیون برای افزایش کارایی بارگذاری کنید و توکنساز مربوطه را بارگذاری کنید.
۳. پیکربندی LoRA: تطبیق رتبه پایین (LoRA) را تنظیم کنید تا مدل را با افزودن ماتریسهای کوچک قابل آموزش، به طور کارآمد تنظیم کنید.
۴. آموزش: پارامترهای آموزشی را تعریف کنید و از SFTTrainer برای تنظیم دقیق مدل روی مجموعه داده بارگذاری شده با استفاده از نوع کوانتیزاسیون FP16 استفاده کنید.
۵. ذخیره و آپلود: مدل تنظیمشده و توکنساز را بهصورت محلی ذخیره کنید، سپس آنها را در مخزن GCS ما آپلود کنید.
سپس با استفاده از Cloud Build یک تصویر کانتینر ایجاد میکنیم و آن را در Artifact Registry ذخیره میکنیم.
یک دایرکتوری ایجاد کنید و اسکریپتهای ما را اینجا کپی کنید
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
داکرفایل
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
الزامات.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
حالا یک تصویر کانتینر برای تنظیم دقیق ایجاد میکنیم و آن را به رجیستری مصنوعات (Artifact Registry) ارسال میکنیم.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
۱۲. بررسی اجمالی جریان هوا ۲ شامل DAG چیست؟
Airflow پلتفرمی برای هماهنگسازی گردشهای کاری و خطوط لوله داده است. این پلتفرم از DAGها (گرافهای جهتدار غیرمدور) برای تعریف این گردشهای کاری در کد پایتون استفاده میکند و وظایف و وابستگیهای آنها را به صورت بصری نمایش میدهد.
Airflow با DAGهای استاتیک و تعاریف مبتنی بر پایتون، برای برنامهریزی و مدیریت گردشهای کاری از پیش تعریفشده بسیار مناسب است. معماری آن شامل یک رابط کاربری کاربرپسند برای نظارت و مدیریت این گردشهای کاری است.
اساساً، Airflow به شما امکان میدهد تا با استفاده از پایتون، خطوط لوله داده خود را تعریف، برنامهریزی و نظارت کنید، و این آن را به ابزاری انعطافپذیر و قدرتمند برای تنظیم گردش کار تبدیل میکند.
۱۳. بررسی اجمالی DAG ما

DAG مخفف عبارت Directed Acyclic Graph است، در Airflow خود DAG نمایانگر کل گردش کار یا خط لوله است. وظایف، وابستگیهای آنها و ترتیب اجرا را تعریف میکند.
واحدهای گردش کار درون DAG از یک پاد (pod) روی کلاستر GKE اجرا میشوند که از پیکربندی Airflow آغاز میشود.
خلاصه:
جریان هوا: دانلود دادهها - این اسکریپت فرآیند دریافت مجموعه دادههای نقد فیلم از Kaggle و ذخیره آن در سطل GCS شما را خودکار میکند و آن را برای پردازش یا تجزیه و تحلیل بیشتر در محیط ابری شما به راحتی در دسترس قرار میدهد.
جریان هوا: آمادهسازی دادهها - کد، مجموعه دادههای خام نقد فیلم را دریافت میکند، ستونهای داده اضافی که برای مورد استفاده ما مورد نیاز نیستند و مجموعه دادههای دارای مقادیر گمشده را حذف میکند. در مرحله بعد، مجموعه دادهها را به قالبی برای پاسخ به سوالات مناسب برای یادگیری ماشین تبدیل میکند و آن را برای استفاده بعدی در GCS ذخیره میکند.
جریان هوا: تنظیم دقیق مدل - این کد با استفاده از تکنیکی به نام LoRA (تطبیق رتبه پایین) یک مدل زبان بزرگ (LLM) را تنظیم دقیق میکند و سپس مدل بهروزرسانیشده را ذخیره میکند. این کار با بارگذاری یک LLM از پیش آموزشدیده و یک مجموعه داده از Google Cloud Storage شروع میشود. سپس، LoRA را برای تنظیم دقیق مدل روی این مجموعه داده اعمال میکند. در نهایت، مدل تنظیمشده را برای استفاده بعدی در برنامههایی مانند تولید متن یا پاسخ به پرسش، دوباره در Google Cloud Storage ذخیره میکند.
جریان هوا: سرویس مدل - سرویس مدل تنظیمشده روی GKE با vllm برای استنتاج.
جریان هوا: حلقه بازخورد - آموزش مجدد مدل هر xx بار (ساعتی، روزانه، هفتگی).
این نمودار نحوه عملکرد Airflow 2 را هنگام اجرا روی GKE توضیح میدهد.

14. تنظیم دقیق یک مدل در مقابل استفاده از RAG
این CodeLab به جای استفاده از بازیابی افزوده نسل (RAG)، یک LLM را به طور دقیق تنظیم میکند.
بیایید این دو رویکرد را با هم مقایسه کنیم:
تنظیم دقیق: یک مدل تخصصی ایجاد میکند: تنظیم دقیق، LLM را با یک کار یا مجموعه داده خاص تطبیق میدهد و به آن اجازه میدهد بدون تکیه بر منابع داده خارجی، مستقل عمل کند.
استنتاج را ساده میکند: این امر نیاز به یک سیستم بازیابی و پایگاه داده جداگانه را از بین میبرد و منجر به پاسخهای سریعتر و ارزانتر، به ویژه برای موارد استفاده مکرر، میشود.
RAG: متکی بر دانش خارجی: RAG اطلاعات مرتبط را از یک پایگاه دانش برای هر درخواست بازیابی میکند و دسترسی به دادههای بهروز و خاص را تضمین میکند.
افزایش پیچیدگی: پیادهسازی RAG در یک محیط عملیاتی مانند کلاستر Kubernetes اغلب شامل چندین میکروسرویس برای پردازش و بازیابی دادهها است که به طور بالقوه باعث افزایش تأخیر و هزینههای محاسباتی میشود.
چرا تنظیم دقیق انتخاب شد:
اگرچه RAG برای مجموعه دادههای کوچک مورد استفاده در این CodeLab مناسب است، ما برای نشان دادن یک مورد استفاده معمول برای Airflow، تنظیم دقیق را انتخاب کردیم. این انتخاب به ما امکان میدهد تا به جای پرداختن به جزئیات راهاندازی زیرساختهای اضافی و میکروسرویسها برای RAG، بر جنبههای هماهنگسازی گردش کار تمرکز کنیم.
نتیجهگیری:
هم تنظیم دقیق و هم RAG تکنیکهای ارزشمندی با نقاط قوت و ضعف خاص خود هستند. انتخاب بهینه به الزامات خاص پروژه شما، مانند اندازه و پیچیدگی دادههای شما، نیازهای عملکردی و ملاحظات هزینه بستگی دارد.
۱۵. وظیفه شماره ۱ DAG - اولین گام خود را در Airflow ایجاد کنید: دانلود دادهها
به عنوان مروری بر این واحد DAG، کد پایتون ما که در یک تصویر کانتینر میزبانی میشود، آخرین مجموعه داده RottenTomatoes را از Kaggle دانلود میکند.
این کد را در سطل GCS کپی نکنید. ما فایل mlops-dag.py را به عنوان آخرین مرحله کپی میکنیم که شامل تمام مراحل واحد DAG در یک اسکریپت پایتون است.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
۱۶. وظیفه شماره ۲ DAG - مرحله دوم خود را در Airflow ایجاد کنید: آمادهسازی دادهها
به عنوان مروری بر این واحد DAG، یک فایل CSV (rotten_tomatoes_movie_reviews.csv) را از GCS در یک DataFrame از Pandas بارگذاری میکنیم.
در مرحله بعد، تعداد ردیفهای پردازش شده را با استفاده از DATASET_LIMIT برای آزمایش و بهرهوری منابع محدود میکنیم و در نهایت دادههای تبدیلشده را به یک مجموعه داده چهره در آغوش گرفته تبدیل میکنیم.
اگر با دقت نگاه کنید، خواهید دید که ما در مدل با "DATASET_LIMIT": "1000" در حال آموزش ۱۰۰۰ ردیف هستیم، دلیل این امر این است که انجام این کار روی یک پردازنده گرافیکی Nvidia L4 بیست دقیقه طول میکشد.
این کد را در سطل GCS کپی نکنید. ما mlops-dag.py را در آخرین مرحله کپی میکنیم که شامل تمام مراحل درون یک اسکریپت پایتون است.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
۱۷. وظیفه شماره ۳ DAG - مرحله سوم خود را در مورد جریان هوا ایجاد کنید: تنظیم دقیق مدل
به عنوان مروری بر این واحد DAG، در اینجا finetune.py را اجرا میکنیم تا مدل Gemma را با مجموعه داده جدید خود اصلاح کنیم.
این کد را در سطل GCS کپی نکنید. ما mlops-dag.py را در آخرین مرحله کپی میکنیم که شامل تمام مراحل درون یک اسکریپت پایتون است.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
۱۸. وظیفه شماره ۴ DAG - ایجاد گام نهایی در مورد جریان هوا: استنتاج / ارائه مدل
vLLM یک کتابخانه متنباز قدرتمند است که بهطور خاص برای استنتاج با عملکرد بالا از LLMها طراحی شده است. هنگامی که بر روی Google Kubernetes Engine (GKE) مستقر میشود، از مقیاسپذیری و کارایی Kubernetes برای ارائه خدمات مؤثر به LLMها بهره میبرد.
خلاصه مراحل:
- فایل DAG با نام "mlops-dag.py" را در مخزن GCS آپلود کنید.
- دو فایل پیکربندی Kubernetes YAML را برای تنظیم استنتاج، در یک سطل GCS کپی کنید.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
اسکریپت پایتون (فایل DAG) و همچنین مانیفستهای Kubernetes را در DAGS GCS آپلود کنید.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
در رابط کاربری Airflow، عبارت mlops-dag را مشاهده خواهید کرد.
- لغو توقف موقت را انتخاب کنید.
- برای انجام یک چرخه دستی MLOps، گزینه Trigger DAG را انتخاب کنید.

پس از تکمیل DAG، خروجی مانند این را در رابط کاربری Airflow مشاهده خواهید کرد.

پس از مرحله نهایی، میتوانید نقطه پایانی مدل را بگیرید و یک اعلان برای آزمایش مدل ارسال کنید.
قبل از صدور دستور curl تقریباً ۵ دقیقه صبر کنید تا استنتاج مدل آغاز شود و متعادلکننده بار بتواند یک آدرس IP خارجی اختصاص دهد.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
خروجی:
۱۹. تبریک میگویم!
شما اولین گردش کار هوش مصنوعی خود را با استفاده از یک خط لوله DAG با Airflow 2 در GKE ایجاد کردهاید.
فراموش نکنید که منابعی را که مستقر کردهاید، از حالت آمادهباش خارج کنید.
۲۰. انجام این کار در محیط تولید
اگرچه CodeLab بینش فوقالعادهای در مورد نحوه راهاندازی Airflow 2 در GKE در اختیار شما قرار داده است، اما در دنیای واقعی، هنگام انجام این کار در محیط تولید، باید برخی از مباحث زیر را در نظر بگیرید.
با استفاده از Gradio یا ابزارهای مشابه، یک رابط کاربری وب پیادهسازی کنید.
یا میتوانید نظارت خودکار بر برنامههای کاربردی برای بارهای کاری با GKE را از اینجا پیکربندی کنید یا معیارها را از Airflow از اینجا استخراج کنید.
ممکن است برای تنظیم دقیقتر مدل، به پردازندههای گرافیکی (GPU) بزرگتری نیاز داشته باشید، بهخصوص اگر مجموعه دادههای بزرگتری داشته باشید. با این حال، اگر بخواهیم مدل را در چندین پردازنده گرافیکی آموزش دهیم، باید مجموعه دادهها را تقسیم کرده و آموزش را به قطعات کوچک تقسیم کنیم. در اینجا توضیحی در مورد FSDP با PyTorch (دادههای کاملاً خرد شده موازی، با استفاده از اشتراکگذاری GPU برای دستیابی به این هدف) ارائه شده است. برای مطالعه بیشتر میتوانید به یک پست وبلاگ از Meta و پست دیگری در این آموزش در مورد FSDP با استفاده از Pytorch مراجعه کنید.
Google Cloud Composer یک سرویس مدیریتشدهی Airflow است، بنابراین نیازی به نگهداری خود Airflow ندارید، فقط DAG خود را مستقر کنید و کار خود را انجام دهید.
بیشتر بدانید
- مستندات جریان هوا: https://airflow.apache.org/
مجوز
این اثر تحت مجوز عمومی Creative Commons Attribution 2.0 منتشر شده است.