Como fazer o pré-processamento de dados do BigQuery com o PySpark no Dataproc

1. Visão geral

Neste codelab, vamos mostrar como criar um pipeline de processamento de dados usando o Apache Spark com o Dataproc no Google Cloud Platform. Ler dados de um local de armazenamento, realizar transformações nele e gravá-los em outro local de armazenamento é um caso de uso comum em ciência de dados e engenharia de dados. As transformações comuns incluem mudar o conteúdo dos dados, remover informações desnecessárias e alterar tipos de arquivo.

Neste codelab, você vai aprender sobre o Apache Spark, executar um pipeline de amostra usando o Dataproc com PySpark (API Python do Apache Spark), BigQuery, Google Cloud Storage e dados do Reddit.

2. Introdução ao Apache Spark (opcional)

De acordo com o site, "o Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala". Ele permite analisar e processar dados em paralelo e na memória, o que possibilita computação paralela massiva em várias máquinas e nós diferentes. Ele foi lançado originalmente em 2014 como uma atualização do MapReduce tradicional e ainda é um dos frameworks mais usados para realizar computações em grande escala. O Apache Spark é escrito em Scala e tem APIs em Scala, Java, Python e R. Ele contém várias bibliotecas, como Spark SQL para realizar consultas SQL nos dados, Spark Streaming para streaming de dados, MLlib para machine learning e GraphX para processamento de gráficos. Todas elas são executadas no mecanismo do Apache Spark.

32add0b6a47bafbc.png

O Spark pode ser executado por conta própria ou usar um serviço de gerenciamento de recursos como Yarn, Mesos ou Kubernetes para escalonamento. Neste codelab, você vai usar o Dataproc, que utiliza o Yarn.

Os dados no Spark eram originalmente carregados na memória em um RDD, ou conjunto de dados distribuído resiliente. Desde então, o desenvolvimento no Spark incluiu a adição de dois novos tipos de dados no estilo de coluna: o conjunto de dados, que é tipado, e o DataFrame, que não é tipado. De modo geral, os RDDs são ótimos para qualquer tipo de dados, enquanto os conjuntos de dados e os DataFrames são otimizados para dados tabulares. Como os conjuntos de dados só estão disponíveis com as APIs Java e Scala, vamos usar a API PySpark Dataframe neste codelab. Para mais informações, consulte a documentação do Apache Spark.

3. Caso de uso

Os engenheiros de dados geralmente precisam que os dados sejam facilmente acessíveis aos cientistas de dados. No entanto, os dados geralmente são inicialmente sujos (difíceis de usar para análise no estado atual) e precisam ser limpos antes de serem úteis. Um exemplo disso são os dados extraídos da Web, que podem conter codificações estranhas ou tags HTML desnecessárias.

Neste laboratório, você vai carregar um conjunto de dados do BigQuery na forma de postagens do Reddit em um cluster do Spark hospedado no Dataproc, extrair informações úteis e armazenar os dados processados como arquivos CSV compactados no Google Cloud Storage.

be2a4551ece63bfc.png

O principal cientista de dados da sua empresa quer que as equipes trabalhem em diferentes problemas de processamento de linguagem natural. Especificamente, eles querem analisar os dados do subreddit "r/food". Você vai criar um pipeline para um despejo de dados começando com um backfill de janeiro de 2017 a agosto de 2019.

4. Acessar o BigQuery pela API BigQuery Storage

Extrair dados do BigQuery usando o método da API tabledata.list pode ser demorado e ineficiente à medida que a quantidade de dados aumenta. Esse método retorna uma lista de objetos JSON e exige a leitura sequencial de uma página por vez para ler um conjunto de dados inteiro.

A API BigQuery Storage traz melhorias significativas para o acesso a dados no BigQuery usando um protocolo baseado em RPC. Ele oferece suporte a leituras e gravações de dados em paralelo, além de diferentes formatos de serialização, como Apache Avro e Apache Arrow. Em um nível alto, isso se traduz em um desempenho significativamente melhor, especialmente em conjuntos de dados maiores.

Neste codelab, você vai usar o spark-bigquery-connector para ler e gravar dados entre o BigQuery e o Spark.

5. Como criar um projeto

Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um projeto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Em seguida, será necessário ativar o faturamento no console do Cloud para usar os recursos do Google Cloud.

A execução deste codelab não deve custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou deixá-los em execução. A última seção deste codelab vai mostrar como limpar seu projeto.

Novos usuários do Google Cloud Platform estão qualificados para uma avaliação sem custo financeiro de US$300.

6. Como configurar seu ambiente

Agora você vai configurar seu ambiente fazendo o seguinte:

  • Como ativar as APIs Compute Engine, Dataproc e BigQuery Storage
  • Como configurar as configurações do projeto
  • Como criar um cluster do Dataproc
  • Criar um bucket do Cloud Storage

Ativar APIs e configurar o ambiente

Abra o Cloud Shell clicando no botão no canto superior direito do console do Cloud.

a10c47ee6ca41c54.png

Depois que o Cloud Shell for carregado, execute os comandos a seguir para ativar as APIs Compute Engine, Dataproc e BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Defina o ID do projeto. Para encontrar, acesse a página de seleção de projetos e pesquise o seu. Talvez não seja o mesmo nome do projeto.

e682e8227aa3c781.png

76d45fb295728542.png

Execute o comando a seguir para definir o ID do projeto:

gcloud config set project <project_id>

Defina a região do projeto escolhendo uma na lista aqui. Por exemplo, us-central1.

gcloud config set dataproc/region <region>

Escolha um nome para o cluster do Dataproc e crie uma variável de ambiente para ele.

CLUSTER_NAME=<cluster_name>

Como criar um cluster do Dataproc

Crie um cluster do Dataproc executando o seguinte comando:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

A execução desse comando leva alguns minutos. Para detalhar o comando:

Isso vai iniciar a criação de um cluster do Dataproc com o nome que você forneceu anteriormente. Usar a API beta vai ativar recursos Beta do Dataproc, como o Gateway de componentes.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Isso vai definir o tipo de máquina a ser usado para os workers.

--worker-machine-type n1-standard-8

Isso vai definir o número de workers que seu cluster terá.

--num-workers 8

Isso vai definir a versão da imagem do Dataproc.

--image-version 1.5-debian

Isso vai configurar as ações de inicialização a serem usadas no cluster. Aqui, você está incluindo a ação de inicialização pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

São os metadados a serem incluídos no cluster. Aqui, você está fornecendo metadados para a ação de inicialização pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Isso vai definir os Componentes opcionais a serem instalados no cluster.

--optional-components=ANACONDA

Isso vai ativar o gateway de componentes, que permite usar o gateway de componentes do Dataproc para visualizar interfaces comuns, como Zeppelin, Jupyter ou o histórico do Spark.

--enable-component-gateway

Para uma introdução mais detalhada ao Dataproc, confira este codelab.

Como criar um bucket do Cloud Storage

Você vai precisar de um bucket do Cloud Storage para a saída do job. Determine um nome exclusivo para o bucket e execute o comando a seguir para criar um novo bucket. Os nomes de bucket são exclusivos em todos os projetos do Google Cloud para todos os usuários. Por isso, talvez seja necessário tentar algumas vezes com nomes diferentes. Um bucket é criado com sucesso se você não receber um ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Análise exploratória de dados

Antes de fazer o pré-processamento, saiba mais sobre a natureza dos dados que você está usando. Para isso, você vai conhecer dois métodos de exploração de dados. Primeiro, você vai conferir alguns dados brutos usando a interface da Web do BigQuery. Depois, vai calcular o número de postagens por subreddit usando o PySpark e o Dataproc.

Como usar a IU da Web do BigQuery

Comece usando a IU da Web do BigQuery para ver seus dados. No ícone de menu do console do Cloud, role para baixo e pressione "BigQuery" para abrir a interface da Web do BigQuery.

242a597d7045b4da.png

Em seguida, execute este comando no Editor de consultas da interface da Web do BigQuery. Isso vai retornar 10 linhas completas dos dados de janeiro de 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Role a página para ver todas as colunas disponíveis e alguns exemplos. Em particular, você vai encontrar duas colunas que representam o conteúdo textual de cada postagem: "title" e "selftext", sendo a última o corpo da postagem. Observe também outras colunas, como "created_utc", que é a hora UTC em que uma postagem foi feita, e "subreddit", que é o subreddit em que a postagem está.

Executar um job do PySpark

Execute os comandos a seguir no Cloud Shell para clonar o repositório com o exemplo de código e acessar o diretório correto:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

É possível usar o PySpark para determinar uma contagem de quantas postagens existem em cada subreddit. Abra o editor do Cloud e leia o script cloud-dataproc/codelabs/spark-bigquery antes de executá-lo na próxima etapa:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Clique no botão "Abrir terminal" no Cloud Editor para voltar ao Cloud Shell e execute o comando a seguir para executar seu primeiro job do PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Esse comando permite enviar jobs ao Dataproc pela API Jobs. Aqui você está indicando o tipo de serviço como pyspark. É possível fornecer o nome do cluster, parâmetros opcionais e o nome do arquivo que contém o job. Aqui, você está fornecendo o parâmetro --jars, que permite incluir o spark-bigquery-connector com seu job. Também é possível definir os níveis de saída de registro usando --driver-log-levels root=FATAL, que vai suprimir toda a saída de registro, exceto os erros. Os registros do Spark tendem a ser bastante ruidosos.

Isso leva alguns minutos para ser executado, e a saída final será parecida com esta:

6c185228db47bb18.png

8. Como explorar as interfaces do Dataproc e do Spark

Ao executar jobs do Spark no Dataproc, você tem acesso a duas interfaces para verificar o status dos seus jobs / clusters. A primeira é a interface do Dataproc, que pode ser encontrada clicando no ícone de menu e rolando a tela para baixo até o Dataproc. Aqui, você pode conferir a memória disponível atual, a memória pendente e o número de workers.

6f2987346d15c8e2.png

Você também pode clicar na guia "Jobs" para ver os jobs concluídos. Para ver detalhes do job, como os registros e a saída, clique no código da tarefa de um job específico. 114d90129b0e4c88.png

1b2160f0f484594a.png

Também é possível acessar a interface do Spark. Na página do job, clique na seta para voltar e em "Interfaces da Web". Várias opções vão aparecer em "Gateway de componentes". Muitos deles podem ser ativados em Componentes opcionais ao configurar o cluster. Neste laboratório, clique em "Servidor de histórico do Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Isso vai abrir a seguinte janela:

8f6786760f994fe8.png

Todos os jobs concluídos vão aparecer aqui. Clique em qualquer application_id para saber mais sobre o job. Da mesma forma, clique em "Mostrar inscrições incompletas" na parte de baixo da página de destino para ver todos os trabalhos em andamento.

9. Executar o job de preenchimento

Agora você vai executar um job que carrega dados na memória, extrai as informações necessárias e despeja a saída em um bucket do Cloud Storage. Você vai extrair o "título", o "corpo" (texto bruto) e o "carimbo de data/hora da criação" de cada comentário do Reddit. Em seguida, pegue esses dados, converta em um CSV, compacte e carregue em um bucket com um URI de gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Consulte o Cloud Editor novamente para ler o código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, que é um script wrapper para executar o código em cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Em breve, você vai ver várias mensagens de conclusão de jobs. O job pode levar até 15 minutos para ser concluído. Você também pode verificar o bucket de armazenamento para confirmar a saída de dados usando o gsutil. Quando todos os jobs forem concluídos, execute o comando a seguir:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Você verá esta resposta:

a7c3c7b2e82f9fca.png

Parabéns, você concluiu um backfill dos dados de comentários do Reddit. Se você quiser saber como criar modelos com base nesses dados, continue para o codelab Spark-NLP.

10. Limpeza

Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:

  1. Exclua o bucket do Cloud Storage do ambiente que você criou.
  2. Exclua o ambiente do Dataproc.

Se você criou um projeto apenas para este codelab, também é possível excluir o projeto:

  1. No Console do GCP, acesse a página Projetos.
  2. Na lista de projetos, selecione um e clique em Excluir.
  3. Na caixa, digite o ID do projeto e clique em Encerrar para excluí-lo.

Licença

Este trabalho está licenciado sob uma Licença Creative Commons Atribuição 3.0 Genérica e uma licença Apache 2.0.