1. ภาพรวม

Codelab นี้แสดงวิธีผสานรวมแนวทางปฏิบัติของ DevOps เข้ากับแมชชีนเลิร์นนิง (MLOps) โดยการดาวน์โหลดชุดข้อมูล ปรับแต่งโมเดล และทำให้ LLM ใช้งานได้ใน Google Kubernetes Engine (GKE) โดยใช้ DAG ของ Airflow ที่มีการแยกข้อมูลน้อยที่สุด ด้วยเหตุนี้ เราจึงใช้คำสั่ง gcloud แทน Terraform เพื่อให้คุณทำตามขั้นตอนใน Lab ได้ทีละขั้นตอนและเข้าใจแต่ละกระบวนการได้อย่างง่ายดายจากมุมมองของทั้งวิศวกรแพลตฟอร์มและวิศวกรแมชชีนเลิร์นนิง
คู่มือเชิงปฏิบัติฉบับนี้จะแนะนำวิธีใช้ประโยชน์จาก Airflow เพื่อปรับปรุงเวิร์กโฟลว์ AI โดยจะแสดงให้เห็นวงจร MLOps ทั้งหมดอย่างชัดเจนและเป็นประโยชน์ด้วยการกำหนดค่า DAG
สิ่งที่คุณจะได้เรียนรู้
- ส่งเสริมการทำงานร่วมกันและความเข้าใจที่ดียิ่งขึ้นระหว่างวิศวกรแพลตฟอร์มและวิศวกรแมชชีนเลิร์นนิงด้วยการทำลายการทำงานแบบแยกส่วนและความรู้ที่กระจัดกระจาย และปรับปรุงเวิร์กโฟลว์
- ทำความเข้าใจวิธีทําให้ใช้งานได้ ใช้ และจัดการ Airflow 2 ใน GKE
- กำหนดค่า DAG ของ Airflow ตั้งแต่ต้นจนจบ
- สร้างพื้นฐานสำหรับระบบแมชชีนเลิร์นนิงระดับการใช้งานจริงด้วย GKE
- วัดผลและนำระบบแมชชีนเลิร์นนิงไปใช้
- ทำความเข้าใจว่าวิศวกรรมแพลตฟอร์มกลายเป็นเสาหลักด้านการสนับสนุนที่สำคัญสำหรับ MLOps ได้อย่างไร
สิ่งที่ Codelab นี้ทำให้สำเร็จ
- คุณถามคำถามเกี่ยวกับภาพยนตร์จาก LLM ที่เราปรับแต่งโดยอิงตาม Gemma-2-9b-it ซึ่งให้บริการใน GKE ด้วย vLLM ได้
กลุ่มเป้าหมาย
- วิศวกรแมชชีนเลิร์นนิง
- วิศวกรแพลตฟอร์ม
- นักวิทยาศาสตร์ข้อมูล
- วิศวกรข้อมูล
- วิศวกร DevOps
- สถาปนิกแพลตฟอร์ม
- วิศวกรลูกค้า
Codelab นี้ไม่ได้มีไว้สำหรับ
- เป็นข้อมูลเบื้องต้นเกี่ยวกับเวิร์กโฟลว์ GKE หรือ AI/ML
- เป็นการทดลองใช้ชุดฟีเจอร์ทั้งหมดของ Airflow
2. วิศวกรรมแพลตฟอร์มช่วยวิศวกร/นักวิทยาศาสตร์ด้านแมชชีนเลิร์นนิง

วิศวกรรมแพลตฟอร์มและ MLOps เป็นสาขาที่พึ่งพาซึ่งกันและกันซึ่งทำงานร่วมกันเพื่อสร้างสภาพแวดล้อมที่มีประสิทธิภาพและมีประสิทธิภาพสำหรับการพัฒนาและการใช้งาน ML
ขอบเขต: วิศวกรรมแพลตฟอร์มมีขอบเขตที่กว้างกว่า MLOps โดยครอบคลุมวงจรการพัฒนาซอฟต์แวร์ทั้งหมด และจัดหาเครื่องมือและโครงสร้างพื้นฐานสำหรับวงจรดังกล่าว
MLOps ช่วยลดช่องว่างระหว่างการพัฒนา การติดตั้งใช้งาน และการอนุมาน ML
ความเชี่ยวชาญ: วิศวกรแพลตฟอร์มมักมีความเชี่ยวชาญด้านเทคโนโลยีโครงสร้างพื้นฐาน เช่น การประมวลผลแบบคลาวด์ การใช้คอนเทนเนอร์ และการจัดการข้อมูล
วิศวกร MLOps เชี่ยวชาญด้านการพัฒนา การติดตั้งใช้งาน และการตรวจสอบโมเดล ML โดยมักมีทักษะด้านวิทยาศาสตร์ข้อมูลและวิศวกรรมซอฟต์แวร์
เครื่องมือ: วิศวกรแพลตฟอร์มสร้างเครื่องมือสำหรับการจัดสรรโครงสร้างพื้นฐาน การจัดการการกำหนดค่า การจัดระเบียบคอนเทนเนอร์ และการจัดโครงสร้างแอปพลิเคชัน วิศวกร MLOps ใช้เครื่องมือในการฝึก ทดลอง ทดสอบ ติดตาม และควบคุมเวอร์ชันโมเดล ML
3. การตั้งค่าและข้อกำหนดของ Google Cloud
การตั้งค่าสภาพแวดล้อมแบบเรียนรู้ด้วยตนเอง
- ลงชื่อเข้าใช้ Google Cloud Console แล้วสร้างโปรเจ็กต์ใหม่หรือใช้โปรเจ็กต์ที่มีอยู่ซ้ำ หากยังไม่มีบัญชี Gmail หรือ Google Workspace คุณต้องสร้างบัญชี



- ชื่อโปรเจ็กต์คือชื่อที่แสดงสำหรับผู้เข้าร่วมโปรเจ็กต์นี้ ซึ่งเป็นสตริงอักขระที่ Google APIs ไม่ได้ใช้ คุณอัปเดตได้ทุกเมื่อ
- รหัสโปรเจ็กต์จะไม่ซ้ำกันในโปรเจ็กต์ Google Cloud ทั้งหมดและเปลี่ยนแปลงไม่ได้ (เปลี่ยนไม่ได้หลังจากตั้งค่าแล้ว) Cloud Console จะสร้างสตริงที่ไม่ซ้ำกันโดยอัตโนมัติ ซึ่งโดยปกติแล้วคุณไม่จำเป็นต้องสนใจว่าสตริงนั้นคืออะไร ใน Codelab ส่วนใหญ่ คุณจะต้องอ้างอิงรหัสโปรเจ็กต์ (โดยทั่วไปจะระบุเป็น
PROJECT_ID) หากไม่ชอบรหัสที่สร้างขึ้น คุณอาจสร้างรหัสแบบสุ่มอีกรหัสหนึ่งได้ หรือคุณอาจลองใช้ชื่อของคุณเองและดูว่ามีชื่อนั้นหรือไม่ คุณจะเปลี่ยนแปลงรหัสนี้หลังจากขั้นตอนนี้ไม่ได้ และรหัสจะคงอยู่ตลอดระยะเวลาของโปรเจ็กต์ - โปรดทราบว่ายังมีค่าที่ 3 ซึ่งคือหมายเลขโปรเจ็กต์ที่ API บางตัวใช้ ดูข้อมูลเพิ่มเติมเกี่ยวกับค่าทั้ง 3 นี้ได้ในเอกสารประกอบ
- จากนั้นคุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร/API ของ Cloud การทำตาม Codelab นี้จะไม่มีค่าใช้จ่ายมากนัก หรืออาจไม่มีค่าใช้จ่ายเลย หากต้องการปิดทรัพยากรเพื่อหลีกเลี่ยงการเรียกเก็บเงินนอกเหนือจากบทแนะนำนี้ คุณสามารถลบทรัพยากรที่สร้างขึ้นหรือลบโปรเจ็กต์ได้ ผู้ใช้ Google Cloud รายใหม่มีสิทธิ์เข้าร่วมโปรแกรมช่วงทดลองใช้ฟรีมูลค่า$300 USD
เริ่มต้น Cloud Shell
แม้ว่าคุณจะใช้งาน Google Cloud จากแล็ปท็อประยะไกลได้ แต่ใน Codelab นี้คุณจะใช้ Cloud Shell ซึ่งเป็นสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานในระบบคลาวด์
เปิดใช้งาน Cloud Shell
- จาก Cloud Console ให้คลิกเปิดใช้งาน Cloud Shell


หากคุณเริ่มใช้ Cloud Shell เป็นครั้งแรก คุณจะเห็นหน้าจอระดับกลางที่อธิบายว่า Cloud Shell คืออะไร หากเห็นหน้าจอระดับกลาง ให้คลิกต่อไป

การจัดสรรและเชื่อมต่อกับ Cloud Shell จะใช้เวลาไม่นาน

เครื่องเสมือนนี้โหลดเครื่องมือพัฒนาซอฟต์แวร์ทั้งหมดที่จำเป็นไว้แล้ว โดยมีไดเรกทอรีหลักแบบถาวรขนาด 5 GB และทำงานใน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก คุณสามารถทำงานส่วนใหญ่หรือทั้งหมดในโค้ดแล็บนี้ได้ด้วยเบราว์เซอร์
เมื่อเชื่อมต่อกับ Cloud Shell แล้ว คุณควรเห็นว่าคุณได้รับการตรวจสอบสิทธิ์และระบบได้ตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์ของคุณ
- เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคุณได้รับการตรวจสอบสิทธิ์แล้ว
gcloud auth list
เอาต์พุตของคำสั่ง
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคำสั่ง gcloud รู้จักโปรเจ็กต์ของคุณ
gcloud config list project
เอาต์พุตของคำสั่ง
[core] project = <PROJECT_ID>
หากไม่ได้ตั้งค่าไว้ คุณตั้งค่าได้ด้วยคำสั่งนี้
gcloud config set project <PROJECT_ID>
เอาต์พุตของคำสั่ง
Updated property [core/project].
4. ขั้นตอนที่ 1 - ลงชื่อสมัครใช้และตรวจสอบสิทธิ์ใน Kaggle
หากต้องการเริ่ม CodeLab คุณต้องสร้างบัญชีใน Kaggle ซึ่งเป็นแพลตฟอร์มชุมชนออนไลน์สำหรับนักวิทยาศาสตร์ข้อมูลและผู้ที่ชื่นชอบแมชชีนเลิร์นนิงที่ Google เป็นเจ้าของ และโฮสต์ที่เก็บข้อมูลขนาดใหญ่ของชุดข้อมูลที่เผยแพร่ต่อสาธารณะสำหรับโดเมนต่างๆ คุณจะดาวน์โหลดชุดข้อมูล RottenTomatoes ที่ใช้ฝึกโมเดลได้จากเว็บไซต์นี้
- ลงชื่อสมัครใช้ Kaggle คุณสามารถใช้ Google SSO เพื่อลงชื่อเข้าใช้ได้
- ยอมรับข้อกำหนดในการให้บริการ
- ไปที่การตั้งค่าและรับชื่อผู้ใช้ ชื่อผู้ใช้
- ในส่วน API ให้เลือก "สร้างโทเค็นใหม่จาก" Kaggle ซึ่งจะดาวน์โหลด kaggle.json
- หากพบปัญหา ให้ไปที่หน้าการสนับสนุนที่นี่
5. ขั้นตอนที่ 2 - ลงชื่อสมัครใช้และตรวจสอบสิทธิ์ใน HuggingFace
HuggingFace เป็นศูนย์กลางสำหรับทุกคนที่ต้องการมีส่วนร่วมกับเทคโนโลยีแมชชีนเลิร์นนิง โดยโฮสต์โมเดล 900, 000 รายการ ชุดข้อมูล 200, 000 รายการ และแอปเดโม (Spaces) 300,000 รายการ ซึ่งทั้งหมดเป็นโอเพนซอร์สและพร้อมให้บริการแก่สาธารณะ
- ลงชื่อสมัครใช้ HuggingFace - สร้างบัญชีด้วยชื่อผู้ใช้ คุณจะใช้ Google SSO ไม่ได้
- ยืนยันที่อยู่อีเมลของคุณ
- ไปที่นี่และยอมรับใบอนุญาตสำหรับโมเดล Gemma-2-9b-it
- สร้างโทเค็น HuggingFace ที่นี่
- บันทึกข้อมูลเข้าสู่ระบบโทเค็นไว้เพื่อใช้ในภายหลัง
6. ขั้นตอนที่ 3 - สร้างทรัพยากรโครงสร้างพื้นฐานของ Google Cloud ที่จำเป็น
คุณจะต้องตั้งค่า GKE, GCE, Artifact Registry และใช้บทบาท IAM โดยใช้การรวมข้อมูลประจำตัวของเวิร์กโหลด
เวิร์กโฟลว์ AI ของคุณใช้ Node Pool 2 รายการ ได้แก่ รายการหนึ่งสำหรับการฝึก และอีกรายการหนึ่งสำหรับการอนุมาน กลุ่มโหนดการฝึกใช้ VM ของ GCE ที่มี g2-standard-8 ซึ่งติดตั้ง GPU Tensor Core ของ Nvidia L4 ไว้ 1 ตัว กลุ่มโหนดการอนุมานใช้ VM g2-standard-24 ที่ติดตั้ง GPU Tensor Core Nvidia L4 จำนวน 2 ตัว ขณะระบุภูมิภาค ให้เลือกภูมิภาคที่รองรับ GPU ที่จำเป็น ( ลิงก์)
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
สร้างไฟล์ Manifest YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:v0.6.6
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
สร้าง Bucket ของ Google Cloud Storage (GCS) 3 รายการ
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. ขั้นตอนที่ 4 - ติดตั้ง Airflow ใน GKE ผ่านแผนภูมิ Helm
ตอนนี้เราจะติดตั้งใช้งาน Airflow 2 โดยใช้ Helm Apache Airflow เป็นแพลตฟอร์มการจัดการเวิร์กโฟลว์แบบโอเพนซอร์ส สำหรับไปป์ไลน์วิศวกรรมข้อมูล เราจะพูดถึงชุดฟีเจอร์ของ Airflow 2 ในภายหลัง
values.yaml สำหรับแผนภูมิ Helm ของ Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
ติดตั้งใช้งาน Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. ขั้นตอนที่ 5 - เริ่มต้น Airflow ด้วยการเชื่อมต่อและตัวแปร
เมื่อติดตั้งใช้งาน Airflow 2 แล้ว เราจะเริ่มกำหนดค่าได้ เรากำหนดตัวแปรบางอย่างซึ่งสคริปต์ Python ของเราจะอ่าน
- เข้าถึง UI ของ Airflow บนพอร์ต 8080 ด้วยเบราว์เซอร์
รับ IP ภายนอก
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
เปิดเว็บเบราว์เซอร์แล้วไปที่ http://<EXTERNAL-IP>:8080 โดยข้อมูลเข้าสู่ระบบคือ admin / admin
- สร้างการเชื่อมต่อ GCP เริ่มต้นภายใน UI ของ Airflow โดยไปที่ผู้ดูแลระบบ → การเชื่อมต่อ → + เพิ่มระเบียนใหม่
- รหัสการเชื่อมต่อ: google_cloud_default
- ประเภทการเชื่อมต่อ: Google Cloud
คลิกบันทึก
- สร้างตัวแปรที่จำเป็นโดยไปที่ผู้ดูแลระบบ → ตัวแปร → + เพิ่มระเบียนใหม่
- คีย์: BUCKET_DATA_NAME - ค่า: คัดลอกจาก echo $BUCKET_DATA_NAME
- คีย์: GCP_PROJECT_ID - ค่า: คัดลอกจาก echo $DEVSHELL_PROJECT_ID
- คีย์: HF_TOKEN - ค่า: ใส่โทเค็น HF
- คีย์: KAGGLE_USERNAME - ค่า: ป้อนชื่อผู้ใช้ Kaggle
- คีย์: KAGGLE_KEY - ค่า: คัดลอกจาก kaggle.json
คลิกบันทึกหลังจากคู่คีย์-ค่าแต่ละคู่
UI ควรมีลักษณะดังนี้

9. คอนเทนเนอร์โค้ดแอปพลิเคชัน #1 - การดาวน์โหลดข้อมูล
ในสคริปต์ Python นี้ เราจะตรวจสอบสิทธิ์กับ Kaggle เพื่อดาวน์โหลดชุดข้อมูลไปยัง Bucket ของ GCS
สคริปต์เองก็อยู่ในคอนเทนเนอร์ด้วย เนื่องจากสคริปต์นี้จะกลายเป็น DAG Unit #1 และเราคาดหวังว่าชุดข้อมูลจะได้รับการอัปเดตบ่อยครั้ง จึงต้องการทำให้กระบวนการนี้เป็นแบบอัตโนมัติ
สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
ตอนนี้เราจะสร้างอิมเมจคอนเทนเนอร์สำหรับการดาวน์โหลดชุดข้อมูลและพุชไปยัง Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. คอนเทนเนอร์โค้ดแอปพลิเคชัน #2 - การเตรียมข้อมูล
ในขั้นตอนการเตรียมข้อมูล เราจะทำสิ่งต่อไปนี้
- ระบุปริมาณชุดข้อมูลที่ต้องการใช้ในการปรับแต่งโมเดลพื้นฐาน
- โหลดชุดข้อมูล กล่าวคือ อ่านไฟล์ CSV ลงใน Pandas DataFrame ซึ่งเป็นโครงสร้างข้อมูล 2 มิติสำหรับแถวและคอลัมน์
- การเปลี่ยนรูปแบบ / การประมวลผลข้อมูลเบื้องต้น - ระบุส่วนของชุดข้อมูลที่ไม่เกี่ยวข้องโดยการระบุสิ่งที่เราต้องการเก็บไว้ ซึ่งจะเท่ากับการนำส่วนที่เหลือออก
- ใช้
transformฟังก์ชันกับแต่ละแถวของ DataFrame - บันทึกข้อมูลที่เตรียมไว้กลับลงใน Bucket ของ GCS
สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. คอนเทนเนอร์โค้ดของแอปพลิเคชัน #3 - การปรับแต่ง
ในที่นี้ เราใช้ Gemma-2-9b-it เป็นโมเดลพื้นฐาน แล้วจึงปรับแต่งด้วยชุดข้อมูลใหม่
นี่คือลําดับขั้นตอนที่เกิดขึ้นในระหว่างขั้นตอนการปรับแต่ง
1. การตั้งค่า: นำเข้าไลบรารี กำหนดพารามิเตอร์ (สำหรับโมเดล ข้อมูล และการฝึก) และโหลดชุดข้อมูลจาก Google Cloud Storage
2. โหลดโมเดล: โหลดโมเดลภาษาที่ฝึกไว้ล่วงหน้าพร้อมการวัดปริมาณเพื่อประสิทธิภาพ และโหลดโทเค็นไนเซอร์ที่เกี่ยวข้อง
3. กำหนดค่า LoRA: ตั้งค่า Low-Rank Adaptation (LoRA) เพื่อปรับแต่งโมเดลอย่างมีประสิทธิภาพโดยการเพิ่มเมทริกซ์ขนาดเล็กที่ฝึกได้
4. ฝึก: กำหนดพารามิเตอร์การฝึกและใช้ SFTTrainer เพื่อปรับแต่งโมเดลในชุดข้อมูลที่โหลดโดยใช้ประเภทการหาปริมาณ FP16
5. บันทึกและอัปโหลด: บันทึกโมเดลและโทเค็นไนเซอร์ที่ปรับแต่งแล้วในเครื่อง จากนั้นอัปโหลดไปยัง Bucket ของ GCS
จากนั้นเราจะสร้างอิมเมจคอนเทนเนอร์โดยใช้ Cloud Build และจัดเก็บไว้ใน Artifact Registry
สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
ตอนนี้เราจะสร้างอิมเมจคอนเทนเนอร์สำหรับการปรับแต่งอย่างละเอียดและพุชไปยัง Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. ภาพรวมของ Airflow 2 รวมถึง DAG คืออะไร
Airflow เป็นแพลตฟอร์มสำหรับการจัดระเบียบเวิร์กโฟลว์และไปป์ไลน์ข้อมูล โดยจะใช้ DAG (Directed Acyclic Graph) เพื่อกำหนดเวิร์กโฟลว์เหล่านี้ในโค้ด Python ซึ่งแสดงภาพงานและการอ้างอิง
Airflow ที่มี DAG แบบคงที่และคำจำกัดความที่ใช้ Python เหมาะสำหรับการกำหนดเวลาและจัดการเวิร์กโฟลว์ที่กำหนดไว้ล่วงหน้า สถาปัตยกรรมของเครื่องมือนี้มี UI ที่ใช้งานง่ายสำหรับการตรวจสอบและจัดการเวิร์กโฟลว์เหล่านี้
โดยพื้นฐานแล้ว Airflow ช่วยให้คุณกำหนดเวลาและตรวจสอบไปป์ไลน์ข้อมูลโดยใช้ Python ซึ่งทำให้เป็นเครื่องมือที่ยืดหยุ่นและมีประสิทธิภาพสำหรับการจัดเวิร์กโฟลว์
13. ภาพรวมของ DAG

DAG ย่อมาจาก Directed Acyclic Graph ซึ่งใน Airflow นั้น DAG จะแสดงถึงเวิร์กโฟลว์หรือไปป์ไลน์ทั้งหมด โดยจะกำหนดงาน ทรัพยากร Dependency และลำดับการดำเนินการ
ระบบจะเรียกใช้หน่วยของเวิร์กโฟลว์ภายใน DAG จากพ็อดในคลัสเตอร์ GKE ซึ่งเริ่มต้นจากการกำหนดค่า Airflow
สรุป:
Airflow: การดาวน์โหลดข้อมูล - สคริปต์นี้จะทำให้กระบวนการรับชุดข้อมูลรีวิวภาพยนตร์จาก Kaggle และจัดเก็บไว้ในที่เก็บข้อมูล GCS เป็นไปโดยอัตโนมัติ ซึ่งจะทำให้พร้อมใช้งานสำหรับการประมวลผลหรือการวิเคราะห์เพิ่มเติมในสภาพแวดล้อมระบบคลาวด์
Airflow: การเตรียมข้อมูล - โค้ดจะใช้ชุดข้อมูลรีวิวภาพยนตร์ดิบ นำคอลัมน์ข้อมูลที่ไม่เกี่ยวข้องซึ่งไม่จำเป็นสำหรับกรณีการใช้งานของเราออก และลบชุดข้อมูลที่มีค่าที่ขาดหายไป จากนั้นจะจัดโครงสร้างชุดข้อมูลเป็นรูปแบบการตอบคำถามที่เหมาะกับแมชชีนเลิร์นนิง และจัดเก็บไว้ใน GCS เพื่อใช้ในภายหลัง
Airflow: การปรับแต่งโมเดล - โค้ดนี้จะปรับแต่งโมเดลภาษาขนาดใหญ่ (LLM) โดยใช้เทคนิคที่เรียกว่า LoRA (Low-Rank Adaptation) แล้วบันทึกโมเดลที่อัปเดต โดยจะเริ่มต้นด้วยการโหลด LLM ที่ผ่านการฝึกมาก่อนและชุดข้อมูลจาก Google Cloud Storage จากนั้นจะใช้ LoRA เพื่อปรับแต่งโมเดลในชุดข้อมูลนี้อย่างมีประสิทธิภาพ สุดท้ายนี้ ระบบจะบันทึกโมเดลที่ปรับแต่งแล้วกลับไปยัง Google Cloud Storage เพื่อใช้ในภายหลังในแอปพลิเคชันต่างๆ เช่น การสร้างข้อความหรือการตอบคำถาม
Airflow: Model Serving - แสดงโมเดลที่ปรับแต่งแล้วใน GKE ด้วย vllm สำหรับการอนุมาน
Airflow: วงจรความคิดเห็น - ฝึกโมเดลใหม่ทุก xx เวลา (รายชั่วโมง รายวัน รายสัปดาห์)
แผนภาพนี้อธิบายวิธีการทำงานของ Airflow 2 เมื่อเรียกใช้ใน GKE

14. การปรับแต่งโมเดลเทียบกับการใช้ RAG
Codelab นี้จะปรับแต่ง LLM แทนการใช้ Retrieval Augmented Generation (RAG)
มาเปรียบเทียบ 2 วิธีนี้กัน
การปรับแต่ง:สร้างโมเดลเฉพาะทาง: การปรับแต่งจะปรับ LLM ให้เข้ากับงานหรือชุดข้อมูลที่เฉพาะเจาะจง ซึ่งช่วยให้ LLM ทำงานได้อย่างอิสระโดยไม่ต้องอาศัยแหล่งข้อมูลภายนอก
ลดความซับซ้อนของการอนุมาน: ไม่จำเป็นต้องมีระบบการดึงข้อมูลและฐานข้อมูลแยกต่างหาก จึงตอบสนองได้เร็วขึ้นและถูกลง โดยเฉพาะอย่างยิ่งสำหรับ Use Case ที่ใช้บ่อย
RAG: อาศัยความรู้ภายนอก: RAG ดึงข้อมูลที่เกี่ยวข้องจากฐานความรู้สำหรับแต่ละคำขอ เพื่อให้มั่นใจว่าสามารถเข้าถึงข้อมูลล่าสุดและเฉพาะเจาะจงได้
เพิ่มความซับซ้อน: การติดตั้งใช้งาน RAG ในสภาพแวดล้อมการผลิต เช่น คลัสเตอร์ Kubernetes มักเกี่ยวข้องกับไมโครเซอร์วิสหลายรายการสำหรับการประมวลผลและการดึงข้อมูล ซึ่งอาจเพิ่มเวลาในการตอบสนองและค่าใช้จ่ายในการคำนวณ
เหตุผลที่เลือกการปรับแต่ง
แม้ว่า RAG จะเหมาะกับชุดข้อมูลขนาดเล็กที่ใช้ใน CodeLab นี้ แต่เราเลือกใช้การปรับแต่งเพื่อแสดงกรณีการใช้งานทั่วไปสำหรับ Airflow การเลือกนี้ช่วยให้เรามุ่งเน้นไปที่ด้านการจัดเวิร์กโฟลว์แทนที่จะเจาะลึกถึงความแตกต่างของการตั้งค่าโครงสร้างพื้นฐานและไมโครเซอร์วิสเพิ่มเติมสำหรับ RAG
บทสรุป:
ทั้งการปรับแต่งและการสร้างคำตอบโดยอิงตามเอกสารมีความสำคัญและมีจุดแข็งและจุดอ่อนของตัวเอง ตัวเลือกที่เหมาะสมที่สุดขึ้นอยู่กับข้อกำหนดเฉพาะของโปรเจ็กต์ เช่น ขนาดและความซับซ้อนของข้อมูล ความต้องการด้านประสิทธิภาพ และการพิจารณาต้นทุน
15. งาน DAG #1 - สร้างขั้นตอนแรกใน Airflow: การดาวน์โหลดข้อมูล
ภาพรวมของหน่วย DAG นี้คือโค้ด Python ที่โฮสต์ในอิมเมจคอนเทนเนอร์จะดาวน์โหลดชุดข้อมูล RottenTomatoes ล่าสุดจาก Kaggle
อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราคัดลอก mlops-dag.py เป็นขั้นตอนสุดท้าย ซึ่งมีขั้นตอนหน่วย DAG ทั้งหมดภายในสคริปต์ Python เดียว
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. งาน DAG #2 - สร้างขั้นตอนที่ 2 ใน Airflow: การจัดเตรียมข้อมูล
ภาพรวมของหน่วย DAG นี้คือเราโหลดไฟล์ CSV (rotten_tomatoes_movie_reviews.csv) จาก GCS ลงใน Pandas DataFrame
จากนั้นเราจะจำกัดจำนวนแถวที่ประมวลผลโดยใช้ DATASET_LIMIT เพื่อการทดสอบและประสิทธิภาพของทรัพยากร และสุดท้ายจะแปลงข้อมูลที่เปลี่ยนรูปแบบเป็นชุดข้อมูล Hugging Face
หากสังเกตดีๆ คุณจะเห็นว่าเรากำลังฝึกโมเดลด้วยข้อมูล 1,000 แถวโดยมี "DATASET_LIMIT": "1000" ซึ่งเป็นเพราะต้องใช้เวลา 20 นาทีใน GPU ของ Nvidia L4
อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราจะคัดลอก mlops-dag.py ในขั้นตอนสุดท้าย ซึ่งมีขั้นตอนทั้งหมดภายในสคริปต์ Python เดียว
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. งาน DAG #3 - สร้างขั้นตอนที่ 3 ใน Airflow: การปรับแต่งโมเดล
ภาพรวมของหน่วย DAG นี้คือเราจะเรียกใช้ finetune.py เพื่อปรับแต่งโมเดล Gemma ด้วยชุดข้อมูลใหม่
อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราจะคัดลอก mlops-dag.py ในขั้นตอนสุดท้าย ซึ่งมีขั้นตอนทั้งหมดภายในสคริปต์ Python เดียว
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. งาน DAG #4 - สร้างขั้นตอนสุดท้ายใน Airflow: การอนุมาน / การแสดงโมเดล
vLLM เป็นไลบรารีโอเพนซอร์สที่มีประสิทธิภาพซึ่งออกแบบมาโดยเฉพาะสำหรับการอนุมาน LLM ที่มีประสิทธิภาพสูง เมื่อติดตั้งใช้งานใน Google Kubernetes Engine (GKE) โมเดลจะใช้ประโยชน์จากความสามารถในการปรับขนาดและประสิทธิภาพของ Kubernetes เพื่อให้บริการ LLM ได้อย่างมีประสิทธิภาพ
สรุปขั้นตอน
- อัปโหลด DAG "mlops-dag.py" ไปยังที่เก็บข้อมูล GCS
- คัดลอกไฟล์กำหนดค่า Kubernetes YAML 2 ไฟล์เพื่อตั้งค่าการอนุมานลงในที่เก็บข้อมูล GCS
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
อัปโหลดสคริปต์ Python (ไฟล์ DAG) รวมถึงไฟล์ Manifest ของ Kubernetes ลงใน DAGS GCS Bucket
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
คุณจะเห็น mlops-dag ใน UI ของ Airflow
- เลือกยกเลิกการหยุดชั่วคราว
- เลือกทริกเกอร์ DAG เพื่อดำเนินการวงจร MLOps ด้วยตนเอง

เมื่อ DAG เสร็จสมบูรณ์ คุณจะเห็นเอาต์พุตลักษณะนี้ใน UI ของ Airflow

หลังจากขั้นตอนสุดท้าย คุณจะรับปลายทางของโมเดลและส่งพรอมต์เพื่อทดสอบโมเดลได้
โปรดรอประมาณ 5 นาทีก่อนออกคำสั่ง curl เพื่อให้การอนุมานของโมเดลเริ่มต้นได้และตัวจัดสรรภาระงานสามารถกำหนดที่อยู่ IP ภายนอกได้
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
เอาต์พุต:
19. ยินดีด้วย
คุณได้สร้างเวิร์กโฟลว์ AI แรกโดยใช้ไปป์ไลน์ DAG ด้วย Airflow 2 บน GKE
อย่าลืมยกเลิกการจัดสรรทรัพยากรที่คุณได้ติดตั้งใช้งาน
20. การดำเนินการนี้ในเวอร์ชันที่ใช้งานจริง
แม้ว่า CodeLab จะให้ข้อมูลเชิงลึกที่ยอดเยี่ยมเกี่ยวกับวิธีตั้งค่า Airflow 2 ใน GKE แต่ในโลกแห่งความเป็นจริง คุณจะต้องพิจารณาหัวข้อต่อไปนี้เมื่อดำเนินการในสภาพแวดล้อมการผลิต
ใช้ส่วนหน้าของเว็บโดยใช้ Gradio หรือเครื่องมือที่คล้ายกัน
กำหนดค่าการตรวจสอบแอปพลิเคชันอัตโนมัติสำหรับภาระงานด้วย GKE ที่นี่ หรือส่งออกเมตริกจาก Airflow ที่นี่
คุณอาจต้องใช้ GPU ที่ใหญ่ขึ้นเพื่อปรับแต่งโมเดลให้เร็วขึ้น โดยเฉพาะอย่างยิ่งหากมีชุดข้อมูลขนาดใหญ่ อย่างไรก็ตาม หากต้องการฝึกโมเดลใน GPU หลายตัว เราต้องแยกชุดข้อมูลและแบ่งการฝึกออกเป็นส่วนๆ ต่อไปนี้คือคำอธิบายเกี่ยวกับ FSDP ด้วย PyTorch (การขนานข้อมูลแบบ Shard เต็ม โดยใช้การแชร์ GPU เพื่อให้บรรลุเป้าหมายดังกล่าว อ่านเพิ่มเติมได้ที่บล็อกโพสต์จาก Meta และอีกบล็อกโพสต์ในบทแนะนำนี้เกี่ยวกับ FSDP โดยใช้ Pytorch
Google Cloud Composer เป็นบริการ Airflow ที่มีการจัดการ คุณจึงไม่ต้องดูแลรักษา Airflow เอง เพียงแค่ติดตั้งใช้งาน DAG แล้วก็พร้อมใช้งานได้เลย
ดูข้อมูลเพิ่มเติม
- เอกสารประกอบของ Airflow: https://airflow.apache.org/
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 2.0 แบบทั่วไป