1. Visão geral
O que é o Dataflow?
O Dataflow é um serviço gerenciado que executa uma ampla variedade de padrões de processamento de dados. A documentação neste site mostra como implantar os pipelines de processamento de dados em lote e streaming usando o Dataflow, incluindo instruções de uso dos recursos do serviço.
O SDK do Apache Beam é um modelo de programação de código aberto que permite desenvolver pipelines em lote e de streaming. Os pipelines são criados com um programa do Apache Beam e executados no serviço Dataflow. A documentação do Apache Beam (em inglês) fornece informações conceituais aprofundadas e material de referência para o modelo de programação, os SDKs e outros executores do Apache Beam.
Análise de dados de streaming com velocidade
O Dataflow permite o desenvolvimento rápido e simplificado de pipelines de dados de streaming com menor latência de dados.
Simplifique as operações e o gerenciamento
As equipes podem se concentrar na programação em vez de gerenciar clusters de servidor, já que a abordagem sem servidor do Dataflow remove a sobrecarga operacional das cargas de trabalho de engenharia de dados.
Reduzir o custo total de propriedade
O escalonamento automático de recursos aliado ao potencial de processamento em lote com custo otimizado permitem ao Dataflow oferecer uma capacidade praticamente ilimitada de gerenciar altos e baixos nas cargas de trabalho sem gastar demais.
Principais recursos
Gerenciamento automatizado de recursos e rebalanceamento dinâmico de trabalho
O Dataflow automatiza o provisionamento e o gerenciamento de recursos de processamento para minimizar a latência e maximizar a utilização. Assim, você não precisa ativar instâncias ou reservá-las manualmente. O particionamento de trabalho também é automatizado e otimizado para reequilibrar dinamicamente o trabalho atrasado. Não é preciso procurar teclas de atalho ou pré-processar dados de entrada.
Escalonamento automático horizontal
Escalonamento automático horizontal de recursos de worker para alcançar os melhores resultados de capacidade com o melhor custo-benefício.
Preços flexíveis de programação de recursos para processamento em lote
Para o processamento com flexibilidade no tempo de agendamento do job, como jobs noturnos, o agendamento flexível de recursos (FlexRS) oferece um preço menor para o processamento em lote. Esses jobs flexíveis são colocados em uma fila com a garantia de que serão recuperados para execução dentro de seis horas.
Este tutorial é uma adaptação de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
O que você vai aprender
- Como criar um projeto Maven com o Apache Beam usando o SDK do Java.
- Como executar um pipeline de exemplo com o Console do Google Cloud Platform
- Como excluir o bucket do Cloud Storage associado e o conteúdo dele
O que é necessário
Como você usará este tutorial?
Como você classificaria sua experiência com o uso dos serviços do Google Cloud Platform?
2. Configuração e requisitos
Configuração de ambiente autoguiada
- Faça login no Console do Cloud e crie um novo projeto ou reutilize um existente. Crie uma se você ainda não tiver uma conta do Gmail ou do G Suite.
Lembre-se do código do projeto, um nome exclusivo em todos os projetos do Google Cloud. O nome acima já foi escolhido e não servirá para você. Faremos referência a ele mais adiante neste codelab como PROJECT_ID
.
- Em seguida, será necessário ativar o faturamento no Console do Cloud para usar os recursos do Google Cloud.
A execução deste codelab não será muito cara, se for o caso. Siga todas as instruções na seção "Limpeza", que orienta você sobre como encerrar recursos para não incorrer em cobranças além deste tutorial. Novos usuários do Google Cloud estão qualificados para o programa de US$ 300 de avaliação sem custos.
Ativar as APIs
Clique no ícone de menu no canto superior esquerdo da tela.
Selecione APIs e Serviços > Painel no menu suspenso.
Selecione + Ativar APIs e serviços.
Pesquise "Compute Engine" na caixa de pesquisa. Clique em "API Compute Engine". na lista de resultados que aparece.
Na página do Google Compute Engine, clique em Ativar.
Após a ativação, clique na seta para voltar.
Agora, pesquise as seguintes APIs e ative-as também:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- JSON do Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- APIs do Cloud Resource Manager
3. Crie um novo bucket do Cloud Storage
No Console do Google Cloud Platform, clique no ícone Menu no canto superior esquerdo da tela:
Role para baixo e selecione Cloud Storage > Navegador na subseção Armazenamento:
Agora você verá o navegador do Cloud Storage e, supondo que esteja usando um projeto que não tem buckets do Cloud Storage, verá um convite para criar um novo bucket. Pressione o botão Criar bucket para criar um:
Digite um nome para o bucket. Como indicado na caixa de diálogo, os nomes dos buckets precisam ser exclusivos em todo o Cloud Storage. Portanto, se você escolher um nome óbvio, como "test", provavelmente descobrirá que outra pessoa já criou um bucket com esse nome e receberá um erro.
Há também algumas regras sobre quais caracteres são permitidos nos nomes dos buckets. Se você começar e terminar o nome do bucket com uma letra ou um número e usar apenas traços no meio, não vai ter problemas. Se você tentar usar caracteres especiais ou iniciar ou terminar o nome do bucket com algo que não seja uma letra ou um número, a caixa de diálogo lembrará você das regras.
Insira um nome exclusivo para o bucket e pressione Criar. Se você escolher algo que já está em uso, verá a mensagem de erro mostrada acima. Depois de criar o bucket, você vai acessar seu novo bucket vazio no navegador:
O nome do bucket que você vê será diferente, já que precisa ser único em todos os projetos.
4. Iniciar o Cloud Shell
Ativar o Cloud Shell
- No Console do Cloud, clique em Ativar o Cloud Shell.
Se você nunca tiver iniciado o Cloud Shell, verá uma tela intermediária (abaixo da dobra) com a descrição do que ele é. Se esse for o caso, clique em Continuar e você não o verá novamente. Esta é uma tela única:
Leva apenas alguns instantes para provisionar e se conectar ao Cloud Shell.
Essa máquina virtual contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB, além de ser executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o seu trabalho neste codelab pode ser feito em um navegador ou no seu Chromebook.
Depois de se conectar ao Cloud Shell, você já estará autenticado e o projeto já estará configurado com seu ID do projeto.
- Execute o seguinte comando no Cloud Shell para confirmar que você está autenticado:
gcloud auth list
Resposta ao comando
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
Resposta ao comando
[core] project = <PROJECT_ID>
Se o projeto não estiver configurado, configure-o usando este comando:
gcloud config set project <PROJECT_ID>
Resposta ao comando
Updated property [core/project].
5. Crie um projeto Maven
Depois que o Cloud Shell for iniciado, vamos criar um projeto Maven usando o SDK do Java para Apache Beam.
O Apache Beam é um modelo de programação de código aberto para pipelines de dados. Esses pipelines são definidos com um programa do Apache Beam e é possível escolher um executor, como o Dataflow, para executar o pipeline.
Execute o comando mvn archetype:generate
no seu shell da seguinte maneira:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Depois de executar o comando, você verá um novo diretório chamado first-dataflow
no diretório atual. first-dataflow
contém um projeto Maven que inclui o SDK do Cloud Dataflow para Java e pipelines de exemplo.
6. Execute um pipeline de processamento de texto no Cloud Dataflow
Para começar, salve o ID do projeto e os nomes dos buckets do Cloud Storage como variáveis de ambiente. É possível fazer isso no Cloud Shell. Substitua <your_project_id>
pelo ID do seu projeto.
export PROJECT_ID=<your_project_id>
Agora faremos o mesmo para o bucket do Cloud Storage. Lembre-se de substituir <your_bucket_name>
pelo nome exclusivo que você usou para criar o bucket em uma etapa anterior.
export BUCKET_NAME=<your_bucket_name>
Altere para o diretório first-dataflow/
.
cd first-dataflow
Será executado um pipeline chamado "WordCount", que lê o texto, tokeniza as linhas de texto em palavras individuais e executa uma contagem de frequência em cada uma dessas palavras. Primeiro, vamos executar o pipeline e, enquanto ele estiver em execução, veremos o que acontece em cada etapa.
Inicie o pipeline executando o comando mvn compile exec:java
no shell ou na janela do terminal. Para os argumentos --project, --stagingLocation,
e --output
, o comando abaixo faz referência às variáveis de ambiente que você já configurou nesta etapa.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Enquanto o job estiver em execução, vamos encontrá-lo na lista de jobs.
Abra a IU da Web do Cloud Dataflow no Console do Google Cloud Platform. Você verá seu job de contagem de palavras com o status Em execução:
Agora vamos conhecer os parâmetros do pipeline. Para começar, clique no nome do seu job:
Quando você seleciona um job, é possível ver o gráfico de execução. O gráfico de execução de um pipeline representa cada transformação no pipeline como uma caixa que contém o nome da transformação e algumas informações de status. Para ver mais detalhes, clique no sinal de seta no canto superior direito de cada etapa:
Como o pipeline transforma os dados em cada etapa:
- Leitura: nesta etapa, o pipeline lê a partir de uma origem de entrada. Nesse caso, trata-se de um arquivo do Cloud Storage com o texto completo da peça Rei Lear, de Shakespeare. Nosso pipeline lê o arquivo linha por linha e gera um
PCollection
, em que cada linha do arquivo de texto é um elemento da coleção. - CountWords: a etapa
CountWords
tem duas partes. Primeiro, ela usa uma função de execução paralela (ParDo) chamadaExtractWords
para tokenizar cada linha em palavras individuais. O resultado de ExtractWords é uma nova PCollection, em que cada elemento é uma palavra. A próxima etapa,Count
, usa uma transformação fornecida pelo SDK do Java que retorna pares de chave-valor em que a chave é uma palavra única e o valor é o número de vezes que ela ocorre. Este é o método de implementação deCountWords
. Você pode verificar o arquivo WordCount.java completo no GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: invoca o
FormatAsTextFn
, copiado abaixo, que formata cada par de chave-valor em uma string apresentável.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: nesta etapa, gravaremos as strings que podem ser impressas em vários arquivos de texto fragmentados.
A resposta resultante do pipeline será exibida em alguns minutos.
Agora confira a página Informações do job à direita do gráfico, que inclui os parâmetros do pipeline que incluímos no comando mvn compile exec:java
.
Você também pode ver Contadores personalizados para o pipeline. Neste caso, eles mostram quantas linhas vazias foram encontradas até o momento durante a execução. É possível adicionar novos contadores ao pipeline para rastrear métricas específicas do aplicativo.
Clique no ícone Registros na parte inferior do console para exibir as mensagens de erro específicas.
Por padrão, o painel exibe as mensagens do registro de jobs que informam o status do job como um todo. Use o seletor "Gravidade mínima" para filtrar o andamento do job e as mensagens de status.
A seleção de uma etapa do pipeline no gráfico altera a visualização para registros gerados pelo código e o código gerado em execução na etapa do pipeline.
Para voltar aos Registros do job, desmarque a etapa clicando fora do gráfico ou usando o botão Fechar no painel lateral direito.
Use o botão Registros do worker na guia de registros para ver esses registros das instâncias do Compute Engine que executam seu pipeline. Esses registros consistem em linhas de registro geradas pelo seu código e pelo código gerado pelo Dataflow que os executa.
Se você estiver tentando depurar uma falha no pipeline, muitas vezes os Registros de workers terão mais registros para ajudar a resolver o problema. Lembre-se de que esses registros são agregados em todos os workers e podem ser filtrados e pesquisados.
A Interface de monitoramento do Dataflow mostra apenas as mensagens de registro mais recentes. Para conferir todos os registros, clique no link "Observabilidade do Google Cloud" no lado direito do painel de registros.
Confira um resumo dos diferentes tipos de registro disponíveis para visualização na página Monitoramento → Registros:
- Os registros job-message contêm mensagens no nível do job geradas por vários componentes do Dataflow. Os exemplos incluem a configuração de escalonamento automático, o momento em que os workers são inicializados ou desligados, o progresso na etapa do job e os erros. Os erros no nível do worker que se originam de falhas no código do usuário e que estão presentes em registros de worker também se propagam para os registros de job-message.
- Os registros worker são produzidos por workers do Dataflow. Os workers fazem a maior parte do trabalho do pipeline, por exemplo, aplicando ParDos aos dados. Os registros Worker contêm mensagens registradas pelo seu código e pelo Dataflow.
- Os registros de worker-startup estão presentes na maioria dos jobs do Dataflow e podem capturar mensagens relacionadas ao processo de inicialização. Esse processo inclui o download de jars de um job do Cloud Storage e a inicialização dos workers. Se houver um problema ao iniciar workers, esses registros serão um bom lugar para procurar.
- registros de shuffler – contêm mensagens de workers que consolidam os resultados das operações paralelas do pipeline.
- Os registros docker e kubelet contêm mensagens relacionadas a essas tecnologias públicas, que são usadas nos workers do Dataflow.
Na próxima etapa, verificaremos se o job foi concluído.
7. Confirme que o job foi concluído
Abra a IU da Web do Cloud Dataflow no Console do Google Cloud Platform.
Primeiro, você verá seu job de contagem de palavras com o status Em execução e depois com Concluído:
A execução do job levará entre três e quatro minutos.
Lembra quando você executou o canal e especificou um bucket de saída? Vamos dar uma olhada no resultado. Afinal, você não quer saber quantas ocorrências de cada palavra há no Rei Lear? Volte para o navegador do Cloud Storage no console do Google Cloud Platform. No seu bucket, os arquivos de saída e os arquivos de preparação que foram criados pelo job serão exibidos:
8. Encerrar seus recursos
É possível encerrar seus recursos no Console do Google Cloud Platform.
Abra o navegador do Cloud Storage no Console do Google Cloud Platform.
Marque a caixa de seleção ao lado do bucket que você criou e clique em EXCLUIR para excluir permanentemente o bucket e seu conteúdo.
9. Parabéns!
Você aprendeu a criar um projeto Maven com o SDK do Cloud Dataflow e a executar um pipeline de exemplo usando o Console do Google Cloud Platform, além de excluir o bucket do Cloud Storage associado e o conteúdo dele.
Saiba mais
- Documentação do Dataflow: https://cloud.google.com/dataflow/docs/
Licença
Este trabalho está sob a licença Atribuição 3.0 Genérica da Creative Commons e a licença Apache 2.0.