1. Обзор

В этом CodeLab демонстрируется, как интегрировать практики DevOps в машинное обучение (MLOps) путем загрузки набора данных, уточнения модели и развертывания LLM на Google Kubernetes Engine (GKE) с использованием DAG Airflow с минимальным уровнем абстракции. В результате мы используем команды gcloud, а не terraform, чтобы вы могли шаг за шагом следовать инструкциям и легко понимать каждый процесс как с точки зрения инженера платформы, так и с точки зрения инженера машинного обучения.
В этом практическом руководстве вы узнаете, как использовать Airflow для оптимизации рабочих процессов в области ИИ, и получите наглядную демонстрацию всего жизненного цикла MLOps на примере настройки DAG.
Что вы узнаете
- Способствовать более тесному сотрудничеству и взаимопониманию между инженерами по платформам и машинному обучению, устраняя разрозненность знаний и улучшая рабочие процессы.
- Разберитесь, как развертывать, использовать и управлять Airflow 2 в GKE.
- Настройка DAG Airflow от начала до конца.
- Создайте основу для систем машинного обучения производственного уровня с помощью GKE.
- Внедрение и практическое применение систем машинного обучения
- Поймите, как разработка платформы стала важнейшим элементом поддержки MLOps.
Что достигается с помощью этого CodeLab
- Вы можете задавать вопросы о фильмах, используя LLM, который мы доработали на основе Gemma-2-9b-it, работающего в GKE с vLLM.
Целевая аудитория
- Инженеры по машинному обучению
- Инженеры платформы
- специалисты по анализу данных
- Инженеры по обработке данных
- Инженеры DevOps
- Архитектор платформы
- Инженеры по работе с клиентами
Данный CodeLab не предназначен для
- В качестве введения в GKE или рабочие процессы ИИ/машинного обучения
- В качестве обзора всего набора функций Airflow
2. Разработка платформ помогает инженерам/ученым в области машинного обучения.

Разработка платформ и MLOps — это взаимозависимые дисциплины, которые совместно создают надежную и эффективную среду для разработки и развертывания машинного обучения.
Область применения: Разработка платформ имеет более широкую область применения, чем MLOps, охватывая весь жизненный цикл разработки программного обеспечения и предоставляя для этого инструменты и инфраструктуру.
MLOps устраняет разрыв между разработкой, развертыванием и выводом результатов машинного обучения.
Специализация: Инженеры платформ, как правило, обладают глубокими знаниями в области инфраструктурных технологий, таких как облачные вычисления, контейнеризация и управление данными.
Инженеры MLOps специализируются на разработке, развертывании и мониторинге моделей машинного обучения, часто обладая навыками в области анализа данных и разработки программного обеспечения.
Инструменты: Инженеры платформы создают инструменты для предоставления инфраструктуры, управления конфигурациями, оркестровки контейнеров и создания каркаса приложений. Инженеры MLOps используют инструменты для обучения моделей машинного обучения, экспериментирования, развертывания, мониторинга и версионирования.
3. Настройка и требования Google Cloud
Настройка среды для самостоятельного обучения
- Войдите в консоль Google Cloud и создайте новый проект или используйте существующий. Если у вас еще нет учетной записи Gmail или Google Workspace, вам необходимо ее создать .



- Название проекта — это отображаемое имя участников данного проекта. Это строка символов, не используемая API Google. Вы всегда можете его изменить.
- Идентификатор проекта уникален для всех проектов Google Cloud и является неизменяемым (его нельзя изменить после установки). Консоль Cloud автоматически генерирует уникальную строку; обычно вам неважно, какая она. В большинстве практических заданий вам потребуется указать идентификатор вашего проекта (обычно обозначается как
PROJECT_ID). Если сгенерированный идентификатор вас не устраивает, вы можете сгенерировать другой случайный идентификатор. В качестве альтернативы вы можете попробовать свой собственный и посмотреть, доступен ли он. После этого шага его нельзя изменить, и он сохраняется на протяжении всего проекта. - К вашему сведению, существует третье значение — номер проекта , которое используется некоторыми API. Подробнее обо всех трех значениях можно узнать в документации .
- Далее вам потребуется включить оплату в консоли Cloud для использования ресурсов/API Cloud. Выполнение этого практического задания не потребует больших затрат, если вообще потребует. Чтобы отключить ресурсы и избежать дополнительных расходов после завершения этого урока, вы можете удалить созданные ресурсы или удалить проект. Новые пользователи Google Cloud имеют право на бесплатную пробную версию стоимостью 300 долларов США .
Запустить Cloud Shell
Хотя Google Cloud можно управлять удаленно с ноутбука, в этом практическом занятии вы будете использовать Cloud Shell — среду командной строки, работающую в облаке.
Активировать Cloud Shell
- В консоли Cloud нажмите «Активировать Cloud Shell» .
.

Если вы запускаете Cloud Shell впервые, вам будет показан промежуточный экран с описанием его возможностей. Если вам был показан промежуточный экран, нажмите «Продолжить» .

Подготовка и подключение к Cloud Shell займут всего несколько минут.

Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Она предоставляет постоянный домашний каталог объемом 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Большая часть, если не вся, ваша работа в этом практическом задании может быть выполнена с помощью браузера.
После подключения к Cloud Shell вы увидите, что прошли аутентификацию и что проект настроен на ваш идентификатор проекта.
- Выполните следующую команду в Cloud Shell, чтобы подтвердить свою аутентификацию:
gcloud auth list
вывод команды
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- Выполните следующую команду в Cloud Shell, чтобы убедиться, что команда gcloud знает о вашем проекте:
gcloud config list project
вывод команды
[core] project = <PROJECT_ID>
Если это не так, вы можете установить это с помощью следующей команды:
gcloud config set project <PROJECT_ID>
вывод команды
Updated property [core/project].
4. Шаг 1 — Зарегистрируйтесь и пройдите аутентификацию на Kaggle.
Чтобы начать работу над CodeLab, вам необходимо создать учетную запись на Kaggle — онлайн-платформе для сообщества специалистов по обработке данных и энтузиастов машинного обучения, принадлежащей Google и содержащей обширный репозиторий общедоступных наборов данных для различных областей. Именно с этого сайта вы загрузите набор данных RottenTomatoes, используемый для обучения вашей модели.
- Зарегистрируйтесь на Kaggle , для входа используйте Google SSO.
- Примите условия и положения.
- Перейдите в Настройки и получите свое имя пользователя.
- В разделе API выберите «Создать новый токен из» Kaggle, после чего будет загружен файл kaggle.json.
- Если у вас возникнут какие-либо проблемы, перейдите на страницу поддержки здесь.
5. Шаг 2 — Регистрация и авторизация на HuggingFace.
HuggingFace — это централизованная платформа для всех желающих познакомиться с технологиями машинного обучения. Здесь размещено 900 тысяч моделей, 200 тысяч наборов данных и 300 тысяч демонстрационных приложений (Spaces), все с открытым исходным кодом и доступны для публичного использования.
- Регистрация в HuggingFace — создайте учетную запись с именем пользователя; использование Google SSO невозможно.
- Подтвердите свой адрес электронной почты
- Перейдите сюда и примите лицензионное соглашение для модели Gemma-2-9b-it.
- Создайте токен HuggingFace здесь.
- Запишите учетные данные токена, они понадобятся вам позже.
6. Шаг 3 — Создайте необходимые ресурсы инфраструктуры Google Cloud.
Вы настроите GKE, GCE, реестр артефактов и примените роли IAM, используя федерацию идентификации рабочих нагрузок .
В вашем рабочем процессе ИИ используются два пула узлов: один для обучения, другой для вывода результатов. Пул узлов для обучения использует виртуальную машину GCE g2-standard-8, оснащенную одним графическим процессором Nvidia L4 Tensor Core. Пул узлов для вывода результатов использует виртуальную машину g2-standard-24, оснащенную двумя графическими процессорами Nvidia L4 Tensor Core. При указании региона выберите тот, где поддерживается необходимый графический процессор ( ссылка ).
В оболочке Cloud Shell выполните следующие команды:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
Создайте свои YAML-манифесты.
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:v0.6.6
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Создайте 3 сегмента Google Cloud Storage (GCS).
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Шаг 4 — Установите Airflow на GKE через схему управления.
Теперь развернем Airflow 2 с помощью Helm. Apache Airflow — это платформа управления рабочими процессами с открытым исходным кодом для конвейеров обработки данных. О функциональных возможностях Airflow 2 мы поговорим позже.
файл values.yaml для диаграммы Helm Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Разверните Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Шаг 5 — Инициализация Airflow с указанием соединений и переменных.
После развертывания Airflow 2 мы можем приступить к его настройке. Мы определяем несколько переменных, которые считываются нашими скриптами на Python.
- Получите доступ к пользовательскому интерфейсу Airflow через порт 8080 с помощью вашего браузера.
Получите внешний IP-адрес
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Откройте веб-браузер и перейдите по адресу http:// <ВНЕШНИЙ-IP> :8080. Логин: admin / admin
- Создайте стандартное подключение к GCP в пользовательском интерфейсе Airflow. Для этого перейдите в раздел Администрирование → Подключения → + Добавить новую запись.
- Идентификатор подключения: google_cloud_default
- Тип подключения: Google Cloud
Нажмите «Сохранить».
- Создайте необходимые переменные, для этого перейдите в Администрирование → Переменные → + Добавить новую запись
- Ключ: BUCKET_DATA_NAME - Значение: Скопировать из echo $BUCKET_DATA_NAME
- Ключ: GCP_PROJECT_ID - Значение: Скопировать из echo $DEVSHELL_PROJECT_ID
- Ключ: HF_TOKEN - Значение: Вставьте свой токен HF
- Ключ: KAGGLE_USERNAME - Значение: Введите ваше имя пользователя Kaggle
- Ключ: KAGGLE_KEY - Значение: Скопируйте это из kaggle.json
После каждой пары ключ-значение нажмите кнопку «Сохранить».
Ваш пользовательский интерфейс должен выглядеть примерно так:

9. Контейнер кода приложения №1 - Загрузка данных
В этом скрипте на Python мы выполняем аутентификацию в Kaggle, чтобы загрузить набор данных в наш бакет GCS.
Сам скрипт контейнеризирован, поскольку он становится единицей DAG № 1, и мы ожидаем, что набор данных будет часто обновляться, поэтому мы хотим автоматизировать этот процесс.
Создайте директорию и скопируйте наши скрипты сюда.
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Теперь мы создадим образ контейнера для загрузки набора данных и отправим его в реестр артефактов.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Контейнер кода приложения №2 - Подготовка данных
В ходе подготовки данных мы достигаем следующих результатов:
- Укажите, какую часть набора данных мы хотим использовать для тонкой настройки нашей базовой модели.
- Загружает набор данных, то есть считывает CSV-файл в DataFrame Pandas, который представляет собой двумерную структуру данных, содержащую строки и столбцы.
- Преобразование/предварительная обработка данных — определение того, какие части набора данных являются нерелевантными, путем указания того, что мы хотим сохранить, что, по сути, означает удаление остального.
- Применяет функцию
transformк каждой строке DataFrame. - Сохраните подготовленные данные обратно в хранилище GCS.
Создайте директорию и скопируйте наши скрипты сюда.
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Контейнер кода приложения №3 — точная настройка
Здесь мы используем Gemma-2-9b-it в качестве базовой модели, а затем дорабатываем её с помощью нашего нового набора данных.
Это последовательность шагов, которые выполняются на этапе тонкой настройки.
1. Настройка: Импортируйте библиотеки, определите параметры (для модели, данных и обучения) и загрузите набор данных из Google Cloud Storage.
2. Загрузка модели: Загрузите предварительно обученную языковую модель с квантизацией для повышения эффективности и загрузите соответствующий токенизатор.
3. Настройка LoRA: Настройте адаптацию низкого ранга (LoRA) для эффективной тонкой настройки модели путем добавления небольших обучаемых матриц.
4. Обучение: Определите параметры обучения и используйте SFTTrainer для тонкой настройки модели на загруженном наборе данных, используя тип квантования FP16 .
5. Сохранение и загрузка: Сохраните доработанную модель и токенизатор локально, а затем загрузите их в наш бакет GCS.
Затем мы создаём образ контейнера с помощью Cloud Build и сохраняем его в реестре артефактов.
Создайте директорию и скопируйте наши скрипты сюда.
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Теперь мы создадим образ контейнера для тонкой настройки и загрузим его в реестр артефактов.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Обзор Airflow 2, включая информацию о том, что такое DAG.
Airflow — это платформа для организации рабочих процессов и конвейеров обработки данных. Она использует направленные ациклические графы (DAG) для определения этих рабочих процессов в коде Python, визуально представляя задачи и их зависимости.
Airflow, благодаря своим статическим DAG-графам и определениям на основе Python, хорошо подходит для планирования и управления предопределенными рабочими процессами. Его архитектура включает в себя удобный пользовательский интерфейс для мониторинга и управления этими рабочими процессами.
По сути, Airflow позволяет определять, планировать и отслеживать ваши конвейеры обработки данных с помощью Python, что делает его гибким и мощным инструментом для оркестрации рабочих процессов.
13. Обзор нашей направленной ациклической графовой структуры (DAG).

DAG расшифровывается как ориентированный ациклический граф. В Airflow DAG сам по себе представляет весь рабочий процесс или конвейер. Он определяет задачи, их зависимости и порядок выполнения.
Единицы рабочего процесса в рамках DAG выполняются из пода в кластере GKE и инициируются из конфигурации Airflow.
Краткое содержание:
Airflow: Загрузка данных — этот скрипт автоматизирует процесс получения набора данных с обзорами фильмов с Kaggle и сохранения его в вашем хранилище GCS, что делает его легкодоступным для дальнейшей обработки или анализа в вашей облачной среде.
Airflow: Подготовка данных — Код берет исходный набор данных с отзывами о фильмах, удаляет лишние столбцы данных, не необходимые для нашего случая, и удаляет наборы данных с пропущенными значениями. Затем он структурирует набор данных в формат вопросов и ответов, подходящий для машинного обучения, и сохраняет его обратно в GCS для последующего использования.
Airflow: Тонкая настройка модели — этот код выполняет тонкую настройку большой языковой модели (LLM) с использованием метода LoRA (Low-Rank Adaptation), а затем сохраняет обновленную модель. Сначала загружается предварительно обученная LLM и набор данных из Google Cloud Storage. Затем применяется метод LoRA для эффективной тонкой настройки модели на этом наборе данных. Наконец, настроенная модель сохраняется обратно в Google Cloud Storage для последующего использования в таких приложениях, как генерация текста или ответы на вопросы.
Airflow: Model Serving — развертывание дообученной модели в GKE с использованием vllm для вывода результатов.
Воздушный поток: Петля обратной связи — переобучение модели каждые xx раз (ежечасно, ежедневно, еженедельно).
На этой диаграмме объясняется принцип работы Airflow 2 при запуске в среде GKE.

14. Точная настройка модели вместо использования RAG
В этом CodeLab используется тонкая настройка LLM вместо генерации с расширенным поиском (RAG).
Давайте сравним эти два подхода:
Тонкая настройка: Создание специализированной модели: Тонкая настройка адаптирует LLM к конкретной задаче или набору данных, позволяя ей работать независимо, не полагаясь на внешние источники данных.
Упрощает процесс вывода: это устраняет необходимость в отдельной системе поиска и базе данных, что приводит к более быстрым и дешевым ответам, особенно для часто используемых сценариев.
RAG: Опирается на внешние знания: RAG извлекает необходимую информацию из базы знаний для каждого запроса, обеспечивая доступ к актуальным и конкретным данным.
Увеличение сложности: внедрение RAG в производственной среде, такой как кластер Kubernetes, часто включает в себя множество микросервисов для обработки и извлечения данных, что потенциально увеличивает задержку и вычислительные затраты.
Почему была выбрана тонкая настройка:
Хотя RAG подходил бы для небольшого набора данных, используемого в этом CodeLab, мы решили провести тонкую настройку, чтобы продемонстрировать типичный сценарий использования Airflow. Этот выбор позволяет нам сосредоточиться на аспектах оркестрации рабочих процессов, а не углубляться в нюансы настройки дополнительной инфраструктуры и микросервисов для RAG.
Заключение:
Как тонкая настройка, так и RAG — это ценные методы, каждый из которых имеет свои сильные и слабые стороны. Оптимальный выбор зависит от конкретных требований вашего проекта, таких как размер и сложность данных, потребности в производительности и соображения стоимости.
15. Задача DAG №1 — Создайте первый шаг в Airflow: Загрузка данных.
В качестве обзора этого блока DAG, наш код на Python, размещенный в образе контейнера, загружает последний набор данных RottenTomatoes с Kaggle.
Не копируйте этот код в хранилище GCS. На последнем шаге мы копируем файл mlops-dag.py , который содержит все шаги DAG Unit в одном скрипте Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. Задача DAG №2 — Создайте второй шаг в Airflow: Подготовка данных
В качестве обзора этого DAG-модуля мы загружаем CSV-файл (rotten_tomatoes_movie_reviews.csv) из GCS в DataFrame Pandas.
Далее мы ограничиваем количество обрабатываемых строк с помощью параметра DATASET_LIMIT для тестирования и повышения эффективности использования ресурсов, и, наконец, преобразуем данные в набор данных "Обнимающие лица".
Если присмотреться, то можно заметить, что в модели используется 1000 строк данных с параметром "DATASET_LIMIT": "1000". Это связано с тем, что на графическом процессоре Nvidia L4 на это требуется 20 минут.
Не копируйте этот код в хранилище GCS. На последнем шаге мы копируем файл mlops-dag.py , который содержит все шаги в одном скрипте Python.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. Задача DAG №3 — Создайте третий шаг в Airflow: Тонкая настройка модели.
В качестве обзора данного блока DAG, здесь мы запускаем finetune.py для уточнения модели Gemma с использованием нашего нового набора данных.
Не копируйте этот код в хранилище GCS. На последнем шаге мы копируем файл mlops-dag.py , который содержит все шаги в одном скрипте Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. Задача DAG №4 — Создайте заключительный шаг в Airflow: Вывод результатов / Обслуживание модели
vLLM — это мощная библиотека с открытым исходным кодом, специально разработанная для высокопроизводительного вывода LLM-моделей. При развертывании на Google Kubernetes Engine (GKE) она использует масштабируемость и эффективность Kubernetes для эффективного обслуживания LLM-моделей.
Краткое описание шагов:
- Загрузите DAG "mlops-dag.py" в хранилище GCS.
- Скопируйте два YAML-файла конфигурации Kubernetes для настройки вывода результатов в корзину GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Загрузите свой скрипт на Python (файл DAG), а также манифесты Kubernetes в хранилище DAGS GCS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
В пользовательском интерфейсе Airflow вы увидите mlops-dag.
- Выберите «Возобновить».
- Выберите «Запустить DAG», чтобы выполнить цикл MLOps вручную.

После завершения построения DAG-графа вы увидите в пользовательском интерфейсе Airflow примерно такой результат.

После заключительного шага вы можете получить доступ к конечной точке модели и отправить запрос на тестирование модели.
Подождите примерно 5 минут, прежде чем выполнять команду curl, чтобы начать формирование модели и балансировщик нагрузки мог назначить внешний IP-адрес.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Выход:
19. Поздравляем!
Вы создали свой первый рабочий процесс ИИ, используя конвейер DAG с Airflow 2 на GKE.
Не забудьте отменить выделение развернутых ресурсов.
20. Выполнение этого в производственной среде
Хотя CodeLab предоставил вам прекрасное представление о том, как настроить Airflow 2 в GKE, в реальных условиях при работе в продакшене вам следует учесть некоторые из следующих моментов.
Разработайте веб-интерфейс с использованием Gradio или аналогичных инструментов.
Здесь можно настроить автоматический мониторинг приложений для рабочих нагрузок с помощью GKE, либо экспортировать метрики из Airflow здесь .
Для более быстрой тонкой настройки модели, особенно при работе с большими наборами данных, могут потребоваться более мощные графические процессоры. Однако, если мы хотим обучать модель на нескольких графических процессорах, нам необходимо разделить набор данных и сегментировать обучение. Вот объяснение FSDP с использованием PyTorch (полностью сегментированное параллельное обучение с использованием совместного использования графических процессоров для достижения этой цели). Дополнительную информацию можно найти в статье блога Meta и в этом руководстве по FSDP с использованием PyTorch .
Google Cloud Composer — это управляемый сервис Airflow, поэтому вам не нужно поддерживать сам Airflow, просто разверните свой DAG, и всё готово.
Узнать больше
- Документация Airflow: https://airflow.apache.org/
Лицензия
Данная работа распространяется под лицензией Creative Commons Attribution 2.0 Generic.