Como fazer o pré-processamento de dados do BigQuery com o PySpark no Dataproc

1. Visão geral

Este codelab aborda como criar um pipeline de processamento de dados usando o Apache Spark com Dataproc no Google Cloud Platform. Ler dados de um local de armazenamento, realizar transformações nele e gravá-los em outro local de armazenamento é um caso de uso comum em ciência de dados e engenharia de dados. As transformações comuns incluem mudar o conteúdo dos dados, remover informações desnecessárias e alterar os tipos de arquivo.

Neste codelab, você vai aprender sobre o Apache Spark, executar um pipeline de amostra usando o Dataproc com o PySpark (API Python do Apache Spark), BigQuery, Google Cloud Storage e dados do Reddit.

2. Introdução ao Apache Spark (opcional)

De acordo com o site, " o Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala". Ele permite analisar e processar dados em paralelo e na memória, o que permite uma computação paralela massiva em várias máquinas e nós diferentes. Ele foi lançado originalmente em 2014 como uma atualização do MapReduce tradicional e ainda é um dos frameworks mais populares para realizar computações em grande escala. O Apache Spark é escrito em Scala e, posteriormente, tem APIs em Scala, Java, Python e R. Ele contém uma grande quantidade de bibliotecas, como o Spark SQL para realizar consultas SQL nos dados, o Spark Streaming para streaming de dados, o MLlib para machine learning e o GraphX para processamento de gráficos, que são executados no mecanismo do Apache Spark.

32add0b6a47bafbc.png

O Spark pode ser executado sozinho ou pode aproveitar um serviço de gerenciamento de recursos, como Yarn, Mesos ou Kubernetes para escalonamento. Você vai usar o Dataproc para este codelab, que utiliza o Yarn.

Os dados no Spark foram carregados originalmente na memória no que é chamado de RDD ou conjunto de dados distribuídos resilientes. O desenvolvimento no Spark incluiu a adição de dois novos tipos de dados colunares: o conjunto de dados, que é digitado, e o DataFrame, que não é digitado. De modo geral, os RDDs são ótimos para qualquer tipo de dados, enquanto os conjuntos de dados e DataFrames são otimizados para dados tabulares. Como os conjuntos de dados só estão disponíveis com as APIs Java e Scala, vamos usar a API DataFrame do PySpark para este codelab. Para mais informações, consulte a documentação do Apache Spark documentation.

3. Caso de uso

Os engenheiros de dados geralmente precisam que os dados sejam facilmente acessíveis aos cientistas de dados. No entanto, os dados geralmente são inicialmente sujos (difíceis de usar para análise no estado atual) e precisam ser limpos antes de serem úteis. Um exemplo disso são os dados extraídos da Web, que podem conter codificações estranhas ou tags HTML estranhas.

Neste laboratório, você vai carregar um conjunto de dados do BigQuery na forma de postagens do Reddit em um cluster do Spark hospedado no Dataproc, extrair informações úteis e armazenar os dados processados como arquivos CSV compactados no Google Cloud Storage.

be2a4551ece63bfc.png

O cientista de dados principal da sua empresa tem interesse em que as equipes trabalhem em diferentes problemas de processamento de linguagem natural. Especificamente, eles estão interessados em analisar os dados no subreddit "r/food". Você vai criar um pipeline para um despejo de dados começando com um backfill de janeiro de 2017 a agosto de 2019.

4. Como acessar o BigQuery pela API BigQuery Storage

Extrair dados do BigQuery usando o método de API tabledata.list pode ser demorado e ineficiente à medida que a quantidade de dados aumenta. Esse método retorna uma lista de objetos JSON e exige a leitura sequencial de uma página por vez para ler um conjunto de dados inteiro.

A API BigQuery Storage traz melhorias significativas para o acesso a dados no BigQuery usando um protocolo baseado em RPC. Ela oferece suporte a leituras e gravações de dados em paralelo, bem como a diferentes formatos de serialização, como Apache Avro e Apache Arrow. Em um nível alto, isso se traduz em um desempenho significativamente melhor, especialmente em conjuntos de dados maiores.

Neste codelab, você vai usar o spark-bigquery-connector para ler e gravar dados entre o BigQuery e o Spark.

5. Como criar um projeto

Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um novo projeto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Em seguida, será necessário ativar o faturamento no console do Cloud para usar os recursos do Google Cloud.

A execução por meio deste codelab terá um custo baixo, mas poderá ser maior se você decidir usar mais recursos ou se deixá-los em execução. A última seção deste codelab vai orientar você na limpeza do projeto.

Novos usuários do Google Cloud Platform estão qualificados para um teste sem custo financeiro de US$300.

6. Como configurar seu ambiente

Agora, você vai configurar seu ambiente:

  • Ativar as APIs Compute Engine, Dataproc e BigQuery Storage
  • Definir configurações do projeto
  • Como criar um cluster do Dataproc
  • Criar um bucket do Cloud Storage

Ativar APIs e configurar seu ambiente

Abra o Cloud Shell pressionando o botão no canto superior direito do console do Cloud.

a10c47ee6ca41c54.png

Depois que o Cloud Shell for carregado, execute os comandos a seguir para ativar as APIs Compute Engine, Dataproc e BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Defina o ID do projeto. Para encontrar o ID, acesse a página de seleção de projetos e pesquise o projeto. Ele pode não ser o mesmo que o nome do projeto.

e682e8227aa3c781.png

76d45fb295728542.png

Execute o comando a seguir para definir o ID do projeto:

gcloud config set project <project_id>

Defina a região do projeto escolhendo uma na lista aqui. Um exemplo pode ser us-central1.

gcloud config set dataproc/region <region>

Escolha um nome para o cluster do Dataproc e crie uma variável de ambiente para ele.

CLUSTER_NAME=<cluster_name>

Como criar um cluster do Dataproc

Crie um cluster do Dataproc executando o comando a seguir:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Esse comando leva alguns minutos para ser concluído. Para detalhar o comando:

Isso vai iniciar a criação de um cluster do Dataproc com o nome fornecido anteriormente. O uso da API beta vai ativar os recursos Beta do Dataproc, como o Gateway de componentes.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Isso vai definir o tipo de máquina a ser usada para os workers.

--worker-machine-type n1-standard-8

Isso vai definir o número de workers que o cluster terá.

--num-workers 8

Isso vai definir a versão da imagem do Dataproc.

--image-version 1.5-debian

Isso vai configurar as ações de inicialização a serem usadas no cluster. Aqui, você está incluindo a ação de inicialização do pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Esses são os metadados a serem incluídos no cluster. Aqui, você está fornecendo metadados para a ação de inicialização pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Isso vai definir os componentes opcionais a serem instalados no cluster.

--optional-components=ANACONDA

Isso vai ativar o Gateway de componentes, que permite usar o Gateway de componentes do Dataproc para visualizar interfaces comuns, como Zeppelin, Jupyter ou o histórico do Spark.

--enable-component-gateway

Para uma introdução mais detalhada ao Dataproc, consulte este codelab.

Criar um bucket do Cloud Storage

Você vai precisar de um bucket do Cloud Storage para a saída do job. Determine um nome exclusivo para o bucket e execute o comando a seguir para criar um novo bucket. Os nomes de bucket são exclusivos em todos os projetos do Google Cloud para todos os usuários. Portanto, talvez seja necessário tentar algumas vezes com nomes diferentes. Um bucket é criado com sucesso se você não receber uma ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Análise exploratória de dados

Antes de realizar o pré-processamento, aprenda mais sobre a natureza dos dados com que você está trabalhando. Para fazer isso, você vai explorar dois métodos de exploração de dados. Primeiro, você vai visualizar alguns dados brutos usando a interface da Web do BigQuery e, em seguida, calcular o número de postagens por subreddit usando o PySpark e o Dataproc.

Como usar a interface da Web do BigQuery

Comece usando a interface da Web do BigQuery para visualizar seus dados. No ícone de menu do console do Cloud, role para baixo e pressione "BigQuery" para abrir a interface da Web do BigQuery.

242a597d7045b4da.png

Em seguida, execute o comando a seguir no editor de consultas da interface da Web do BigQuery. Isso vai retornar 10 linhas completas dos dados de janeiro de 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Role a página para ver todas as colunas disponíveis, bem como alguns exemplos. Em particular, você vai encontrar duas colunas que representam o conteúdo textual de cada postagem: "título" e "selftext", sendo a última o corpo da postagem. Observe também outras colunas, como "created_utc", que é a hora UTC em que uma postagem foi feita, e "subreddit", que é o subreddit em que a postagem existe.

Executar um job do PySpark

Execute os comandos a seguir no Cloud Shell para clonar o repositório com o exemplo de código e cd no diretório correto:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

É possível usar o PySpark para determinar uma contagem de quantas postagens existem para cada subreddit. Você pode abrir o Cloud Editor e ler o script cloud-dataproc/codelabs/spark-bigquery antes de executá-lo na próxima etapa:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Clique no botão "Abrir terminal" no Cloud Editor para voltar ao Cloud Shell e execute o comando a seguir para executar seu primeiro job do PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Esse comando permite enviar jobs ao Dataproc pela API Jobs. Aqui, você está indicando o tipo de serviço como pyspark. É possível fornecer o nome do cluster, parâmetros opcionais e o nome do arquivo que contém o job. Aqui, você está fornecendo o parâmetro --jars, que permite incluir o spark-bigquery-connector com seu job. Também é possível definir os níveis de saída de registro usando --driver-log-levels root=FATAL, que vai suprimir toda a saída de registro, exceto erros. Os registros do Spark tendem a ser bastante ruidosos.

Isso leva alguns minutos para ser executado e a saída final será semelhante a esta:

6c185228db47bb18.png

8. Como explorar as interfaces do Dataproc e do Spark

Ao executar jobs do Spark no Dataproc, você tem acesso a duas interfaces para verificar o status dos jobs / clusters. A primeira é a interface do Dataproc, que pode ser encontrada clicando no ícone de menu e rolando para baixo até o Dataproc. Aqui, você pode conferir a memória disponível atual, bem como a memória pendente e o número de workers.

6f2987346d15c8e2.png

Também é possível clicar na guia "Jobs" para conferir os jobs concluídos. Para conferir detalhes da tarefa, como os registros e a saída dessas tarefas, clique no código da tarefa de uma tarefa específica. 114d90129b0e4c88.png

1b2160f0f484594a.png

Também é possível conferir a interface do Spark. Na página de jobs, clique na seta para voltar e, em seguida, em "Interfaces da Web". Várias opções vão aparecer em "Gateway de componentes". Muitas delas podem ser ativadas usando "Componentes opcionais" ao configurar o cluster. Para este laboratório, clique em "Servidor de histórico do Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Isso vai abrir a seguinte janela:

8f6786760f994fe8.png

Todos os jobs concluídos vão aparecer aqui, e você pode clicar em qualquer application_id para saber mais informações sobre o job. Da mesma forma, é possível clicar em "Mostrar aplicativos incompletos" na parte de baixo da página de destino para conferir todos os jobs em execução.

9. Executar o job de backfill

Agora, você vai executar um job que carrega dados na memória, extrai as informações necessárias e despeja a saída em um bucket do Cloud Storage. Você vai extrair o "título", o "corpo" (texto bruto) e o "carimbo de data/hora criado" para cada comentário do Reddit. Em seguida, você vai usar esses dados, convertê-los em um CSV, compactá-los e carregá-los em um bucket com um URI de gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Você pode consultar o Cloud Editor novamente para ler o código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, que é um script wrapper para executar o código em cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Em breve, você vai encontrar várias mensagens de conclusão de job. O job pode levar até 15 minutos para ser concluído. Você também pode verificar o bucket de armazenamento para verificar a saída de dados bem-sucedida usando o gsutil. Quando todos os jobs forem concluídos, execute o comando a seguir:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Você verá esta resposta:

a7c3c7b2e82f9fca.png

Parabéns, você concluiu um backfill para os dados de comentários do Reddit. Se você tiver interesse em como criar modelos com base nesses dados, continue para o codelab do Spark-NLP.

10. Revisão dos dados

Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:

  1. Exclua o bucket do Cloud Storage do ambiente que você criou.
  2. Exclua o ambiente do Dataproc.

Se você criou um projeto apenas para este codelab, também poderá excluir o projeto:

  1. No console do GCP, acesse a página Projetos.
  2. Na lista de projetos, selecione um e clique em Excluir.
  3. Na caixa, digite o ID do projeto e clique em desligar para excluir o projeto.

Licença

Este trabalho está licenciado sob a Licença Creative Commons Atribuição 3.0 genérica e a licença Apache 2.0.