1. Introdução
O Apache Airflow foi projetado para executar DAGs regularmente, mas também é possível acioná-los em resposta a eventos, como uma mudança em um bucket do Cloud Storage ou uma mensagem enviada ao Cloud Pub/Sub. Para isso, os DAGs do Cloud Composer podem ser acionados pelo Cloud Functions.
No exemplo deste laboratório, um DAG simples é executado sempre que ocorre uma mudança em um bucket do Cloud Storage. Esse DAG usa o BashOperator para executar um comando bash que imprime as informações de mudança sobre o que foi enviado por upload para o bucket do Cloud Storage.
Antes de começar este laboratório, recomendamos que você conclua os codelabs Introdução ao Cloud Composer e Primeiros passos com o Cloud Functions. Se você criou um ambiente do Composer no codelab "Introdução ao Cloud Composer", use esse ambiente neste laboratório.
O que você vai criar
Neste codelab, você vai:
- Faça upload de um arquivo para o Google Cloud Storage, que vai
- Acionar uma função do Google Cloud usando o ambiente de execução do Node.JS
- Essa função executa um DAG no Google Cloud Composer.
- Isso executa um comando Bash simples que imprime a mudança no bucket do Cloud Storage.

O que você vai aprender
- Como acionar um DAG do Apache Airflow usando o Google Cloud Functions + Node.js
O que é necessário
- Conta do GCP
- Noções básicas de JavaScript
- Conhecimento básico do Cloud Composer/Airflow e do Cloud Functions
- Conforto ao usar comandos da CLI
2. Como configurar o GCP
Selecionar ou criar o projeto
Selecione ou crie um projeto do Google Cloud Platform. Se você estiver criando um novo projeto, siga as etapas aqui.
Anote o ID do projeto, que será usado nas etapas posteriores.
Se você estiver criando um novo projeto, o ID do projeto vai aparecer logo abaixo do Nome do Projeto na página de criação. |
|
Se você já criou um projeto, o ID está na página inicial do console, no card "Informações do projeto". |
|
Ative as APIs
Ative as APIs Cloud Composer, Google Cloud Functions e Cloud Identity and Access Management (IAM). |
|
Criar ambiente do Composer
Crie um ambiente do Cloud Composer com a seguinte configuração:
Todas as outras configurações podem permanecer no padrão. Clique em "Criar" na parte de baixo.Anote o nome e o local do ambiente do Cloud Composer, porque eles serão necessários nas próximas etapas. |
|
Criar um bucket do Cloud Storage
No seu projeto, crie um bucket do Cloud Storage com a seguinte configuração:
Clique em "Criar" quando estiver tudo pronto. Anote o nome do bucket do Cloud Storage para as próximas etapas. |
|
3. Como configurar o Google Cloud Functions (GCF)
Para configurar o GCF, vamos executar comandos no Google Cloud Shell.
Embora o Google Cloud possa ser operado remotamente em seu laptop usando a ferramenta de linha de comando gcloud, neste codelab vamos usar o Google Cloud Shell, um ambiente de linha de comando executado no Cloud.
O Cloud Shell é uma máquina virtual com base em Debian que contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal permanente de 5 GB, além de ser executada no Google Cloud, o que aprimora o desempenho e a autenticação da rede. Isso significa que tudo que você precisa para este codelab é um navegador (sim, funciona em um Chromebook).
Para ativar o Google Cloud Shell, clique no botão no canto superior direito do console do desenvolvedor. Leva apenas alguns instantes para provisionar e se conectar ao ambiente: |
|
Conceder permissões de assinatura de blob à conta de serviço do Cloud Functions
Para que o GCF se autentique no Cloud IAP, o proxy que protege o servidor da Web do Airflow, é necessário conceder à conta de serviço do Appspot GCF a função Service Account Token Creator. Para isso, execute o seguinte comando no Cloud Shell, substituindo o nome do projeto por <your-project-id>.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Por exemplo, se o projeto se chamar my-project, o comando será
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Como conseguir o ID de cliente
Para criar um token e se autenticar no Cloud IAP, a função requer o ID do cliente do proxy que protege o servidor da Web do Airflow. A API Cloud Composer não fornece essas informações diretamente. Em vez disso, faça uma solicitação não autenticada no servidor da Web do Airflow e capture o ID do cliente do URL de redirecionamento. Para isso, vamos executar um arquivo Python usando o Cloud Shell para capturar o ID do cliente.
Faça o download do código necessário do GitHub executando o seguinte comando no Cloud Shell:
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Se você recebeu um erro porque esse diretório já existe, atualize-o para a versão mais recente executando o seguinte comando:
cd python-docs-samples/ git pull origin master
Mude para o diretório apropriado executando
cd python-docs-samples/composer/rest
Execute o código Python para receber o ID do cliente, substituindo o nome do projeto por <your-project-id>, o local do ambiente do Composer criado anteriormente por <your-composer-location> e o nome do ambiente do Composer criado anteriormente por <your-composer-environment>.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Por exemplo, se o nome do projeto for my-project, o local do Composer for us-central1 e o nome do ambiente for my-composer, o comando será
python3 get_client_id.py my-project us-central1 my-composer
A get_client_id.py realiza as seguintes ações:
- Faz a autenticação com o Google Cloud
- Faz uma solicitação HTTP não autenticada ao servidor da Web do Airflow para receber o URI de redirecionamento.
- Extrai o parâmetro de consulta
client_iddesse redirecionamento. - Imprime para você usar
Seu ID do cliente será impresso na linha de comando e será semelhante a este:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Criar sua função
No Cloud Shell, clone o repositório com o exemplo de código necessário executando
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Mude para o diretório necessário e deixe o Cloud Shell aberto enquanto você conclui as próximas etapas.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Acesse a página do Google Cloud Functions clicando no menu de navegação e em "Cloud Functions". |
|
Clique em "CRIAR FUNÇÃO" na parte de cima da página. |
|
Nomeie a função como "my-function" e deixe a memória no padrão, 256 MB. |
|
Defina o gatilho como "Cloud Storage", deixe o tipo de evento como "Finalizar/Criar" e navegue até o bucket criado na etapa "Criar um bucket do Cloud Storage". |
|
Deixe o código-fonte definido como "Editor in-line" e o ambiente de execução como "Node.js 8". |
|
No Cloud Shell, execute o comando a seguir. Isso vai abrir index.js e package.json no Editor do Cloud Shell.
cloudshell edit index.js package.json
Clique na guia "package.json", copie o código e cole-o na seção "package.json" do editor in-line do Cloud Functions. |
|
Defina a "Função a ser executada" como triggerDag |
|
Clique na guia index.js, copie o código e cole-o na seção index.js do editor in-line do Cloud Functions. |
|
Mude |
|
No Cloud Shell, execute o comando a seguir, substituindo <your-environment-name> pelo nome do seu ambiente do Composer e <your-composer-region> pela região em que ele está localizado.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Por exemplo, se o ambiente for chamado de my-composer-environment e estiver localizado em us-central1, o comando será
gcloud composer environments describe my-composer-environment --location us-central1
A resposta será semelhante a esta:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Na saída, procure a variável chamada |
|
Clique no link do menu suspenso "Mais" e escolha a região geograficamente mais próxima de você. |
|
Marque "Tentar novamente em caso de falha" |
|
Clique em "Criar" para criar a função do Cloud. |
|
Percorrer o código
O código copiado de index.js vai ficar assim:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Vamos analisar o que está acontecendo. Há três funções aqui: triggerDag, authorizeIap e makeIapPostRequest
triggerDag é a função acionada quando fazemos upload de algo para o bucket designado do Cloud Storage. É onde configuramos variáveis importantes usadas nas outras solicitações, como PROJECT_ID, CLIENT_ID, WEBSERVER_ID e DAG_NAME. Ele chama authorizeIap e makeIapPostRequest.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
O authorizeIap faz uma solicitação ao proxy que protege o servidor da Web do Airflow, usando uma conta de serviço e "trocando" um JWT por um token de ID que será usado para autenticar o makeIapPostRequest.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest faz uma chamada ao servidor da Web do Airflow para acionar o composer_sample_trigger_response_dag.. O nome do DAG está incorporado ao URL do servidor da Web do Airflow transmitido com o parâmetro url, e o idToken é o token que recebemos na solicitação authorizeIap.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Configurar seu DAG
No Cloud Shell, mude para o diretório com os fluxos de trabalho de exemplo. Ele faz parte do python-docs-samples que você baixou do GitHub na etapa "Como receber o ID do cliente".
cd cd python-docs-samples/composer/workflows
Fazer upload do DAG para o Composer
Faça upload do DAG de exemplo para o bucket de armazenamento de DAGs do ambiente do Composer com o seguinte comando, em que <environment_name> é o nome do ambiente do Composer e <location> é o nome da região em que ele está localizado. trigger_response_dag.py é o DAG com que vamos trabalhar.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
Por exemplo, se o ambiente do Composer se chamasse my-composer e estivesse localizado em us-central1, o comando seria
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
Percorrer o DAG
O código DAG em trigger_response.py é assim:
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
A seção default_args contém os argumentos padrão, conforme exigido pelo modelo BaseOperator no Apache Airflow. Você veria essa seção com esses parâmetros em qualquer DAG do Apache Airflow. O owner está definido como Composer Example, mas você pode mudar para seu nome, se quiser. depends_on_past mostra que este DAG não depende de nenhum DAG anterior. As três seções de e-mail, email, email_on_failure e email_on_retry, são definidas para que nenhuma notificação por e-mail seja recebida com base no status desse DAG. O DAG só vai tentar de novo uma vez, já que retries está definido como 1, e fará isso após cinco minutos, de acordo com retry_delay. O start_date normalmente determina quando um DAG deve ser executado, em conjunto com o schedule_interval (definido mais tarde), mas, no caso deste DAG, não é relevante. Ela está definida como 1º de janeiro de 2017, mas pode ser qualquer data passada.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
A seção with airflow.DAG configura o DAG que será executado. Ele será executado com o ID da tarefa composer_sample_trigger_response_dag, os argumentos padrão da seção default_args e, mais importante, com um schedule_interval de None. O schedule_interval está definido como None porque estamos acionando esse DAG específico com nossa função do Cloud. Por isso, o start_date em default_args não é relevante.
Quando ele é executado, o DAG imprime a configuração, conforme ditado na variável print_gcs_info.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Testar a função
Abra seu ambiente do Composer e, na linha com o nome do ambiente, clique no link do Airflow. |
|
Clique no nome para abrir o |
|
Abra outra guia e faça upload de qualquer arquivo para o bucket do Cloud Storage que você criou e especificou como o gatilho da sua função do Cloud. Faça isso pelo Console ou usando um comando gsutil. |
|
Volte para a guia com a interface do Airflow e clique em "Visualização de gráfico". |
|
Clique na tarefa |
|
Clique em "Ver registro" no canto superior direito do menu. |
|
Nos registros, você vai encontrar informações sobre o arquivo que fez upload para o bucket do Cloud Storage. |
|
Parabéns! Você acabou de acionar um DAG do Airflow usando o Node.js e o Google Cloud Functions.
7. Limpeza
Para evitar cobranças na sua conta do GCP pelos recursos usados neste guia de início rápido:
- (Opcional) Para salvar seus dados, faça o download do bucket do Cloud Storage para o ambiente do Cloud Composer e do bucket de armazenamento criado para este guia de início rápido.
- Exclua o bucket do Cloud Storage do ambiente que você criou.
- Exclua o ambiente do Cloud Composer. A exclusão do ambiente não remove o bucket de armazenamento dele.
- (Opcional) Com a computação sem servidor, as primeiras 2 milhões de invocações por mês são sem custo financeiro. Além disso, quando você reduz a função a zero, não há cobrança. Consulte os preços para mais detalhes. No entanto, se você quiser excluir a função do Cloud, clique em "EXCLUIR" no canto superior direito da página de visão geral da função.

Também é possível excluir o projeto:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione um e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em desligar para excluir o projeto.


























