Executar um pipeline de processamento de texto de Big Data no Cloud Dataflow

1. Visão geral

Cloud-Dataflow.png

O que é o Dataflow?

O Dataflow é um serviço gerenciado que executa uma ampla variedade de padrões de processamento de dados. A documentação neste site mostra como implantar os pipelines de processamento de dados de streaming e em lote usando o Dataflow, incluindo instruções de uso dos recursos de serviço.

O SDK do Apache Beam é um modelo de programação de código aberto que permite desenvolver pipelines de lote e de streaming. Você cria pipelines com um programa do Apache Beam e os executa no serviço do Dataflow. A documentação do Apache Beam (em inglês) fornece informações conceituais aprofundadas e material de referência para o modelo de programação, os SDKs e outros executores do Apache Beam.

Análise de dados de streaming com velocidade

O Dataflow permite o desenvolvimento rápido e simplificado de pipelines de dados de streaming com menor latência de dados.

Simplifique as operações e o gerenciamento

Com a abordagem sem servidor do Dataflow removendo a sobrecarga operacional das cargas de trabalho da engenharia de dados, as equipes podem se concentrar na programação em vez de gerenciar clusters de servidores.

Reduza o custo total de propriedade

O escalonamento automático de recursos aliado ao potencial de processamento em lote com custo otimizado permitem ao Dataflow oferecer uma capacidade praticamente ilimitada de gerenciar altos e baixos nas cargas de trabalho sem gastar demais.

Principais características

Gerenciamento automatizado de recursos e reequilíbrio dinâmico de trabalho

O Dataflow automatiza o provisionamento e o gerenciamento dos recursos de processamento para minimizar a latência e maximizar a utilização. Assim, você não precisa criar novas instâncias ou reservá-las manualmente. O particionamento de trabalho também é automatizado e otimizado para reequilibrar dinamicamente o trabalho com atraso. Não é preciso procurar "teclas de atalho" nem fazer o pré-processamento dos seus dados de entrada.

Escalonamento automático horizontal

O escalonamento automático horizontal dos recursos de worker para alcançar os melhores resultados de capacidade de processamento com o melhor custo-benefício.

Preços flexíveis de programação de recursos para processamento em lote

Para o processamento com flexibilidade no tempo de agendamento do job, como jobs noturnos, o agendamento flexível de recursos (FlexRS) custa menos para o processamento em lote. Esses jobs flexíveis são colocados em uma fila com a garantia de que eles serão recuperados para execução dentro de seis horas.

Este tutorial foi adaptado de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

O que você vai aprender

  • Como criar um projeto Maven com o Apache Beam usando o SDK Java
  • Como executar um pipeline de exemplo com o Console do Google Cloud Platform
  • Como excluir o bucket do Cloud Storage associado e o conteúdo dele

O que é necessário

Como você usará este tutorial?

Apenas leitura Leitura e exercícios

Como você classificaria sua experiência com o uso dos serviços do Google Cloud Platform?

Iniciante Intermediário Proficiente

2. Configuração e requisitos

Configuração de ambiente autoguiada

  1. Faça login no Console do Cloud e crie um novo projeto ou reutilize um existente. Crie uma se você ainda não tiver uma conta do Gmail ou do G Suite.

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Lembre-se do código do projeto, um nome exclusivo em todos os projetos do Google Cloud. O nome acima já foi escolhido e não servirá para você. Faremos referência a ele mais adiante neste codelab como PROJECT_ID.

  1. Em seguida, será necessário ativar o faturamento no Console do Cloud para usar os recursos do Google Cloud.

A execução deste codelab não será muito cara, se for o caso. Siga todas as instruções na seção "Limpeza", que orienta você sobre como encerrar recursos para não incorrer em cobranças além deste tutorial. Novos usuários do Google Cloud estão qualificados para o programa de US$ 300 de avaliação sem custos.

Ativar as APIs

Clique no ícone de menu no canto superior esquerdo da tela.

2bfc27ef9ba2ec7d.png

Selecione APIs e serviços > Painel no menu suspenso.

5b65523a6cc0afa6.png

Selecione + Ativar APIs e serviços.

81ed72192c0edd96.png

Pesquise "Compute Engine" na caixa de pesquisa. Clique em "API Compute Engine" na lista de resultados que aparece.

3f201e991c7b4527.png

Na página do Google Compute Engine, clique em Ativar.

ac121653277fa7bb.png

Depois de fazer a ativação, clique na seta para voltar.

Agora pesquise e ative as seguintes APIs:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • JSON do Cloud Storage
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • APIs Cloud Resource Manager

3. Criar um novo bucket do Cloud Storage

No console do Google Cloud Platform, clique no ícone Menu no canto superior esquerdo da tela:

2bfc27ef9ba2ec7d.png

Role a tela para baixo e selecione Cloud Storage > Navegador na subseção Storage:

2b6c3a2a92b47015.png

Agora você vai ver o navegador do Cloud Storage. Se estiver usando um projeto que não tem buckets do Cloud Storage, vai aparecer um convite para criar um. Clique no botão Criar bucket para criar um:

a711016d5a99dc37.png

Digite um nome para o bucket. Como a caixa de diálogo observa, os nomes de bucket precisam ser exclusivos em todo o Cloud Storage. Portanto, se você escolher um nome óbvio, como "test", provavelmente vai descobrir que outra pessoa já criou um bucket com esse nome e vai receber um erro.

Há também algumas regras sobre quais caracteres são permitidos nos nomes de buckets. Se você começar e terminar o nome do bucket com uma letra ou um número e usar apenas traços no meio, não haverá problemas. Se você tentar usar caracteres especiais ou começar ou terminar o nome do bucket com algo que não seja uma letra ou um número, a caixa de diálogo vai lembrar você das regras.

3a5458648cfe3358.png

Digite um nome exclusivo para o bucket e clique em Criar. Se você escolher algo que já está em uso, vai receber a mensagem de erro mostrada acima. Depois de criar um bucket com sucesso, você verá seu novo bucket vazio no navegador:

3bda986ae88c4e71.png

O nome do bucket que você vê será diferente, já que eles precisam ser exclusivos em todos os projetos.

4. Iniciar o Cloud Shell

Ativar o Cloud Shell

  1. No Console do Cloud, clique em Ativar o Cloud ShellH7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Se você nunca tiver iniciado o Cloud Shell, verá uma tela intermediária (abaixo da dobra) com a descrição do que ele é. Se esse for o caso, clique em Continuar e você não o verá novamente. Esta é uma tela única:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Leva apenas alguns instantes para provisionar e se conectar ao Cloud Shell.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Essa máquina virtual contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB, além de ser executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o seu trabalho neste codelab pode ser feito em um navegador ou no seu Chromebook.

Depois de se conectar ao Cloud Shell, você já estará autenticado e o projeto já estará configurado com seu ID do projeto.

  1. Execute o seguinte comando no Cloud Shell para confirmar que você está autenticado:
gcloud auth list

Resposta ao comando

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Resposta ao comando

[core]
project = <PROJECT_ID>

Se o projeto não estiver configurado, configure-o usando este comando:

gcloud config set project <PROJECT_ID>

Resposta ao comando

Updated property [core/project].

5. Crie um projeto Maven

Depois que o Cloud Shell for iniciado, vamos começar criando um projeto Maven usando o SDK Java para Apache Beam.

O Apache Beam é um modelo de programação de código aberto para pipelines de dados. Esses pipelines são definidos usando um programa do Apache Beam, e é possível escolher um executor, como o Dataflow, para executar o pipeline.

Execute o comando mvn archetype:generate no shell da seguinte maneira:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Depois de executar o comando, você verá um novo diretório chamado first-dataflow abaixo do diretório atual. first-dataflow contém um projeto Maven que inclui o SDK do Cloud Dataflow para Java e pipelines de exemplo.

6. Execute um pipeline de processamento de texto no Cloud Dataflow

Vamos começar salvando o ID do projeto e os nomes dos buckets do Cloud Storage como variáveis de ambiente. É possível fazer isso no Cloud Shell. Substitua <your_project_id> pelo ID do seu projeto.

 export PROJECT_ID=<your_project_id>

Agora vamos fazer o mesmo para o bucket do Cloud Storage. Substitua <your_bucket_name> pelo nome exclusivo que você usou para criar o bucket em uma etapa anterior.

 export BUCKET_NAME=<your_bucket_name>

Altere para o diretório first-dataflow/.

 cd first-dataflow

Será executado um pipeline chamado "WordCount", que lê o texto, tokeniza as linhas de texto em palavras individuais e executa uma contagem de frequência em cada uma dessas palavras. Primeiro, vamos executar o pipeline e, enquanto ele estiver em execução, vamos analisar o que está acontecendo em cada etapa.

Inicie o pipeline executando o comando mvn compile exec:java no shell ou na janela do terminal. Nos argumentos --project, --stagingLocation, e --output, o comando abaixo faz referência às variáveis de ambiente que você configurou anteriormente nesta etapa.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Enquanto o job estiver em execução, vamos encontrá-lo na lista de jobs.

Abra a interface da Web do Cloud Dataflow no Console do Google Cloud Platform. Você vai ver seu job de contagem de palavras com o status Em execução:

3623be74922e3209.png

Agora vamos conhecer os parâmetros do pipeline. Para começar, clique no nome do seu job:

816d8f59c72797d7.png

Ao selecionar um job, é possível conferir o gráfico de execução. O gráfico de execução de um pipeline representa cada transformação no pipeline como uma caixa que contém o nome da transformação e algumas informações de status. Para ver mais detalhes, clique no sinal de seta no canto superior direito de cada etapa:

80a972dd19a6f1eb.png

Como o pipeline transforma os dados em cada etapa:

  • Read: nesta etapa, o pipeline lê de uma origem da entrada. Nesse caso, é um arquivo de texto do Cloud Storage com todo o texto da peça de Shakespeare Rei Lear. Nosso pipeline lê o arquivo linha por linha e gera um PCollection, em que cada linha no arquivo de texto é um elemento da coleção.
  • CountWords: a etapa CountWords tem duas partes. Primeiro, ele usa uma função paralela do (ParDo) chamada ExtractWords para criar tokens de cada linha em palavras individuais. A saída de "ExtractWords" é uma nova PCollection em que cada elemento é uma palavra. A próxima etapa, Count, usa uma transformação fornecida pelo SDK para Java que retorna pares de chave-valor em que a chave é uma palavra única e o valor é o número de vezes que ela aparece. Confira o método que implementa CountWords e o arquivo WordCount.java completo no GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: invoca o FormatAsTextFn, copiado abaixo, que formata cada par de chave-valor em uma string imprimível.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: nesta etapa, gravamos as strings imprimíveis em vários arquivos de texto fragmentados.

A resposta resultante do pipeline será exibida em alguns minutos.

Agora veja a página Informações do job à direita do gráfico, que inclui os parâmetros de pipeline que adicionamos ao comando mvn compile exec:java.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Você também pode ver Contadores personalizados para o pipeline. Neste caso, eles mostram quantas linhas vazias foram encontradas até o momento durante a execução. Você pode adicionar novos contadores ao seu pipeline para rastrear métricas específicas do aplicativo.

a2e2800e2c6893f8.png

Clique no ícone Registros na parte de baixo do console para ver as mensagens de erro específicas.

23c64138a1027f8.png

O painel mostra por padrão mensagens do registro do job que informam o status do job como um todo. Use o seletor "Gravidade mínima" para filtrar o andamento do job e as mensagens de status.

94ba42015fdafbe2.png

A seleção de uma etapa no gráfico muda a visualização para os registros gerados pelo código e o código gerado em execução na etapa do pipeline.

Para voltar aos Registros do job, clique fora do gráfico ou use o botão "Fechar" no painel lateral à direita.

Use o botão Registros de worker na guia "Registros" para ver os registros de worker das instâncias do Compute Engine que executam seu pipeline. Esses registros consistem em linhas de registro geradas pelo seu código e pelo código gerado pelo Dataflow que os executa.

Se você estiver tentando depurar uma falha no pipeline, muitas vezes haverá outros dados nos registros do worker para ajudar a resolver o problema. Esses registros são agregados em todos os workers e podem ser filtrados e pesquisados.

5a53c244f28d5478.png

A Interface de monitoramento do Dataflow mostra apenas as mensagens de registro mais recentes. Para conferir todos os registros, clique no link do Google Cloud Observability no lado direito do painel de registros.

2bc704a4d6529b31.png

Confira um resumo dos diferentes tipos de registro disponíveis para visualização na página Monitoramento → Registros:

  • Os registros job-message contêm mensagens no nível de job que vários componentes do Dataflow geram. Os exemplos incluem a configuração de escalonamento automático, o momento em que os workers são inicializados ou desligados, o progresso na etapa do job e erros no job. Os erros no nível do worker que são originados de falha no código de usuário e que estão presentes em registros worker também se propagam para os registros jobs-message.
  • Os registros worker são produzidos por workers do Dataflow. Os workers fazem a maior parte do trabalho do pipeline. Por exemplo, aplicar ParDos aos dados. Os registros worker contêm mensagens registradas pelo seu código e pelo Dataflow.
  • Os registros worker-startup estão presentes na maioria dos jobs do Dataflow e podem capturar mensagens relacionadas ao processo de inicialização. Esse processo inclui o download de jars do job do Google Cloud Storage e a inicialização dos workers. Se houver um problema ao iniciar os workers, esses registros são um bom local para procurar.
  • Os registros shuffler contêm mensagens de workers que consolidam os resultados de operações paralelas do pipeline.
  • Os registros docker e kubelet contêm mensagens relacionadas a essas tecnologias públicas, que são usadas em workers do Dataflow.

Na próxima etapa, vamos verificar se o job foi concluído.

7. Confirme que o job foi concluído

Abra a interface da Web do Cloud Dataflow no Console do Google Cloud Platform.

Você verá o job de contagem de palavras com o status Em execução que mudará para Concluído:

4c408162416d03a2.png

A execução do job levará entre três e quatro minutos.

Lembra quando você executou o canal e especificou um bucket de saída? Vamos dar uma olhada no resultado. Afinal, você não quer saber quantas ocorrências de cada palavra há no Rei Lear? Volte ao navegador do Cloud Storage no console do Google Cloud Platform. No seu bucket, os arquivos de saída e os arquivos de preparação que foram criados pelo job serão exibidos:

25a5d3d4b5d0b567.png

8. Desativar seus recursos

É possível encerrar os recursos no Console do Google Cloud Platform.

Abra o navegador do Cloud Storage no Console do Google Cloud Platform.

2b6c3a2a92b47015.png

Marque a caixa de seleção ao lado do bucket que você criou e clique em EXCLUIR para remover permanentemente o bucket e o conteúdo dele.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Parabéns!

Você aprendeu a criar um projeto Maven com o SDK do Cloud Dataflow e a executar um pipeline de exemplo usando o Console do Google Cloud Platform, além de excluir o bucket do Cloud Storage associado e o conteúdo dele.

Saiba mais

Licença

Este trabalho está licenciado sob uma Licença Creative Commons Atribuição 3.0 Genérica e uma licença Apache 2.0.