1. Introdução
Os fluxos de trabalho são um caso de uso comum na análise de dados. Eles envolvem ingestão, transformação e análise de dados para encontrar informações relevantes. No Google Cloud Platform, a ferramenta para orquestrar fluxos de trabalho é o Cloud Composer, que é uma versão hospedada da famosa ferramenta de fluxo de trabalho de código aberto Apache Airflow. Neste laboratório, você vai usar o Cloud Composer para criar um fluxo de trabalho simples que cria um cluster do Cloud Dataproc, o analisa usando o Cloud Dataproc e o Apache Hadoop e depois o exclui.
O que é o Cloud Composer?
O Google Cloud Composer é um serviço totalmente gerenciado de orquestração de fluxos de trabalho que permite criar, agendar e monitorar canais que abrangem nuvens e data centers no local. Criado com base no conhecido projeto de código aberto Apache Airflow e operado com a linguagem de programação Python, o Cloud Composer é fácil de usar e deixa você livre da dependência tecnológica.
Ao usar o Cloud Composer em vez de uma instância local do Apache Airflow, os usuários podem aproveitar o melhor do Airflow sem sobrecarga de instalação ou de gerenciamento.
O que é o Apache Airflow?
O Apache Airflow é uma ferramenta de código aberto usada para criar, programar e monitorar fluxos de trabalho de maneira programática. Há alguns termos importantes relacionados ao Airflow que você vai encontrar ao longo do laboratório:
- DAG: um DAG (gráfico acíclico dirigido) é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs, também chamados de fluxos de trabalho, são definidos em arquivos Python padrão.
- Operador: descreve uma única tarefa em um fluxo de trabalho.
O que é o Cloud Dataproc?
O Cloud Dataproc é o serviço totalmente gerenciado do Apache Spark e do Apache Hadoop no Google Cloud Platform. O Cloud Dataproc se integra facilmente a outros serviços do GCP, oferecendo uma plataforma completa e poderosa para processamento de dados, análise e machine learning.
Atividades do laboratório
Este codelab mostra como criar e executar um fluxo de trabalho do Apache Airflow no Cloud Composer que realiza as seguintes tarefas:
- Cria um cluster do Cloud Dataproc.
- Executa um job de contagem de palavras do Apache Hadoop no cluster e envia os resultados para o Cloud Storage
- Exclui o cluster
O que você vai aprender
- Como criar e executar um fluxo de trabalho do Apache Airflow no Cloud Composer
- Como usar o Cloud Composer e o Cloud Dataproc para executar uma análise em um conjunto de dados
- Como acessar o ambiente do Cloud Composer pelo Console do Google Cloud Platform, pelo SDK Cloud e pela interface da Web do Airflow
O que é necessário
- Conta do GCP
- Conhecimento básico da CLI
- compreensão básica do Python
2. Como configurar o GCP
Criar o projeto
Selecione ou crie um projeto do Google Cloud Platform.
Anote o ID do projeto, que será usado nas etapas posteriores.
Se você estiver criando um novo projeto, o ID do projeto vai aparecer logo abaixo do Nome do Projeto na página de criação. |
|
Se você já criou um projeto, o ID está na página inicial do console, no card "Informações do projeto". |
|
Ativar as APIs
Ative as APIs Cloud Composer, Cloud Dataproc e Cloud Storage.Depois de ativadas, ignore o botão "Acessar credenciais" e siga para a próxima etapa do tutorial. |
|
Criar ambiente do Composer
Crie um ambiente do Cloud Composer com a seguinte configuração:
Todas as outras configurações podem permanecer no padrão. Clique em "Criar" na parte de baixo. |
|
Criar um bucket do Cloud Storage
No seu projeto, crie um bucket do Cloud Storage com a seguinte configuração:
Clique em "Criar" quando terminar. |
|
3. Como configurar o Apache Airflow
Como ver informações do ambiente do Composer
No Console do GCP, abra a página Ambientes.
Clique no nome do ambiente para ver os detalhes.
A página Detalhes do ambiente mostra o URL da interface da Web do Airflow, o ID do cluster do Google Kubernetes Engine, o nome do bucket do Cloud Storage e o caminho da pasta /dags.
No Airflow, um DAG (gráfico acíclico dirigido) é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs, também chamados de fluxos de trabalho, são definidos em arquivos Python padrão. O Cloud Composer agenda somente os DAGs na pasta /dags. A pasta /dags está no bucket do Cloud Storage que o Cloud Composer cria automaticamente quando você cria o ambiente.
Como definir variáveis de ambiente do Apache Airflow
As variáveis do Apache Airflow são um conceito específico da plataforma e diferente das variáveis de ambiente. Nesta etapa, você vai definir as três variáveis do Airflow: gcp_project, gcs_bucket e gce_zone.
Como usar gcloud para definir variáveis
Primeiro, abra o Cloud Shell, que já vem com o SDK Cloud instalado.
Defina a variável de ambiente COMPOSER_INSTANCE como o nome do seu ambiente do Cloud Composer.
COMPOSER_INSTANCE=my-composer-environment
Para definir variáveis do Airflow usando a ferramenta de linha de comando gcloud, use o comando gcloud composer environments run com o subcomando variables. Esse comando gcloud composer executa o subcomando variables da CLI do Airflow. O subcomando transmite os argumentos para a ferramenta de linha de comando gcloud.
Execute esse comando três vezes, substituindo as variáveis pelas relevantes para seu projeto.
Defina o gcp_project usando o comando a seguir, substituindo <your-project-id> pelo ID do projeto que você anotou na etapa 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcp_project <your-project-id>
A resposta será semelhante a esta:
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Defina o gcs_bucket usando o comando a seguir, substituindo <your-bucket-name> pelo ID do bucket que você anotou na etapa 2. Se você seguiu nossa recomendação, o nome do bucket é o mesmo do ID do projeto. A saída será semelhante ao comando anterior.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Defina o gce_zone usando o seguinte comando. A saída será semelhante aos comandos anteriores.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gce_zone us-central1-a
(Opcional) Usar gcloud para ver uma variável
Para ver o valor de uma variável, execute o subcomando variables da CLI do Airflow com o argumento get ou use a IU do Airflow.
Exemplo:
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --get gcs_bucket
É possível fazer isso com qualquer uma das três variáveis que você acabou de definir: gcp_project, gcs_bucket e gce_zone.
4. Exemplo de fluxo de trabalho
Vamos analisar o código da DAG que vamos usar na etapa 5. Não se preocupe em baixar arquivos ainda, apenas siga as instruções aqui.
Há muita coisa para analisar aqui, então vamos detalhar um pouco.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Começamos com algumas importações do Airflow:
airflow.models: permite acessar e criar dados no banco de dados do Airflow.airflow.contrib.operators: onde os operadores da comunidade estão. Nesse caso, precisamos dodataproc_operatorpara acessar a API Dataproc.airflow.utils.trigger_rule: para adicionar regras de gatilho aos nossos operadores. As regras de acionamento permitem um controle refinado sobre se um operador deve ser executado em relação ao status dos pais.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Isso especifica o local do nosso arquivo de saída. A linha importante aqui é models.Variable.get('gcs_bucket'), que vai extrair o valor da variável gcs_bucket do banco de dados do Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR: local do arquivo .jar que vamos executar no cluster do Cloud Dataproc. Ele já está hospedado no GCP para você.input_file: local do arquivo que contém os dados que o job do Hadoop vai processar. Vamos fazer upload dos dados para esse local juntos na etapa 5.wordcount_args: argumentos que serão transmitidos para o arquivo JAR.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
Isso vai nos dar um objeto datetime equivalente que representa a meia-noite do dia anterior. Por exemplo, se isso for executado às 11h do dia 4 de março, o objeto datetime vai representar 0h do dia 3 de março. Isso tem a ver com a forma como o Airflow processa o agendamento. Confira mais informações neste link.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
A variável default_dag_args na forma de um dicionário precisa ser fornecida sempre que um novo DAG é criado:
'email_on_failure': indica se alertas por e-mail devem ser enviados quando uma tarefa falha.'email_on_retry': indica se os alertas por e-mail devem ser enviados quando uma tarefa é repetida.'retries': indica quantas novas tentativas o Airflow deve fazer em caso de falha do DAG.'retry_delay': indica quanto tempo o Airflow deve esperar antes de tentar uma nova tentativa.'project_id': informa ao DAG com qual ID de projeto do GCP associá-lo, o que será necessário mais tarde com o operador do Dataproc.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
Usar with models.DAG informa ao script para incluir tudo abaixo dele no mesmo DAG. Também vemos três argumentos transmitidos:
- A primeira, uma string, é o nome do DAG que estamos criando. Neste caso, estamos usando
composer_hadoop_tutorial. schedule_interval: um objetodatetime.timedelta, que aqui definimos como um dia. Isso significa que o DAG vai tentar ser executado uma vez por dia após o'start_date'definido anteriormente em'default_dag_args'.default_args: o dicionário que criamos anteriormente com os argumentos padrão do DAG.
crie um cluster do Dataproc
Em seguida, vamos criar um dataproc_operator.DataprocClusterCreateOperator que cria um cluster do Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
Nesse operador, vemos alguns argumentos, todos, exceto o primeiro, específicos dele:
task_id: assim como no BashOperator, esse é o nome que atribuímos ao operador, que pode ser visto na interface do Airflow.cluster_name: o nome que atribuímos ao cluster do Cloud Dataproc. Aqui, chamamos decomposer-hadoop-tutorial-cluster-{{ ds_nodash }}. Consulte a caixa de informações para ver mais detalhes opcionais.num_workers: o número de workers que alocamos para o cluster do Cloud Dataproczone: a região geográfica em que queremos que o cluster fique, conforme salvo no banco de dados do Airflow. Isso vai ler a variável'gce_zone'definida na etapa 3.master_machine_type: o tipo de máquina que queremos alocar para o mestre do Cloud Dataprocworker_machine_type: o tipo de máquina que queremos alocar para o worker do Cloud Dataproc
Enviar um job do Apache Hadoop
O dataproc_operator.DataProcHadoopOperator permite enviar um job para um cluster do Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Fornecemos vários parâmetros:
task_id: nome atribuído a esta parte do DAG.main_jar: local do arquivo .jar que queremos executar no clustercluster_name: nome do cluster em que o job será executado. Ele é idêntico ao que encontramos no operador anterior.arguments: argumentos transmitidos ao arquivo JAR, como se você estivesse executando o arquivo .jar na linha de comando.
Exclua o cluster
O último operador que vamos criar é o dataproc_operator.DataprocClusterDeleteOperator.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Como o nome sugere, esse operador exclui um determinado cluster do Cloud Dataproc. Aqui, vemos três argumentos:
task_id: assim como no BashOperator, esse é o nome que atribuímos ao operador, que pode ser visto na interface do Airflow.cluster_name: o nome que atribuímos ao cluster do Cloud Dataproc. Aqui, chamamos decomposer-hadoop-tutorial-cluster-{{ ds_nodash }}. Consulte a caixa de informações depois de "Criar um cluster do Dataproc" para mais informações opcionais.trigger_rule: mencionamos brevemente as regras de acionamento durante as importações no início desta etapa, mas aqui temos uma em ação. Por padrão, um operador do Airflow não é executado a menos que todos os operadores upstream tenham sido concluídos com sucesso. A regra de gatilhoALL_DONEexige apenas que todos os operadores upstream tenham sido concluídos, independentemente de terem sido bem-sucedidos ou não. Aqui, isso significa que, mesmo que o job do Hadoop tenha falhado, ainda queremos encerrar o cluster.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Por fim, queremos que esses operadores sejam executados em uma ordem específica, e podemos indicar isso usando operadores de deslocamento de bits do Python. Nesse caso, create_dataproc_cluster sempre será executado primeiro, seguido por run_dataproc_hadoop e, por fim, delete_dataproc_cluster.
Juntando tudo, o código fica assim:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Fazer upload de arquivos do Airflow para o Cloud Storage
Copiar o DAG para a pasta /dags
- Primeiro, abra o Cloud Shell, que já vem com o SDK Cloud instalado.
- Clone o repositório de amostras do Python e mude para o diretório composer/workflows.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Execute o comando a seguir para definir o nome da pasta de DAGs como uma variável de ambiente:
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
- Execute o comando
gsutila seguir para copiar o código do tutorial para o local em que a pasta /dags foi criada.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
A resposta será semelhante a esta:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Como usar a IU do Airflow
Para acessar a interface da Web do Airflow usando o console do GCP:
|
|
Para saber mais sobre a IU do Airflow, consulte Como acessar a interface da Web.
Ver variáveis
As variáveis já definidas são mantidas no seu ambiente. Para conferir as variáveis, selecione Admin > Variables na barra de menus da interface do Airflow.

Como o DAG é executado
Quando você faz o upload do arquivo DAG para a pasta dags no Cloud Storage, o Cloud Composer analisa o arquivo. Se nenhum erro for encontrado, o nome do fluxo de trabalho será exibido na lista de DAGs e entrará na fila para ser executado imediatamente. Para conferir seus DAGs, clique em DAGs na parte de cima da página.

Clique em composer_hadoop_tutorial para abrir a página de detalhes do DAG. Nela, você encontra uma representação gráfica das tarefas e das dependências do fluxo de trabalho.

Na barra de ferramentas, clique em Visualização de gráfico e passe o cursor sobre o gráfico de cada tarefa para conferir o status. A borda das tarefas também indica o status: verde = em execução, vermelha = falha etc.

Para executar o fluxo de trabalho novamente a partir da Visualização de gráfico:
- Na visualização de gráfico da IU do Airflow, clique no gráfico
create_dataproc_cluster. - Clique em Limpar para redefinir as três tarefas e em OK para confirmar.

Também é possível verificar o status e os resultados do fluxo de trabalho composer-hadoop-tutorial acessando as seguintes páginas do Console do GCP:
- Os clusters do Cloud Dataproc para monitorar a criação e exclusão de clusters. O cluster criado pelo fluxo de trabalho é efêmero: ele só existe durante o fluxo de trabalho e é excluído como parte da última tarefa.
- Os Jobs do Cloud Dataproc para visualizar ou monitorar o job de contagem de palavras do Apache Hadoop. Clique no ID do job para ver a saída do registro dele.
- O Navegador do Cloud Storage para ver os resultados da contagem de palavras na pasta
wordcountdo bucket do Cloud Storage criado para este codelab.
7. Limpeza
Para evitar cobranças na sua conta do GCP pelos recursos usados neste codelab:
- (Opcional) Para salvar seus dados, faça o download deles do bucket do Cloud Storage para o ambiente do Cloud Composer e do bucket de armazenamento criado para este codelab.
- Exclua o bucket do Cloud Storage criado para este codelab.
- Exclua o bucket do Cloud Storage do ambiente.
- Exclua o ambiente do Cloud Composer. A exclusão do ambiente não remove o bucket de armazenamento dele.
Também é possível excluir o projeto:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione um e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em desligar para excluir o projeto.







