1. Visão geral
Neste codelab, você vai aprender a criar um pipeline de processamento de dados usando o Apache Spark com o Dataproc no Google Cloud Platform. Ler dados de um local de armazenamento, realizar transformações nele e gravá-los em outro local de armazenamento é um caso de uso comum em ciência de dados e engenharia de dados. As transformações comuns incluem alterar o conteúdo dos dados, remover informações desnecessárias e mudar os tipos de arquivo.
Neste codelab, você vai aprender sobre o Apache Spark, executar um pipeline de exemplo usando o Dataproc com o PySpark (API Python do Apache Spark), o BigQuery, o Google Cloud Storage e dados do Reddit.
2. Introdução ao Apache Spark (opcional)
De acordo com o site, "o Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala". Ele permite analisar e processar dados em paralelo e na memória, o que possibilita a computação paralela massiva em várias máquinas e nós diferentes. Ele foi lançado originalmente em 2014 como uma atualização do MapReduce tradicional e ainda é um dos frameworks mais conhecidos para a realização de cálculos em grande escala. O Apache Spark é escrito em Scala e, portanto, tem APIs em Scala, Java, Python e R. Ele contém uma infinidade de bibliotecas, como o Spark SQL para executar consultas SQL nos dados, o Spark Streaming para dados de streaming, o MLlib para machine learning e o GraphX para processamento de gráficos, todos executados no mecanismo Apache Spark.
O Spark pode ser executado sozinho ou pode aproveitar um serviço de gerenciamento de recursos, como Yarn, Mesos ou Kubernetes para escalonamento. Você vai usar o Dataproc neste codelab, que utiliza o Yarn.
Os dados no Spark eram originalmente carregados na memória em um RDD, ou conjunto de dados distribuído resiliente. Desde então, o desenvolvimento no Spark incluiu a adição de dois novos tipos de dados de estilo colunar: o Dataset, que é tipado, e o Dataframe, que não é tipado. De modo geral, os RDDs são ótimos para qualquer tipo de dados, enquanto os conjuntos e dataframes são otimizados para dados tabulares. Como os conjuntos de dados só estão disponíveis com as APIs Java e Scala, vamos usar a API PySpark Dataframe para este codelab. Para mais informações, consulte a documentação do Apache Spark.
3. Caso de uso
Os engenheiros de dados geralmente precisam que os dados sejam facilmente acessíveis aos cientistas de dados. No entanto, os dados geralmente são sujos no início (difíceis de usar para análises no estado atual) e precisam ser limpos antes de serem úteis. Um exemplo disso são os dados extraídos da Web que podem conter codificações estranhas ou tags HTML estranhas.
Neste laboratório, você vai carregar um conjunto de dados do BigQuery na forma de postagens do Reddit em um cluster do Spark hospedado no Dataproc, extrair informações úteis e armazenar os dados processados como arquivos CSV compactados no Google Cloud Storage.
O cientista-chefe de dados da sua empresa quer que as equipes trabalhem em diferentes problemas de processamento de linguagem natural. Especificamente, ele quer analisar os dados no subreddit "r/food". Você vai criar um pipeline para um despejo de dados, começando com um preenchimento de lacunas de janeiro de 2017 a agosto de 2019.
4. Como acessar o BigQuery pela API BigQuery Storage
Extrair dados do BigQuery usando o método de API tabledata.list pode ser demorado e não eficiente, já que a quantidade de dados aumenta. Esse método retorna uma lista de objetos JSON e exige a leitura sequencial de uma página por vez para ler um conjunto de dados inteiro.
A API BigQuery Storage melhora significativamente o acesso aos dados no BigQuery usando um protocolo baseado em RPC. Ele oferece suporte a leituras e gravações de dados em paralelo, bem como diferentes formatos de serialização, como Apache Avro e Apache Arrow. Em um nível alto, isso se traduz em uma melhora significativa no desempenho, especialmente em conjuntos de dados maiores.
Neste codelab, você vai usar o spark-bigquery-connector para ler e gravar dados entre o BigQuery e o Spark.
5. Como criar um projeto
Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um novo projeto:
Em seguida, ative o faturamento no console do Cloud para usar os recursos do Google Cloud.
A execução por meio deste codelab não vai custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou se deixá-los em execução. A última seção deste codelab vai mostrar como limpar seu projeto.
Novos usuários do Google Cloud Platform estão qualificados para um teste sem custo financeiro de US$300.
6. Como configurar seu ambiente
Agora você vai configurar seu ambiente:
- Como ativar as APIs Compute Engine, Dataproc e BigQuery Storage
- Como configurar as configurações do projeto
- Como criar um cluster do Dataproc
- Criação de um bucket do Google Cloud Storage
Como ativar APIs e configurar seu ambiente
Para abrir o Cloud Shell, pressione o botão no canto superior direito do Cloud Console.
Depois que o Cloud Shell carregar, execute os comandos a seguir para ativar as APIs do Compute Engine, do Dataproc e do BigQuery Storage:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
Defina o ID do projeto. Para encontrá-lo, acesse a página de seleção de projetos e pesquise o seu. Ele pode não ser igual ao nome do seu projeto.
Execute o comando a seguir para definir o ID do projeto:
gcloud config set project <project_id>
Defina a região do seu projeto escolhendo uma na lista aqui. Um exemplo pode ser us-central1
.
gcloud config set dataproc/region <region>
Escolha um nome para o cluster do Dataproc e crie uma variável de ambiente para ele.
CLUSTER_NAME=<cluster_name>
Como criar um cluster do Dataproc
Crie um cluster do Dataproc executando o seguinte comando:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
A execução desse comando leva alguns minutos. Para detalhar o comando:
Isso vai iniciar a criação de um cluster do Dataproc com o nome fornecido anteriormente. O uso da API beta
ativa os recursos Beta do Dataproc, como o Gateway de componentes.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
Isso vai definir o tipo de máquina a ser usado para os workers.
--worker-machine-type n1-standard-8
Isso vai definir o número de workers que o cluster terá.
--num-workers 8
Isso vai definir a versão da imagem do Dataproc.
--image-version 1.5-debian
Isso vai configurar as ações de inicialização que serão usadas no cluster. Aqui, você está incluindo a ação de inicialização do pip.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
Estes são os metadados a serem incluídos no cluster. Aqui, você está fornecendo metadados para a ação de inicialização pip
.
--metadata 'PIP_PACKAGES=google-cloud-storage'
Isso vai definir os componentes opcionais a serem instalados no cluster.
--optional-components=ANACONDA
Isso ativa o gateway de componentes, que permite usar o Gateway de componentes do Dataproc para visualizar interfaces comuns, como o Zeppelin, o Jupyter ou o histórico do Spark.
--enable-component-gateway
Para uma introdução mais detalhada ao Dataproc, confira este codelab.
Como criar um bucket do Google Cloud Storage
Você vai precisar de um bucket do Google Cloud Storage para a saída do job. Defina um nome exclusivo para o bucket e execute o comando a seguir para criar um novo bucket. Os nomes de bucket são exclusivos em todos os projetos do Google Cloud para todos os usuários. Portanto, talvez seja necessário tentar isso algumas vezes com nomes diferentes. Um bucket será criado se você não receber um ServiceException
.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. Análise exploratória de dados
Antes de realizar o pré-processamento, aprenda mais sobre a natureza dos dados com que você está trabalhando. Para isso, você vai conhecer dois métodos de análise de dados. Primeiro, você vai conferir alguns dados brutos usando a interface da Web do BigQuery e, em seguida, vai calcular o número de postagens por subreddit usando o PySpark e o Dataproc.
Como usar a IU da Web do BigQuery
Comece usando a IU da Web do BigQuery para conferir seus dados. No ícone de menu do Console do Cloud, role para baixo e pressione "BigQuery" para abrir a IU da Web do BigQuery.
Em seguida, execute o comando a seguir no Editor de consultas da IU da Web do BigQuery. Isso vai retornar 10 linhas completas dos dados de janeiro de 2017:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
Role a página para conferir todas as colunas disponíveis e alguns exemplos. Você vai encontrar duas colunas que representam o conteúdo textual de cada postagem: "title" e "selftext", sendo que a última é o corpo da postagem. Além disso, observe outras colunas, como "created_utc", que é a hora UTC em que uma postagem foi feita, e "subreddit", que é o subreddit em que a postagem existe.
Como executar um job do PySpark
Execute os comandos abaixo no Cloud Shell para clonar o repositório com o código de exemplo e acessar o diretório correto:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
Você pode usar o PySpark para determinar quantas postagens existem em cada subreddit. Abra o editor do Cloud e leia o script cloud-dataproc/codelabs/spark-bigquery
antes de executá-lo na próxima etapa:
Clique no botão "Abrir terminal" no Editor do Cloud para voltar ao Cloud Shell e execute o comando a seguir para executar seu primeiro job do PySpark:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
Esse comando permite enviar jobs ao Dataproc pela API Jobs. Aqui você indica o tipo de trabalho como pyspark
. É possível fornecer o nome do cluster, parâmetros opcionais e o nome do arquivo que contém o job. Aqui, você está fornecendo o parâmetro --jars
, que permite incluir o spark-bigquery-connector
no job. Também é possível definir os níveis de saída de registro usando --driver-log-levels root=FATAL
, que suprime toda a saída de registro, exceto erros. Os registros do Spark tendem a ser bastante barulhentos.
Isso vai levar alguns minutos para ser executado, e o resultado final vai ficar mais ou menos assim:
8. Como conhecer as interfaces do Dataproc e do Spark
Ao executar jobs do Spark no Dataproc, você tem acesso a duas interfaces para verificar o status dos jobs / clusters. A primeira é a interface do Dataproc, que você encontra clicando no ícone de menu e rolando para baixo até o Dataproc. Aqui, você pode conferir a memória atual disponível, a memória pendente e o número de workers.
Você também pode clicar na guia "Jobs" para conferir os jobs concluídos. Para conferir detalhes do job, como os registros e a saída, clique no ID do job específico.
Também é possível acessar a interface do Spark. Na página da vaga, clique na seta para voltar e em "Interfaces da Web". Várias opções vão aparecer em "Gateway de componentes". Muitas delas podem ser ativadas usando os Componentes opcionais ao configurar o cluster. Para este laboratório, clique em "Servidor de histórico do Spark".
A seguinte janela será aberta:
Todos os jobs concluídos vão aparecer aqui, e você pode clicar em qualquer application_id para saber mais informações sobre o job. Da mesma forma, você pode clicar em "Mostrar aplicativos incompletos" na parte de baixo da página de destino para conferir todos os jobs em execução.
9. Como executar o job de preenchimento
Agora você vai executar um job que carrega dados na memória, extrai as informações necessárias e descarta a saída em um bucket do Google Cloud Storage. Você vai extrair o "title", "body" (texto bruto) e "timestamp created" para cada comentário do Reddit. Em seguida, você vai converter esses dados em um CSV, compactá-los e carregá-los em um bucket com um URI de gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.
Consulte o Cloud Editor novamente para ler o código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh
, que é um script wrapper para executar o código em cloud-dataproc/codelabs/spark-bigquery/backfill.py
.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
Em breve, você vai receber várias mensagens de conclusão do job. O job pode levar até 15 minutos para ser concluído. Você também pode verificar novamente o bucket de armazenamento para verificar se a saída de dados foi bem-sucedida usando o gsutil. Quando todos os jobs forem concluídos, execute o seguinte comando:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
Você verá esta resposta:
Parabéns, você concluiu o preenchimento de dados dos comentários do Reddit. Se você quiser saber como criar modelos com base nesses dados, acesse o codelab Spark-NLP.
10. Limpeza
Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:
- Exclua o bucket do Cloud Storage do ambiente que você criou.
- Exclua o ambiente do Dataproc.
Se você criou um projeto apenas para este codelab, também pode excluí-lo:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione o que você quer excluir e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em Encerrar para excluir o projeto.
Licença
Este trabalho está licenciado sob a Licença Atribuição 3.0 Genérica da Creative Commons e a Licença Apache 2.0.