1. Введение
Apache Airflow предназначен для запуска DAG-графов по регулярному расписанию, но вы также можете запускать DAG-графы в ответ на события, такие как изменение в хранилище Cloud Storage или отправка сообщения в Cloud Pub/Sub. Для этого DAG-графы Cloud Composer можно запускать с помощью Cloud Functions .
В этом примере в лабораторной работе каждый раз при изменении содержимого корзины Cloud Storage запускается простой DAG. Этот DAG использует BashOperator для выполнения команды bash, выводящей информацию об изменениях, внесенных в корзину Cloud Storage.
Перед началом работы рекомендуется пройти лабораторные работы «Введение в Cloud Composer» и « Начало работы с Cloud Functions» . Если вы создали среду Composer в лабораторной работе «Введение в Cloud Composer», вы сможете использовать эту среду и в данной работе.
Что вы построите
В этом практическом занятии вы:
- Загрузите файл в облачное хранилище Google , что позволит
- Запустите функцию Google Cloud Function с помощью среды выполнения Node.js.
- Эта функция выполнит DAG в Google Cloud Composer.
- Это запускает простую команду bash, выводящую изменение в хранилище Google Cloud Storage.

Что вы узнаете
- Как запустить DAG Apache Airflow с помощью Google Cloud Functions + Node.js
Что вам понадобится
- Учетная запись GCP
- Базовое понимание JavaScript
- Базовые знания Cloud Composer/Airflow и Cloud Functions.
- Удобство использования команд командной строки.
2. Настройка GCP
Выберите или создайте проект.
Выберите или создайте проект Google Cloud Platform. Если вы создаете новый проект, следуйте инструкциям, приведенным здесь .
Запишите идентификатор вашего проекта, который вы будете использовать на последующих этапах.
При создании нового проекта идентификатор проекта находится непосредственно под названием проекта на странице создания. |
|
Если вы уже создали проект, вы можете найти его идентификатор на главной странице консоли в карточке «Информация о проекте». |
|
Включите API
|
Создание среды Composer
Создайте среду Cloud Composer со следующей конфигурацией:
Все остальные параметры конфигурации можно оставить по умолчанию. Нажмите кнопку «Создать» внизу. Запишите название и местоположение вашей среды Composer — они понадобятся вам на следующих шагах. |
|
Создать сегмент облачного хранилища
В вашем проекте создайте хранилище Cloud Storage со следующей конфигурацией:
Нажмите «Создать», когда будете готовы. Обязательно запишите название вашего хранилища Cloud Storage для последующих шагов. |
|
3. Настройка Google Cloud Functions (GCF)
Для настройки GCF мы будем выполнять команды в Google Cloud Shell.
Хотя Google Cloud можно управлять удаленно с ноутбука с помощью инструмента командной строки gcloud , в этом практическом занятии мы будем использовать Google Cloud Shell — среду командной строки, работающую в облаке.
Эта виртуальная машина на базе Debian содержит все необходимые инструменты разработки. Она предоставляет постоянный домашний каталог размером 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Это означает, что для выполнения этого практического задания вам понадобится только браузер (да, он работает и на Chromebook).
Чтобы активировать Google Cloud Shell, в консоли разработчика нажмите кнопку в правом верхнем углу (подготовка и подключение к среде займут всего несколько минут): |
|
Предоставьте учетной записи службы Cloud Functions права на подписание BLOB-объектов.
Для аутентификации GCF в Cloud IAP , прокси-сервере, защищающем веб-сервер Airflow, необходимо предоставить учетной записи службы Appspot GCF роль « Service Account Token Creator . Для этого выполните следующую команду в Cloud Shell, заменив <your-project-id> проекта> именем вашего проекта.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Например, если ваш проект называется my-project , ваша команда будет выглядеть так:
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Получение идентификатора клиента
Для создания токена аутентификации в Cloud IAP функция требует идентификатор клиента прокси-сервера, защищающего веб-сервер Airflow. API Cloud Composer не предоставляет эту информацию напрямую. Вместо этого выполните неаутентифицированный запрос к веб-серверу Airflow и получите идентификатор клиента из URL-адреса перенаправления. Мы сделаем это, запустив файл Python с помощью Cloud Shell для получения идентификатора клиента.
Загрузите необходимый код с GitHub, выполнив следующую команду в Cloud Shell.
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Если вы получили ошибку, связанную с тем, что этот каталог уже существует, обновите его до последней версии, выполнив следующую команду.
cd python-docs-samples/ git pull origin master
Перейдите в соответствующий каталог, выполнив команду...
cd python-docs-samples/composer/rest
Запустите код на Python, чтобы получить свой идентификатор клиента, заменив <your-project-id> на название вашего проекта, <your-composer-location> — на местоположение созданной вами ранее среды Composer, а <your-composer-environment> — на название созданной вами ранее среды Composer.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Например, если название вашего проекта — my-project , расположение Composer — us-central1 , а имя вашей среды — my-composer , то ваша команда будет выглядеть так:
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py выполняет следующие действия:
- Аутентификация осуществляется через Google Cloud.
- Выполняет неаутентифицированный HTTP-запрос к веб-серверу Airflow для получения URI перенаправления.
- Извлекает параметр запроса
client_idиз этого перенаправления. - Распечатает для вас.
Ваш идентификатор клиента будет выведен в командную строку и будет выглядеть примерно так:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Создайте свою функцию
В Cloud Shell клонируйте репозиторий с необходимым примером кода, выполнив команду:
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Перейдите в нужный каталог и оставьте Cloud Shell открытым, пока будете выполнять следующие шаги.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Чтобы перейти на страницу Google Cloud Functions, нажмите на меню навигации, а затем выберите «Cloud Functions». |
|
Нажмите кнопку "СОЗДАТЬ ФУНКЦИЮ" в верхней части страницы. |
|
Назовите свою функцию "my-function" и оставьте объем памяти по умолчанию — 256 МБ. |
|
Установите триггер на «Облачное хранилище», оставьте тип события «Завершение/Создание» и перейдите к корзине, созданной на шаге «Создание корзины облачного хранилища». |
|
Оставьте параметр «Исходный код» в значении «Встроенный редактор», а в качестве среды выполнения выберите «Node.js 8». |
|
В Cloud Shell выполните следующую команду. Это откроет файлы index.js и package.json в редакторе Cloud Shell.
cloudshell edit index.js package.json
Перейдите на вкладку package.json, скопируйте этот код и вставьте его в раздел package.json встроенного редактора Cloud Functions. |
|
Установите параметр "Функция для выполнения" на triggerDag. |
|
Перейдите на вкладку index.js, скопируйте код и вставьте его в раздел index.js встроенного редактора Cloud Functions. |
|
Измените |
|
В Cloud Shell выполните следующую команду, заменив <your-environment-name> на имя вашей среды Composer и <your-composer-region> на регион, в котором находится ваша среда Composer.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Например, если ваша среда называется my-composer-environment и находится в каталоге us-central1 ваша команда будет выглядеть так:
gcloud composer environments describe my-composer-environment --location us-central1
Результат должен выглядеть примерно так:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
В полученном выводе найдите переменную с именем |
|
Нажмите на выпадающее меню «Подробнее», затем выберите регион, географически ближайший к вам. |
|
Установите флажок «Повторить попытку при неудаче». |
|
Нажмите «Создать», чтобы создать свою облачную функцию. |
|
Пошаговое выполнение кода
Скопированный вами код из index.js будет выглядеть примерно так:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Давайте посмотрим, что происходит. Здесь три функции: triggerDag , authorizeIap и makeIapPostRequest
triggerDag — это функция, которая запускается при загрузке чего-либо в указанный сегмент Cloud Storage. Здесь мы настраиваем важные переменные, используемые в других запросах, такие как PROJECT_ID , CLIENT_ID , WEBSERVER_ID и DAG_NAME . Она вызывает authorizeIap и makeIapPostRequest .
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap отправляет запрос к прокси-серверу, защищающему веб-сервер Airflow, используя учетную запись службы и «обменивая» JWT на токен идентификации, который будет использоваться для аутентификации запроса makeIapPostRequest .
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
Функция makeIapPostRequest вызывает веб-сервер Airflow для запуска функции composer_sample_trigger_response_dag. Имя DAG встроено в URL-адрес веб-сервера Airflow, переданный в качестве параметра url , а idToken — это токен, полученный в запросе authorizeIap .
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Настройте свой DAG.
В Cloud Shell перейдите в каталог с примерами рабочих процессов. Он является частью python-docs-samples, которые вы скачали с GitHub на шаге «Получение идентификатора клиента».
cd cd python-docs-samples/composer/workflows
Загрузите DAG в Composer.
Загрузите пример DAG в хранилище DAG вашей среды Composer с помощью следующей команды, где <environment_name> — это имя вашей среды Composer, а <location> — имя региона, в котором она находится. trigger_response_dag.py — это DAG, с которым мы будем работать.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
Например, если ваша среда Composer называется my-composer и находится в каталоге us-central1 , ваша команда будет выглядеть так:
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
Пошаговое выполнение DAG
Код DAG в файле trigger_response.py выглядит следующим образом.
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
Раздел default_args содержит аргументы по умолчанию, необходимые для модели BaseOperator в Apache Airflow. Вы увидите этот раздел с этими параметрами в любом DAG Apache Airflow. В настоящее время owner указан Composer Example , но вы можете изменить это на свое имя, если хотите. depends_on_past показывает, что этот DAG не зависит от каких-либо предыдущих DAG. Три раздела email, email , email_on_failure и email_on_retry настроены таким образом, чтобы уведомления по электронной почте не приходили в зависимости от статуса этого DAG. DAG повторит попытку только один раз, поскольку retries установлено на 1, и сделает это через пять минут в соответствии с retry_delay . start_date обычно определяет, когда должен запускаться DAG, в сочетании с schedule_interval (устанавливается позже), но в случае этого DAG это не имеет значения. Оно установлено на 1 января 2017 года, но может быть установлено на любую прошедшую дату.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
Раздел ` with airflow.DAG настраивает DAG, который будет запущен. Он будет запущен с идентификатором задачи composer_sample_trigger_response_dag , аргументами по умолчанию из раздела default_args и, что наиболее важно, с schedule_interval равным None . Значение ` schedule_interval установлено на None потому что мы запускаем этот конкретный DAG с помощью нашей облачной функции. Именно поэтому параметр start_date в default_args не имеет значения.
При выполнении DAG выводит свою конфигурацию, заданную переменной print_gcs_info .
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Проверьте свою функцию.
Откройте свою среду Composer и в строке с названием вашей среды щелкните ссылку Airflow. |
|
Откройте файл |
|
Откройте отдельную вкладку и загрузите любой файл в созданный ранее сегмент облачного хранилища, указанный в качестве триггера для вашей облачной функции. Это можно сделать через консоль или с помощью команды gsutil . |
|
Вернитесь на вкладку с пользовательским интерфейсом Airflow и нажмите «Просмотр графика». |
|
Щелкните по задаче |
|
Нажмите кнопку «Просмотреть журнал» в правом верхнем углу меню. |
|
В логах вы увидите информацию о файле, который вы загрузили в свой сегмент облачного хранилища. |
|
Поздравляем! Вы только что запустили DAG Airflow с помощью Node.js и Google Cloud Functions!
7. Уборка
Чтобы избежать списания средств с вашего счета GCP за ресурсы, использованные в этом кратком руководстве:
- (Необязательно) Чтобы сохранить данные, загрузите их из хранилища Cloud Storage для среды Cloud Composer и из хранилища, созданного вами для этого краткого руководства.
- Удалите созданный вами сегмент облачного хранилища для данной среды.
- Удалите среду Cloud Composer . Обратите внимание, что удаление среды не приводит к удалению хранилища для этой среды.
- (Необязательно) При использовании бессерверных вычислений первые 2 миллиона вызовов в месяц бесплатны, а при масштабировании функции до нуля плата не взимается (подробнее см. раздел «Цены» ). Однако, если вы хотите удалить свою облачную функцию, сделайте это, нажав кнопку «УДАЛИТЬ» в правом верхнем углу страницы обзора вашей функции.

При желании вы также можете удалить проект:
- В консоли GCP перейдите на страницу «Проекты» .
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
- В поле введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.


























