Como fazer o pré-processamento de dados do BigQuery com o PySpark no Dataproc

1. Visão geral

Neste codelab, você vai aprender a criar um pipeline de processamento de dados usando o Apache Spark com o Dataproc no Google Cloud Platform. Ler dados de um local de armazenamento, realizar transformações nele e gravá-los em outro local de armazenamento é um caso de uso comum em ciência de dados e engenharia de dados. As transformações comuns incluem alterar o conteúdo dos dados, remover informações desnecessárias e mudar os tipos de arquivo.

Neste codelab, você vai aprender sobre o Apache Spark, executar um pipeline de exemplo usando o Dataproc com o PySpark (API Python do Apache Spark), o BigQuery, o Google Cloud Storage e dados do Reddit.

2. Introdução ao Apache Spark (opcional)

De acordo com o site, "o Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala". Ele permite analisar e processar dados em paralelo e na memória, o que possibilita a computação paralela massiva em várias máquinas e nós diferentes. Ele foi lançado originalmente em 2014 como uma atualização do MapReduce tradicional e ainda é um dos frameworks mais conhecidos para a realização de cálculos em grande escala. O Apache Spark é escrito em Scala e, portanto, tem APIs em Scala, Java, Python e R. Ele contém uma infinidade de bibliotecas, como o Spark SQL para executar consultas SQL nos dados, o Spark Streaming para dados de streaming, o MLlib para machine learning e o GraphX para processamento de gráficos, todos executados no mecanismo Apache Spark.

32add0b6a47bafbc.png

O Spark pode ser executado sozinho ou pode aproveitar um serviço de gerenciamento de recursos, como Yarn, Mesos ou Kubernetes para escalonamento. Você vai usar o Dataproc neste codelab, que utiliza o Yarn.

Os dados no Spark eram originalmente carregados na memória em um RDD, ou conjunto de dados distribuído resiliente. Desde então, o desenvolvimento no Spark incluiu a adição de dois novos tipos de dados de estilo colunar: o Dataset, que é tipado, e o Dataframe, que não é tipado. De modo geral, os RDDs são ótimos para qualquer tipo de dados, enquanto os conjuntos e dataframes são otimizados para dados tabulares. Como os conjuntos de dados só estão disponíveis com as APIs Java e Scala, vamos usar a API PySpark Dataframe para este codelab. Para mais informações, consulte a documentação do Apache Spark.

3. Caso de uso

Os engenheiros de dados geralmente precisam que os dados sejam facilmente acessíveis aos cientistas de dados. No entanto, os dados geralmente são sujos no início (difíceis de usar para análises no estado atual) e precisam ser limpos antes de serem úteis. Um exemplo disso são os dados extraídos da Web que podem conter codificações estranhas ou tags HTML estranhas.

Neste laboratório, você vai carregar um conjunto de dados do BigQuery na forma de postagens do Reddit em um cluster do Spark hospedado no Dataproc, extrair informações úteis e armazenar os dados processados como arquivos CSV compactados no Google Cloud Storage.

be2a4551ece63bfc.png

O cientista-chefe de dados da sua empresa quer que as equipes trabalhem em diferentes problemas de processamento de linguagem natural. Especificamente, ele quer analisar os dados no subreddit "r/food". Você vai criar um pipeline para um despejo de dados, começando com um preenchimento de lacunas de janeiro de 2017 a agosto de 2019.

4. Como acessar o BigQuery pela API BigQuery Storage

Extrair dados do BigQuery usando o método de API tabledata.list pode ser demorado e não eficiente, já que a quantidade de dados aumenta. Esse método retorna uma lista de objetos JSON e exige a leitura sequencial de uma página por vez para ler um conjunto de dados inteiro.

A API BigQuery Storage melhora significativamente o acesso aos dados no BigQuery usando um protocolo baseado em RPC. Ele oferece suporte a leituras e gravações de dados em paralelo, bem como diferentes formatos de serialização, como Apache Avro e Apache Arrow. Em um nível alto, isso se traduz em uma melhora significativa no desempenho, especialmente em conjuntos de dados maiores.

Neste codelab, você vai usar o spark-bigquery-connector para ler e gravar dados entre o BigQuery e o Spark.

5. Como criar um projeto

Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um novo projeto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Em seguida, ative o faturamento no console do Cloud para usar os recursos do Google Cloud.

A execução por meio deste codelab não vai custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou se deixá-los em execução. A última seção deste codelab vai mostrar como limpar seu projeto.

Novos usuários do Google Cloud Platform estão qualificados para um teste sem custo financeiro de US$300.

6. Como configurar seu ambiente

Agora você vai configurar seu ambiente:

  • Como ativar as APIs Compute Engine, Dataproc e BigQuery Storage
  • Como configurar as configurações do projeto
  • Como criar um cluster do Dataproc
  • Criação de um bucket do Google Cloud Storage

Como ativar APIs e configurar seu ambiente

Para abrir o Cloud Shell, pressione o botão no canto superior direito do Cloud Console.

a10c47ee6ca41c54.png

Depois que o Cloud Shell carregar, execute os comandos a seguir para ativar as APIs do Compute Engine, do Dataproc e do BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Defina o ID do projeto. Para encontrá-lo, acesse a página de seleção de projetos e pesquise o seu. Ele pode não ser igual ao nome do seu projeto.

e682e8227aa3c781.png

76d45fb295728542.png

Execute o comando a seguir para definir o ID do projeto:

gcloud config set project <project_id>

Defina a região do seu projeto escolhendo uma na lista aqui. Um exemplo pode ser us-central1.

gcloud config set dataproc/region <region>

Escolha um nome para o cluster do Dataproc e crie uma variável de ambiente para ele.

CLUSTER_NAME=<cluster_name>

Como criar um cluster do Dataproc

Crie um cluster do Dataproc executando o seguinte comando:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

A execução desse comando leva alguns minutos. Para detalhar o comando:

Isso vai iniciar a criação de um cluster do Dataproc com o nome fornecido anteriormente. O uso da API beta ativa os recursos Beta do Dataproc, como o Gateway de componentes.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Isso vai definir o tipo de máquina a ser usado para os workers.

--worker-machine-type n1-standard-8

Isso vai definir o número de workers que o cluster terá.

--num-workers 8

Isso vai definir a versão da imagem do Dataproc.

--image-version 1.5-debian

Isso vai configurar as ações de inicialização que serão usadas no cluster. Aqui, você está incluindo a ação de inicialização do pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Estes são os metadados a serem incluídos no cluster. Aqui, você está fornecendo metadados para a ação de inicialização pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Isso vai definir os componentes opcionais a serem instalados no cluster.

--optional-components=ANACONDA

Isso ativa o gateway de componentes, que permite usar o Gateway de componentes do Dataproc para visualizar interfaces comuns, como o Zeppelin, o Jupyter ou o histórico do Spark.

--enable-component-gateway

Para uma introdução mais detalhada ao Dataproc, confira este codelab.

Como criar um bucket do Google Cloud Storage

Você vai precisar de um bucket do Google Cloud Storage para a saída do job. Defina um nome exclusivo para o bucket e execute o comando a seguir para criar um novo bucket. Os nomes de bucket são exclusivos em todos os projetos do Google Cloud para todos os usuários. Portanto, talvez seja necessário tentar isso algumas vezes com nomes diferentes. Um bucket será criado se você não receber um ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Análise exploratória de dados

Antes de realizar o pré-processamento, aprenda mais sobre a natureza dos dados com que você está trabalhando. Para isso, você vai conhecer dois métodos de análise de dados. Primeiro, você vai conferir alguns dados brutos usando a interface da Web do BigQuery e, em seguida, vai calcular o número de postagens por subreddit usando o PySpark e o Dataproc.

Como usar a IU da Web do BigQuery

Comece usando a IU da Web do BigQuery para conferir seus dados. No ícone de menu do Console do Cloud, role para baixo e pressione "BigQuery" para abrir a IU da Web do BigQuery.

242a597d7045b4da.png

Em seguida, execute o comando a seguir no Editor de consultas da IU da Web do BigQuery. Isso vai retornar 10 linhas completas dos dados de janeiro de 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Role a página para conferir todas as colunas disponíveis e alguns exemplos. Você vai encontrar duas colunas que representam o conteúdo textual de cada postagem: "title" e "selftext", sendo que a última é o corpo da postagem. Além disso, observe outras colunas, como "created_utc", que é a hora UTC em que uma postagem foi feita, e "subreddit", que é o subreddit em que a postagem existe.

Como executar um job do PySpark

Execute os comandos abaixo no Cloud Shell para clonar o repositório com o código de exemplo e acessar o diretório correto:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Você pode usar o PySpark para determinar quantas postagens existem em cada subreddit. Abra o editor do Cloud e leia o script cloud-dataproc/codelabs/spark-bigquery antes de executá-lo na próxima etapa:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Clique no botão "Abrir terminal" no Editor do Cloud para voltar ao Cloud Shell e execute o comando a seguir para executar seu primeiro job do PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Esse comando permite enviar jobs ao Dataproc pela API Jobs. Aqui você indica o tipo de trabalho como pyspark. É possível fornecer o nome do cluster, parâmetros opcionais e o nome do arquivo que contém o job. Aqui, você está fornecendo o parâmetro --jars, que permite incluir o spark-bigquery-connector no job. Também é possível definir os níveis de saída de registro usando --driver-log-levels root=FATAL, que suprime toda a saída de registro, exceto erros. Os registros do Spark tendem a ser bastante barulhentos.

Isso vai levar alguns minutos para ser executado, e o resultado final vai ficar mais ou menos assim:

6c185228db47bb18.png

8. Como conhecer as interfaces do Dataproc e do Spark

Ao executar jobs do Spark no Dataproc, você tem acesso a duas interfaces para verificar o status dos jobs / clusters. A primeira é a interface do Dataproc, que você encontra clicando no ícone de menu e rolando para baixo até o Dataproc. Aqui, você pode conferir a memória atual disponível, a memória pendente e o número de workers.

6f2987346d15c8e2.png

Você também pode clicar na guia "Jobs" para conferir os jobs concluídos. Para conferir detalhes do job, como os registros e a saída, clique no ID do job específico. 114d90129b0e4c88.png

1b2160f0f484594a.png

Também é possível acessar a interface do Spark. Na página da vaga, clique na seta para voltar e em "Interfaces da Web". Várias opções vão aparecer em "Gateway de componentes". Muitas delas podem ser ativadas usando os Componentes opcionais ao configurar o cluster. Para este laboratório, clique em "Servidor de histórico do Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

A seguinte janela será aberta:

8f6786760f994fe8.png

Todos os jobs concluídos vão aparecer aqui, e você pode clicar em qualquer application_id para saber mais informações sobre o job. Da mesma forma, você pode clicar em "Mostrar aplicativos incompletos" na parte de baixo da página de destino para conferir todos os jobs em execução.

9. Como executar o job de preenchimento

Agora você vai executar um job que carrega dados na memória, extrai as informações necessárias e descarta a saída em um bucket do Google Cloud Storage. Você vai extrair o "title", "body" (texto bruto) e "timestamp created" para cada comentário do Reddit. Em seguida, você vai converter esses dados em um CSV, compactá-los e carregá-los em um bucket com um URI de gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Consulte o Cloud Editor novamente para ler o código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, que é um script wrapper para executar o código em cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Em breve, você vai receber várias mensagens de conclusão do job. O job pode levar até 15 minutos para ser concluído. Você também pode verificar novamente o bucket de armazenamento para verificar se a saída de dados foi bem-sucedida usando o gsutil. Quando todos os jobs forem concluídos, execute o seguinte comando:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Você verá esta resposta:

a7c3c7b2e82f9fca.png

Parabéns, você concluiu o preenchimento de dados dos comentários do Reddit. Se você quiser saber como criar modelos com base nesses dados, acesse o codelab Spark-NLP.

10. Limpeza

Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:

  1. Exclua o bucket do Cloud Storage do ambiente que você criou.
  2. Exclua o ambiente do Dataproc.

Se você criou um projeto apenas para este codelab, também pode excluí-lo:

  1. No Console do GCP, acesse a página Projetos.
  2. Na lista de projetos, selecione o que você quer excluir e clique em Excluir.
  3. Na caixa, digite o ID do projeto e clique em Encerrar para excluir o projeto.

Licença

Este trabalho está licenciado sob a Licença Atribuição 3.0 Genérica da Creative Commons e a Licença Apache 2.0.