১. সংক্ষিপ্ত বিবরণ

এই কোডল্যাবটি দেখায় কিভাবে একটি ডেটাসেট ডাউনলোড করে, একটি মডেলকে পরিমার্জন করে এবং সর্বনিম্ন অ্যাবস্ট্রাকশন সহ একটি এয়ারফ্লো DAG ব্যবহার করে গুগল কুবারনেটিস ইঞ্জিন (GKE)-এ LLM-টি ডেপ্লয় করার মাধ্যমে মেশিন লার্নিং (MLOps)-এ ডেভঅপ্স অনুশীলনগুলিকে একীভূত করা যায়। ফলস্বরূপ, আমরা টেরাফর্মের পরিবর্তে gcloud কমান্ড ব্যবহার করছি, যাতে আপনি ল্যাবটি ধাপে ধাপে অনুসরণ করতে পারেন এবং প্ল্যাটফর্ম ইঞ্জিনিয়ার ও মেশিন লার্নিং ইঞ্জিনিয়ার উভয়ের দৃষ্টিকোণ থেকে প্রতিটি প্রক্রিয়া সহজেই বুঝতে পারেন।
এই ব্যবহারিক নির্দেশিকাটি আপনাকে একটি DAG কনফিগার করার মাধ্যমে সম্পূর্ণ MLOps জীবনচক্রের একটি স্পষ্ট এবং বাস্তবসম্মত প্রদর্শনীর সাহায্যে, আপনার AI ওয়ার্কফ্লোকে সুবিন্যস্ত করতে Airflow ব্যবহারের পদ্ধতি ধাপে ধাপে দেখাবে।
আপনি যা শিখবেন
- জ্ঞানের সীমাবদ্ধতা দূর করে এবং কর্মপ্রবাহ উন্নত করার মাধ্যমে প্ল্যাটফর্ম ও মেশিন লার্নিং ইঞ্জিনিয়ারদের মধ্যে বৃহত্তর সহযোগিতা ও বোঝাপড়া বৃদ্ধি করুন।
- GKE-তে Airflow 2 কীভাবে স্থাপন, ব্যবহার এবং পরিচালনা করতে হয় তা বুঝুন।
- একটি এয়ারফ্লো ডিএজি শুরু থেকে শেষ পর্যন্ত কনফিগার করুন।
- GKE ব্যবহার করে প্রোডাকশন গ্রেড মেশিন লার্নিং সিস্টেমের ভিত্তি তৈরি করুন।
- মেশিন লার্নিং সিস্টেমগুলোকে যন্ত্রায়িত ও কার্যকর করা
- বুঝুন কিভাবে প্ল্যাটফর্ম ইঞ্জিনিয়ারিং MLOps-এর জন্য একটি গুরুত্বপূর্ণ সহায়ক স্তম্ভে পরিণত হয়েছে।
এই কোডল্যাব যা অর্জন করে
- আপনি Gemma-2-9b-it-এর উপর ভিত্তি করে আমাদের দ্বারা পরিমার্জিত একটি LLM থেকে চলচ্চিত্র সম্পর্কে প্রশ্ন করতে পারেন, যা GKE-তে vLLM-এর সাথে পরিবেশিত হয়।
লক্ষ্য দর্শক
- মেশিন লার্নিং ইঞ্জিনিয়াররা
- প্ল্যাটফর্ম ইঞ্জিনিয়াররা
- ডেটা বিজ্ঞানীরা
- ডেটা ইঞ্জিনিয়ার
- ডেভঅপস ইঞ্জিনিয়ার
- প্ল্যাটফর্ম আর্কিটেক্ট
- গ্রাহক প্রকৌশলী
এই কোডল্যাবটি উদ্দেশ্য নয়
- GKE বা AI/ML ওয়ার্কফ্লো-এর ভূমিকা হিসেবে
- সম্পূর্ণ এয়ারফ্লো ফিচার-সেটের একটি সংক্ষিপ্ত বিবরণ
২. প্ল্যাটফর্ম ইঞ্জিনিয়ারিং মেশিন লার্নিং ইঞ্জিনিয়ার/বিজ্ঞানীদের সহায়তা করে

প্ল্যাটফর্ম ইঞ্জিনিয়ারিং এবং এমএলওপিএস হলো দুটি পরস্পর নির্ভরশীল শাখা, যা মেশিন লার্নিং-এর উন্নয়ন ও স্থাপনের জন্য একটি শক্তিশালী এবং কার্যকর পরিবেশ তৈরিতে সহযোগিতা করে।
পরিধি: প্ল্যাটফর্ম ইঞ্জিনিয়ারিংয়ের পরিধি এমএলওপিএস (MLOps) থেকেও ব্যাপক, যা সম্পূর্ণ সফটওয়্যার ডেভেলপমেন্ট লাইফসাইকেলকে অন্তর্ভুক্ত করে এবং এর জন্য প্রয়োজনীয় টুলস ও অবকাঠামো সরবরাহ করে।
MLOps, ML উন্নয়ন, প্রয়োগ এবং অনুমানের মধ্যকার ব্যবধান দূর করে।
বিশেষজ্ঞতা: প্ল্যাটফর্ম ইঞ্জিনিয়ারদের সাধারণত ক্লাউড কম্পিউটিং, কন্টেইনারাইজেশন এবং ডেটা ম্যানেজমেন্টের মতো অবকাঠামো প্রযুক্তিতে গভীর দক্ষতা থাকে।
এমএলওপিএস ইঞ্জিনিয়াররা এমএল মডেলের উন্নয়ন, স্থাপন এবং পর্যবেক্ষণে বিশেষজ্ঞ হন এবং প্রায়শই তাদের ডেটা সায়েন্স ও সফটওয়্যার ইঞ্জিনিয়ারিং দক্ষতা থাকে।
টুলস: প্ল্যাটফর্ম ইঞ্জিনিয়াররা ইনফ্রাস্ট্রাকচার প্রভিশনিং, কনফিগারেশন ম্যানেজমেন্ট, কন্টেইনার অর্কেস্ট্রেশন এবং অ্যাপ্লিকেশন স্ক্যাফোল্ডিংয়ের জন্য টুল তৈরি করেন। এমএলওপিএস ইঞ্জিনিয়াররা এমএল মডেল ট্রেনিং, এক্সপেরিমেন্টেশন, ডেপ্লয়মেন্ট, মনিটরিং এবং ভার্সনিংয়ের জন্য টুল ব্যবহার করেন।
৩. গুগল ক্লাউড সেটআপ এবং প্রয়োজনীয়তা
স্ব-গতিতে পরিবেশ সেটআপ
- Google Cloud Console- এ সাইন-ইন করুন এবং একটি নতুন প্রজেক্ট তৈরি করুন অথবা বিদ্যমান কোনো প্রজেক্ট পুনরায় ব্যবহার করুন। যদি আপনার আগে থেকে Gmail বা Google Workspace অ্যাকাউন্ট না থাকে, তবে আপনাকে অবশ্যই একটি তৈরি করতে হবে।



- প্রজেক্টের নামটি হলো এই প্রজেক্টের অংশগ্রহণকারীদের প্রদর্শিত নাম। এটি একটি ক্যারেক্টার স্ট্রিং যা গুগল এপিআই ব্যবহার করে না। আপনি যেকোনো সময় এটি আপডেট করতে পারেন।
- প্রজেক্ট আইডি সমস্ত গুগল ক্লাউড প্রজেক্ট জুড়ে অনন্য এবং অপরিবর্তনীয় (একবার সেট করার পর এটি পরিবর্তন করা যায় না)। ক্লাউড কনসোল স্বয়ংক্রিয়ভাবে একটি অনন্য স্ট্রিং তৈরি করে; সাধারণত এটি কী তা নিয়ে আপনার মাথা ঘামানোর দরকার নেই। বেশিরভাগ কোডল্যাবে, আপনাকে আপনার প্রজেক্ট আইডি উল্লেখ করতে হবে (যা সাধারণত
PROJECT_IDহিসাবে চিহ্নিত করা হয়)। তৈরি করা আইডিটি আপনার পছন্দ না হলে, আপনি এলোমেলোভাবে আরেকটি তৈরি করতে পারেন। বিকল্পভাবে, আপনি আপনার নিজের আইডি দিয়ে চেষ্টা করে দেখতে পারেন যে সেটি উপলব্ধ আছে কিনা। এই ধাপের পরে এটি পরিবর্তন করা যাবে না এবং প্রজেক্টের পুরো সময়কাল জুড়ে এটি অপরিবর্তিত থাকবে। - আপনার অবগতির জন্য জানাচ্ছি যে, তৃতীয় একটি ভ্যালু রয়েছে, যা হলো প্রজেক্ট নম্বর , এবং কিছু এপিআই এটি ব্যবহার করে থাকে। ডকুমেন্টেশনে এই তিনটি ভ্যালু সম্পর্কে আরও বিস্তারিত জানুন।
- এরপর, ক্লাউড রিসোর্স/এপিআই ব্যবহার করার জন্য আপনাকে ক্লাউড কনসোলে বিলিং চালু করতে হবে। এই কোডল্যাবটি সম্পন্ন করতে খুব বেশি খরচ হবে না, এমনকি আদৌ কোনো খরচ নাও হতে পারে। এই টিউটোরিয়ালের পর বিলিং এড়াতে রিসোর্সগুলো বন্ধ করার জন্য, আপনি আপনার তৈরি করা রিসোর্সগুলো অথবা প্রজেক্টটি ডিলিট করে দিতে পারেন। নতুন গুগল ক্লাউড ব্যবহারকারীরা ৩০০ মার্কিন ডলারের ফ্রি ট্রায়াল প্রোগ্রামের জন্য যোগ্য।
ক্লাউড শেল শুরু করুন
যদিও গুগল ক্লাউড আপনার ল্যাপটপ থেকে দূরবর্তীভাবে পরিচালনা করা যায়, এই কোডল্যাবে আপনি ক্লাউড শেল ব্যবহার করবেন, যা ক্লাউডে চালিত একটি কমান্ড লাইন পরিবেশ।
ক্লাউড শেল সক্রিয় করুন
- ক্লাউড কনসোল থেকে, Activate Cloud Shell-এ ক্লিক করুন।
.

আপনি যদি প্রথমবারের মতো ক্লাউড শেল চালু করেন, তাহলে এটি কী তা বর্ণনা করে একটি মধ্যবর্তী স্ক্রিন আপনার সামনে আসবে। যদি একটি মধ্যবর্তী স্ক্রিন আসে, তাহলে 'চালিয়ে যান' (Continue) এ ক্লিক করুন।

ক্লাউড শেল প্রস্তুত করতে এবং এর সাথে সংযোগ স্থাপন করতে মাত্র কয়েক মুহূর্ত সময় লাগা উচিত।

এই ভার্চুয়াল মেশিনটিতে প্রয়োজনীয় সমস্ত ডেভেলপমেন্ট টুলস লোড করা আছে। এটি একটি স্থায়ী ৫ জিবি হোম ডিরেক্টরি প্রদান করে এবং গুগল ক্লাউডে চলে, যা নেটওয়ার্ক পারফরম্যান্স ও অথেনটিকেশনকে ব্যাপকভাবে উন্নত করে। এই কোডল্যাবে আপনার প্রায় সমস্ত কাজই একটি ব্রাউজার দিয়ে করা সম্ভব।
ক্লাউড শেলে সংযুক্ত হওয়ার পর, আপনি দেখতে পাবেন যে আপনাকে প্রমাণীকৃত করা হয়েছে এবং প্রজেক্টটি আপনার প্রজেক্ট আইডিতে সেট করা আছে।
- আপনি প্রমাণীকৃত কিনা তা নিশ্চিত করতে ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান:
gcloud auth list
কমান্ড আউটপুট
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- gcloud কমান্ডটি আপনার প্রজেক্ট সম্পর্কে জানে কিনা তা নিশ্চিত করতে ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান:
gcloud config list project
কমান্ড আউটপুট
[core] project = <PROJECT_ID>
যদি তা না থাকে, তবে আপনি এই কমান্ডটি দিয়ে এটি সেট করতে পারেন:
gcloud config set project <PROJECT_ID>
কমান্ড আউটপুট
Updated property [core/project].
৪. ধাপ #১ - Kaggle-এ সাইন-আপ এবং প্রমাণীকরণ করুন
কোডল্যাব শুরু করার জন্য, আপনাকে ক্যাগল-এ একটি অ্যাকাউন্ট তৈরি করতে হবে। ক্যাগল হলো গুগলের মালিকানাধীন ডেটা সায়েন্টিস্ট এবং মেশিন লার্নিং উৎসাহীদের জন্য একটি অনলাইন কমিউনিটি প্ল্যাটফর্ম, যেখানে বিভিন্ন ডোমেনের জন্য সর্বজনীনভাবে উপলব্ধ ডেটাসেটের একটি বিশাল ভান্ডার রয়েছে। এই সাইট থেকেই আপনি আপনার মডেলকে প্রশিক্ষণ দেওয়ার জন্য ব্যবহৃত রটেনটমেটোস ডেটাসেটটি ডাউনলোড করবেন।
- Kaggle- এ সাইন আপ করতে, আপনি সাইন-ইন করার জন্য Google SSO ব্যবহার করতে পারেন।
- শর্তাবলী মেনে নিন
- সেটিংসে যান এবং আপনার ইউজারনেমটি নিন।
- API সেকশনের অধীনে, "Create new token from" নির্বাচন করুন, যা kaggle.json ডাউনলোড করবে।
- আপনার কোনো সমস্যা হলে, এখানে সাপোর্ট পেজে যান।
৫. ধাপ ২ - HuggingFace-এ সাইন-আপ এবং প্রমাণীকরণ করুন।
হাগিংফেস হলো মেশিন লার্নিং প্রযুক্তির সাথে সকলের সম্পৃক্ত হওয়ার একটি কেন্দ্রীয় স্থান। এখানে ৯ লক্ষ মডেল, ২ লক্ষ ডেটাসেট এবং ৩ লক্ষ ডেমো অ্যাপ (স্পেস) রয়েছে, যেগুলো সবই ওপেন সোর্স এবং সর্বসাধারণের জন্য উন্মুক্ত।
- HuggingFace- এ সাইন আপ করুন - ইউজারনেম দিয়ে একটি অ্যাকাউন্ট তৈরি করুন, আপনি Google SSO ব্যবহার করতে পারবেন না।
- আপনার ইমেল ঠিকানা নিশ্চিত করুন
- এখানে যান এবং Gemma-2-9b-it মডেলটির লাইসেন্স গ্রহণ করুন।
- এখানে একটি হাগিংফেস টোকেন তৈরি করুন
- টোকেনের তথ্যগুলো লিখে রাখুন, পরে আপনার এটি প্রয়োজন হবে।
৬. ধাপ ৩ - প্রয়োজনীয় গুগল ক্লাউড অবকাঠামোগত রিসোর্স তৈরি করুন
আপনি ওয়ার্কলোড আইডেন্টিটি ফেডারেশন ব্যবহার করে GKE, GCE, আর্টিফ্যাক্ট রেজিস্ট্রি সেট আপ করবেন এবং IAM রোলগুলো প্রয়োগ করবেন।
আপনার এআই ওয়ার্কফ্লোতে দুটি নোডপুল ব্যবহৃত হয়, একটি ট্রেনিংয়ের জন্য এবং অন্যটি ইনফারেন্সের জন্য। ট্রেনিং নোডপুলটি একটি এনভিডিয়া এল৪ টেনসর কোর জিপিইউ সহ সজ্জিত একটি জি২-স্ট্যান্ডার্ড-৮ জিসিই ভিএম ব্যবহার করছে। ইনফারেন্স নোডপুলটি দুটি এনভিডিয়া এল৪ টেনসর কোর জিপিইউ সহ সজ্জিত একটি জি২-স্ট্যান্ডার্ড-২৪ ভিএম ব্যবহার করছে। অঞ্চল নির্দিষ্ট করার সময়, এমন একটি বেছে নিন যেখানে প্রয়োজনীয় জিপিইউটি সমর্থিত ( লিঙ্ক )।
আপনার ক্লাউড শেলে নিম্নলিখিত কমান্ডগুলো চালান:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
আপনার YAML ম্যানিফেস্ট তৈরি করুন
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:v0.6.6
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
৩টি গুগল ক্লাউড স্টোরেজ (GCS) বাকেট তৈরি করুন
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
৭. ধাপ ৪ - হেলম চার্টের মাধ্যমে GKE-তে Airflow ইনস্টল করুন।
এখন আমরা Helm ব্যবহার করে Airflow 2 ডেপ্লয় করব। Apache Airflow হলো ডেটা ইঞ্জিনিয়ারিং পাইপলাইনের জন্য একটি ওপেন-সোর্স ওয়ার্কফ্লো ম্যানেজমেন্ট প্ল্যাটফর্ম । আমরা পরে Airflow 2-এর ফিচার সেট নিয়ে আলোচনা করব।
এয়ারফ্লো হেলম চার্টের জন্য values.yaml
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
এয়ারফ্লো ২ স্থাপন করুন
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
৮. ধাপ #৫ - সংযোগ এবং ভেরিয়েবল ব্যবহার করে এয়ারফ্লো চালু করুন
একবার Airflow 2 ডেপ্লয় করা হয়ে গেলে, আমরা এটি কনফিগার করা শুরু করতে পারি। আমরা কিছু ভ্যারিয়েবল নির্ধারণ করি, যেগুলো আমাদের পাইথন স্ক্রিপ্টগুলো পড়ে থাকে।
- আপনার ব্রাউজার দিয়ে পোর্ট ৮০৮০-তে এয়ারফ্লো ইউআই অ্যাক্সেস করুন।
বাহ্যিক আইপি পান
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
একটি ওয়েব ব্রাউজার খুলুন এবং http:// <EXTERNAL-IP> :8080 -এ যান। লগইন হলো admin / admin।
- Airflow UI-এর মধ্যে একটি ডিফল্ট GCP সংযোগ তৈরি করতে, Admin → Connections → + Add a new record-এ যান।
- সংযোগ আইডি: google_cloud_default
- সংযোগের ধরণ: গুগল ক্লাউড
সংরক্ষণ করুন-এ ক্লিক করুন।
- প্রয়োজনীয় ভেরিয়েবলগুলো তৈরি করতে, Admin → Variables → + Add a new record-এ যান।
- Key: BUCKET_DATA_NAME - Value: echo $BUCKET_DATA_NAME থেকে কপি করুন
- Key: GCP_PROJECT_ID - Value: echo $DEVSHELL_PROJECT_ID থেকে কপি করুন
- কী: HF_TOKEN - মান: আপনার HF টোকেন প্রবেশ করান
- Key: KAGGLE_USERNAME - Value: আপনার ক্যাগল ইউজারনেম লিখুন
- Key: KAGGLE_KEY - Value: kaggle.json থেকে এটি কপি করুন
প্রতিটি কী-ভ্যালু পেয়ারের পরে সেভ-এ ক্লিক করুন।
আপনার UI দেখতে এইরকম হওয়া উচিত:

৯. অ্যাপ্লিকেশন কোড কন্টেইনার #১ - ডেটা ডাউনলোড
এই পাইথন স্ক্রিপ্টে, আমরা আমাদের GCS বাকেটে ডেটাসেটটি ডাউনলোড করার জন্য Kaggle-এর সাথে প্রমাণীকরণ করি।
স্ক্রিপ্টটিকে কন্টেইনারাইজ করা হয়েছে, কারণ এটি DAG ইউনিট #১ হয়ে ওঠে এবং আমরা আশা করি ডেটাসেটটি ঘন ঘন আপডেট করা হবে, তাই আমরা এই প্রক্রিয়াটি স্বয়ংক্রিয় করতে চাই।
ডিরেক্টরি তৈরি করুন এবং আমাদের স্ক্রিপ্টগুলো এখানে কপি করুন।
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
ডেটাসেট-ডাউনলোড.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
ডকারফাইল
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
এখন আমরা ডেটাসেট-ডাউনলোডের জন্য একটি কন্টেইনার ইমেজ তৈরি করব এবং সেটিকে আর্টিফ্যাক্ট রেজিস্ট্রি-তে পুশ করব।
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
১০. অ্যাপ্লিকেশন কোড কন্টেইনার #২ - ডেটা প্রস্তুতি
আমাদের ডেটা প্রস্তুতির ধাপে আমরা যা অর্জন করি তা হলো:
- আমাদের বেস মডেলের ফাইন-টিউনিংয়ের জন্য ডেটাসেটের কতটুকু অংশ ব্যবহার করতে চাই, তা নির্দিষ্ট করুন।
- ডেটা সেট লোড করে, অর্থাৎ CSV ফাইলটিকে পড়ে একটি Pandas ডেটাফ্রেমে নেয়, যা সারি এবং কলামের জন্য একটি দ্বি-মাত্রিক ডেটা কাঠামো।
- ডেটা রূপান্তর / প্রাক-প্রক্রিয়াকরণ - ডেটাসেটের কোন অংশগুলো অপ্রাসঙ্গিক তা নির্ধারণ করা এবং তার জন্য আমরা কী রাখতে চাই তা নির্দিষ্ট করে দেওয়া, যার ফলস্বরূপ বাকি অংশগুলো বাদ দেওয়া হয়।
- ডেটাফ্রেমের প্রতিটি সারিতে
transformফাংশনটি প্রয়োগ করে। - প্রস্তুতকৃত ডেটা GCS বাকেটে পুনরায় সংরক্ষণ করুন।
ডিরেক্টরি তৈরি করুন এবং আমাদের স্ক্রিপ্টগুলো এখানে কপি করুন।
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
ডকারফাইল
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. অ্যাপ্লিকেশন কোড কন্টেইনার #3 - ফাইনটিউনিং
এখানে আমরা Gemma-2-9b-it-কে ভিত্তি মডেল হিসেবে ব্যবহার করি এবং তারপর আমাদের নতুন ডেটাসেট দিয়ে এটিকে ফাইনটিউন করি।
সূক্ষ্ম সমন্বয়ের ধাপে এইগুলি ক্রমে ঘটে থাকে।
১. সেটআপ: লাইব্রেরি ইম্পোর্ট করুন, প্যারামিটার (মডেল, ডেটা এবং প্রশিক্ষণের জন্য) নির্ধারণ করুন এবং গুগল ক্লাউড স্টোরেজ থেকে ডেটাসেট লোড করুন।
২. মডেল লোড করুন: কার্যকারিতার জন্য কোয়ান্টাইজেশন সহ একটি প্রি-ট্রেইনড ল্যাঙ্গুয়েজ মডেল এবং সংশ্লিষ্ট টোকেনাইজার লোড করুন।
৩. LoRA কনফিগার করুন: ছোট প্রশিক্ষণযোগ্য ম্যাট্রিক্স যোগ করে মডেলটিকে দক্ষতার সাথে ফাইন-টিউন করার জন্য লো-র্যাঙ্ক অ্যাডাপটেশন (LoRA) সেট আপ করুন।
৪. প্রশিক্ষণ: প্রশিক্ষণের প্যারামিটারগুলো নির্ধারণ করুন এবং FP16 কোয়ান্টাইজেশন টাইপ ব্যবহার করে লোড করা ডেটাসেটের উপর মডেলটিকে ফাইন-টিউন করতে SFTTrainer ব্যবহার করুন।
৫. সংরক্ষণ ও আপলোড: সূক্ষ্মভাবে টিউন করা মডেল এবং টোকেনাইজারটি স্থানীয়ভাবে সংরক্ষণ করুন, তারপর সেগুলোকে আমাদের GCS বাকেটে আপলোড করুন।
এরপর আমরা ক্লাউড বিল্ড ব্যবহার করে একটি কন্টেইনার ইমেজ তৈরি করি এবং সেটিকে আর্টিফ্যাক্ট রেজিস্ট্রি-তে সংরক্ষণ করি।
ডিরেক্টরি তৈরি করুন এবং আমাদের স্ক্রিপ্টগুলো এখানে কপি করুন।
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
ডকারফাইল
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
এখন আমরা সূক্ষ্ম সমন্বয়ের জন্য একটি কন্টেইনার ইমেজ তৈরি করে আর্টিফ্যাক্ট রেজিস্ট্রি-তে পুশ করব।
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
১২. বায়ুপ্রবাহ ২ এর সংক্ষিপ্ত বিবরণ, সাথে DAG কী
এয়ারফ্লো হলো ওয়ার্কফ্লো এবং ডেটা পাইপলাইন সমন্বয়ের একটি প্ল্যাটফর্ম। এটি পাইথন কোডে এই ওয়ার্কফ্লোগুলো সংজ্ঞায়িত করতে ডিএজি (ডাইরেক্টেড অ্যাসাইক্লিক গ্রাফ) ব্যবহার করে, যা টাস্ক এবং তাদের নির্ভরশীলতাকে দৃশ্যমানভাবে উপস্থাপন করে।
এয়ারফ্লো, তার স্ট্যাটিক DAG এবং পাইথন-ভিত্তিক ডেফিনিশনের সাহায্যে, পূর্বনির্ধারিত ওয়ার্কফ্লো শিডিউল করা ও পরিচালনা করার জন্য অত্যন্ত উপযোগী। এর আর্কিটেকচারে এই ওয়ার্কফ্লোগুলো মনিটর ও পরিচালনা করার জন্য একটি ব্যবহারকারী-বান্ধব UI অন্তর্ভুক্ত রয়েছে।
মূলত, এয়ারফ্লো আপনাকে পাইথন ব্যবহার করে আপনার ডেটা পাইপলাইনগুলো সংজ্ঞায়িত করতে, সময়সূচী নির্ধারণ করতে এবং নিরীক্ষণ করতে দেয়, যা এটিকে ওয়ার্কফ্লো অর্কেস্ট্রেশনের জন্য একটি নমনীয় ও শক্তিশালী টুলে পরিণত করে।
১৩. আমাদের ডিএজি-এর সংক্ষিপ্ত বিবরণ

DAG-এর পূর্ণরূপ হলো ডিরেক্টেড অ্যাসাইক্লিক গ্রাফ। Airflow-তে একটি DAG নিজেই সম্পূর্ণ ওয়ার্কফ্লো বা পাইপলাইনকে উপস্থাপন করে। এটি টাস্কগুলো, তাদের নির্ভরশীলতা এবং সম্পাদনের ক্রম নির্ধারণ করে।
DAG-এর অন্তর্গত ওয়ার্কফ্লো ইউনিটগুলি GKE ক্লাস্টারের একটি পড থেকে সম্পাদিত হয়, যা Airflow কনফিগারেশন থেকে শুরু করা হয়।
সারসংক্ষেপ:
এয়ারফ্লো: ডেটা ডাউনলোড - এই স্ক্রিপ্টটি ক্যাগল (Kaggle) থেকে একটি মুভি রিভিউ ডেটাসেট সংগ্রহ করে আপনার GCS বাকেটে সংরক্ষণ করার প্রক্রিয়াটিকে স্বয়ংক্রিয় করে, যার ফলে এটি আপনার ক্লাউড পরিবেশে পরবর্তী প্রক্রিয়াকরণ বা বিশ্লেষণের জন্য সহজেই উপলব্ধ হয়।
এয়ারফ্লো: ডেটা প্রস্তুতি - এই কোডটি মূল মুভি রিভিউ ডেটাসেটটি গ্রহণ করে, আমাদের ব্যবহারের জন্য অপ্রয়োজনীয় অতিরিক্ত ডেটা কলামগুলো সরিয়ে দেয় এবং অনুপস্থিত মান (missing values) সহ ডেটাসেটগুলো মুছে ফেলে। এরপর, এটি ডেটাসেটটিকে মেশিন লার্নিংয়ের জন্য উপযুক্ত একটি প্রশ্নোত্তর বিন্যাসে সাজিয়ে নেয় এবং পরবর্তী ব্যবহারের জন্য এটিকে আবার GCS-এ সংরক্ষণ করে।
এয়ারফ্লো: মডেল ফাইন-টিউনিং - এই কোডটি LoRA (লো-র্যাঙ্ক অ্যাডাপটেশন) নামক একটি কৌশল ব্যবহার করে একটি বৃহৎ ল্যাঙ্গুয়েজ মডেল (LLM) ফাইন-টিউন করে এবং তারপর আপডেট করা মডেলটি সংরক্ষণ করে। এটি গুগল ক্লাউড স্টোরেজ থেকে একটি প্রি-ট্রেইনড LLM এবং একটি ডেটাসেট লোড করার মাধ্যমে কাজ শুরু করে। তারপর, এই ডেটাসেটের উপর মডেলটিকে দক্ষতার সাথে ফাইন-টিউন করার জন্য এটি LoRA প্রয়োগ করে। সবশেষে, টেক্সট জেনারেশন বা প্রশ্নোত্তরের মতো অ্যাপ্লিকেশনগুলিতে পরবর্তী ব্যবহারের জন্য ফাইন-টিউন করা মডেলটি পুনরায় গুগল ক্লাউড স্টোরেজে সংরক্ষণ করা হয়।
এয়ারফ্লো: মডেল সার্ভিং - ইনফারেন্সের জন্য vllm ব্যবহার করে GKE-তে ফাইন-টিউন করা মডেলটি পরিবেশন করা।
বায়ুপ্রবাহ: ফিডব্যাক লুপ - প্রতি xx সময় পর পর (ঘণ্টা, দিন বা সপ্তাহ) মডেলের পুনঃপ্রশিক্ষণ।
এই ডায়াগ্রামটি ব্যাখ্যা করে যে, GKE-তে চালিত হলে Airflow 2 কীভাবে কাজ করে।

14. RAG ব্যবহার করে একটি মডেলের ফিনিটিউনিং
এই কোডল্যাবটি রিট্রিভাল অগমেন্টেড জেনারেশন (RAG) ব্যবহার না করে একটি এলএলএম (LLM)-কে সূক্ষ্মভাবে সমন্বয় করে।
আসুন এই দুটি পদ্ধতির তুলনা করা যাক:
ফাইনটিউনিং: একটি বিশেষায়িত মডেল তৈরি করে: ফাইনটিউনিং এলএলএম-কে একটি নির্দিষ্ট কাজ বা ডেটাসেটের সাথে খাপ খাইয়ে নেয়, যার ফলে এটি বাহ্যিক ডেটা উৎসের উপর নির্ভর না করে স্বাধীনভাবে কাজ করতে পারে।
অনুমান প্রক্রিয়াকে সহজ করে: এর ফলে একটি পৃথক তথ্য পুনরুদ্ধার ব্যবস্থা এবং ডেটাবেসের প্রয়োজন হয় না, যার ফলস্বরূপ দ্রুততর এবং সাশ্রয়ী ফলাফল পাওয়া যায়, বিশেষ করে ঘন ঘন ব্যবহারের ক্ষেত্রে।
RAG: বাহ্যিক জ্ঞানের উপর নির্ভর করে: RAG প্রতিটি অনুরোধের জন্য একটি নলেজ বেস থেকে প্রাসঙ্গিক তথ্য সংগ্রহ করে, যা হালনাগাদ এবং নির্দিষ্ট ডেটাতে প্রবেশাধিকার নিশ্চিত করে।
জটিলতা বৃদ্ধি: কুবারনেটিস ক্লাস্টারের মতো প্রোডাকশন এনভায়রনমেন্টে RAG প্রয়োগ করতে প্রায়শই ডেটা প্রসেসিং এবং পুনরুদ্ধারের জন্য একাধিক মাইক্রোসার্ভিসের প্রয়োজন হয়, যা সম্ভাব্যভাবে লেটেন্সি এবং কম্পিউটেশনাল খরচ বাড়িয়ে দেয়।
কেন ফাইন-টিউনিং বেছে নেওয়া হয়েছিল:
যদিও এই কোডল্যাবে ব্যবহৃত ছোট ডেটাসেটের জন্য RAG উপযুক্ত হতো, আমরা Airflow-এর একটি সাধারণ ব্যবহার দেখানোর জন্য ফাইন-টিউনিং করার সিদ্ধান্ত নিয়েছি। এই সিদ্ধান্তের ফলে আমরা RAG-এর জন্য অতিরিক্ত পরিকাঠামো এবং মাইক্রোসার্ভিস সেট আপ করার সূক্ষ্ম বিষয়গুলিতে না গিয়ে, ওয়ার্কফ্লো অর্কেস্ট্রেশন দিকগুলির উপর মনোযোগ দিতে পেরেছি।
উপসংহার:
ফাইনটিউনিং এবং RAG উভয়ই মূল্যবান কৌশল, যাদের নিজস্ব শক্তি ও দুর্বলতা রয়েছে। সর্বোত্তম নির্বাচনটি আপনার প্রকল্পের নির্দিষ্ট প্রয়োজনীয়তার উপর নির্ভর করে, যেমন আপনার ডেটার আকার ও জটিলতা, পারফরম্যান্সের চাহিদা এবং খরচের বিষয়াদি।
১৫. DAG টাস্ক #১ - Airflow-তে আপনার প্রথম ধাপ তৈরি করুন: ডেটা ডাউনলোড
এই DAG ইউনিটটির একটি সংক্ষিপ্ত বিবরণ হলো, একটি কন্টেইনার ইমেজে হোস্ট করা আমাদের পাইথন কোডটি Kaggle থেকে সর্বশেষ RottenTomatoes ডেটাসেটটি ডাউনলোড করে।
এই কোডটি GCS বাকেটে কপি করবেন না। আমরা শেষ ধাপ হিসেবে mlops-dag.py কপি করি, যেটিতে একটি পাইথন স্ক্রিপ্টের মধ্যেই সমস্ত DAG ইউনিট ধাপ রয়েছে।
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
১৬. DAG টাস্ক #২ - এয়ারফ্লো-তে আপনার দ্বিতীয় ধাপ তৈরি করুন: ডেটা প্রস্তুতি
এই DAG ইউনিটটির একটি সার্বিক ধারণা দেওয়ার জন্য, আমরা GCS থেকে একটি CSV ফাইল (rotten_tomatoes_movie_reviews.csv) একটি Pandas DataFrame-এ লোড করি।
এরপরে, পরীক্ষা এবং সম্পদের কার্যকারিতার জন্য আমরা DATASET_LIMIT ব্যবহার করে প্রক্রিয়াকৃত সারির সংখ্যা সীমিত করি এবং অবশেষে রূপান্তরিত ডেটাটিকে একটি হাগিং ফেস ডেটাসেটে (Hugging Face Dataset) রূপান্তর করি।
আপনি যদি ভালোভাবে লক্ষ্য করেন, তাহলে দেখতে পাবেন যে আমরা মডেলে "DATASET_LIMIT": "1000" ব্যবহার করে ১০০০টি সারি প্রশিক্ষণ দিচ্ছি, কারণ একটি Nvidia L4 GPU-তে এটি করতে ২০ মিনিট সময় লাগে।
এই কোডটি GCS বাকেটে কপি করবেন না। আমরা শেষ ধাপে mlops-dag.py ফাইলটি কপি করি, যেটিতে সমস্ত ধাপ একটি পাইথন স্ক্রিপ্টের মধ্যেই রয়েছে।
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
১৭. DAG টাস্ক #৩ - এয়ারফ্লো-তে আপনার তৃতীয় ধাপ তৈরি করুন: মডেল ফাইনটিউনিং
এই DAG ইউনিটটির একটি সার্বিক ধারণা দেওয়ার জন্য, এখানে আমরা আমাদের নতুন ডেটাসেট দিয়ে Gemma মডেলটিকে পরিমার্জন করতে finetune.py এক্সিকিউট করছি।
এই কোডটি GCS বাকেটে কপি করবেন না। আমরা শেষ ধাপে mlops-dag.py ফাইলটি কপি করি, যেটিতে সমস্ত ধাপ একটি পাইথন স্ক্রিপ্টের মধ্যেই রয়েছে।
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
১৮. DAG টাস্ক #৪ - Airflow-তে আপনার চূড়ান্ত ধাপ তৈরি করুন: ইনফারেন্স / মডেল পরিবেশন করা
vLLM হলো একটি শক্তিশালী ওপেন-সোর্স লাইব্রেরি, যা বিশেষভাবে LLM-এর উচ্চ-পারফরম্যান্স ইনফারেন্সের জন্য ডিজাইন করা হয়েছে। গুগল কুবারনেটিস ইঞ্জিন (GKE)-এ ডেপ্লয় করা হলে, এটি কুবারনেটিসের স্কেলেবিলিটি এবং দক্ষতাকে কাজে লাগিয়ে কার্যকরভাবে LLM পরিবেশন করে।
ধাপগুলোর সারসংক্ষেপ:
- "mlops-dag.py" DAG-টি GCS বাকেটে আপলোড করুন।
- ইনফারেন্স সেটআপ করার জন্য দুটি Kubernetes YAML কনফিগারেশন ফাইল একটি GCS বাকেটে কপি করুন।
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
আপনার পাইথন স্ক্রিপ্ট (DAG ফাইল) এবং কুবারনেটিস ম্যানিফেস্টগুলো DAGS GCS বাকেটে আপলোড করুন।
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
আপনার Airflow UI-তে আপনি mlops-dag দেখতে পাবেন।
- আন-পজ নির্বাচন করুন।
- ম্যানুয়াল MLOps সাইকেল সম্পাদন করতে ট্রিগার DAG নির্বাচন করুন।

আপনার DAG তৈরি হয়ে গেলে, আপনি Airflow UI-তে এই ধরনের আউটপুট দেখতে পাবেন।

চূড়ান্ত ধাপের পরে, আপনি মডেল এন্ডপয়েন্টটি নিয়ে মডেলটি পরীক্ষা করার জন্য একটি প্রম্পট পাঠাতে পারেন।
`curl` কমান্ডটি দেওয়ার আগে প্রায় ৫ মিনিট অপেক্ষা করুন, যাতে মডেলটির ইনফারেন্স শুরু হতে পারে এবং লোড ব্যালেন্সার একটি এক্সটার্নাল আইপি অ্যাড্রেস বরাদ্দ করতে পারে।
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
আউটপুট:
১৯. অভিনন্দন!
আপনি GKE-তে Airflow 2 ব্যবহার করে একটি DAG পাইপলাইনের মাধ্যমে আপনার প্রথম AI ওয়ার্কফ্লো তৈরি করেছেন।
আপনার স্থাপন করা রিসোর্সগুলো অ-স্থানান্তর করতে ভুলবেন না।
২০. উৎপাদনে এটি করা
যদিও কোডল্যাব আপনাকে GKE-তে Airflow 2 সেট আপ করার বিষয়ে একটি চমৎকার ধারণা দিয়েছে, বাস্তব জগতে প্রোডাকশনে এটি করার সময় আপনাকে নিম্নলিখিত কিছু বিষয় বিবেচনা করতে হবে।
Gradio বা অনুরূপ টুল ব্যবহার করে একটি ওয়েব ফ্রন্টএন্ড তৈরি করুন।
হয় এখানে GKE দিয়ে ওয়ার্কলোডগুলির জন্য স্বয়ংক্রিয় অ্যাপ্লিকেশন মনিটরিং কনফিগার করুন অথবা এখানে Airflow থেকে মেট্রিক্স এক্সপোর্ট করুন।
মডেলটিকে দ্রুত ফাইন-টিউন করার জন্য আপনার আরও বড় জিপিইউ-এর প্রয়োজন হতে পারে, বিশেষ করে যদি আপনার ডেটাসেট বড় হয়। তবে, যদি আমরা একাধিক জিপিইউ জুড়ে মডেলটিকে ট্রেইন করতে চাই, তাহলে আমাদের ডেটাসেটকে ভাগ করে ট্রেনিংটি শার্ড করতে হবে। এখানে পাইটর্চ (PyTorch) দিয়ে এফএসডিপি (FSDP) -এর একটি ব্যাখ্যা দেওয়া হলো (এই লক্ষ্য অর্জনের জন্য জিপিইউ শেয়ারিং ব্যবহার করে সম্পূর্ণ শার্ড করা ডেটা প্যারালাল)। এ বিষয়ে আরও জানতে মেটা (Meta) -র একটি ব্লগ পোস্ট এবং পাইটর্চ ব্যবহার করে এফএসডিপি-র উপর এই টিউটোরিয়ালটি দেখতে পারেন।
গুগল ক্লাউড কম্পোজার একটি পরিচালিত এয়ারফ্লো পরিষেবা, তাই আপনাকে এয়ারফ্লো নিজে রক্ষণাবেক্ষণ করতে হবে না, শুধু আপনার DAG স্থাপন করুন এবং কাজ শুরু করুন।
আরও জানুন
- এয়ারফ্লো ডকুমেন্টেশন: https://airflow.apache.org/
লাইসেন্স
এই কাজটি ক্রিয়েটিভ কমন্স অ্যাট্রিবিউশন ২.০ জেনেরিক লাইসেন্সের অধীনে রয়েছে।