1. Visão geral
Este codelab mostra como criar um pipeline de processamento de dados usando o Apache Spark com o Dataproc no Google Cloud Platform. Ler dados de um local de armazenamento, realizar transformações nele e gravá-los em outro local de armazenamento é um caso de uso comum em ciência de dados e engenharia de dados. Transformações comuns incluem alterar o conteúdo dos dados, eliminar informações desnecessárias e alterar os tipos de arquivo.
Neste codelab, você vai aprender sobre o Apache Spark, executar um pipeline de amostra usando o Dataproc com o PySpark (API Python do Apache Spark), o BigQuery, o Google Cloud Storage e os dados do Reddit.
2. Introdução ao Apache Spark (opcional)
De acordo com o site, " O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala." Ele permite analisar e processar dados em paralelo e na memória, o que permite uma enorme computação paralela em várias máquinas e nós diferentes. Ele foi originalmente lançado em 2014 como um upgrade para o MapReduce tradicional e ainda é um dos frameworks mais conhecidos para realizar cálculos em grande escala. O Apache Spark é escrito em Scala e, consequentemente, tem APIs em Scala, Java, Python e R. Ele contém uma infinidade de bibliotecas, como Spark SQL, para realizar consultas SQL nos dados, Spark Streaming, para dados de streaming, MLlib para machine learning e GraphX para processamento de gráficos. Todas elas são executadas no mecanismo Apache Spark.
O Spark pode ser executado sozinho ou usar um serviço de gerenciamento de recursos, como Yarn, Mesos ou Kubernetes, para escalonamento. Neste codelab, você vai usar o Dataproc, que utiliza o Yarn.
Os dados no Spark foram originalmente carregados na memória em um RDD, ou conjunto de dados distribuído resiliente. Desde então, o desenvolvimento no Spark incluiu a adição de dois novos tipos de dados no estilo colunar: o conjunto de dados, que é digitado, e o Dataframe, que não é digitado. Em termos gerais, RDDs são ótimos para qualquer tipo de dados, enquanto conjuntos de dados e Dataframes são otimizados para dados tabulares. Como os conjuntos de dados só estão disponíveis com as APIs Java e Scala, continuaremos usando a API Dataframe do PySpark para este codelab. Confira mais informações na documentação do Apache Spark.
3. Caso de uso
Os engenheiros de dados geralmente precisam que os dados sejam facilmente acessíveis aos cientistas de dados. No entanto, os dados geralmente são inicialmente sujos (difíceis de usar para análise no estado atual) e precisam ser limpos antes de serem muito úteis. Um exemplo disso são os dados que foram copiados da Web e podem conter codificações estranhas ou tags HTML irrelevantes.
Neste laboratório, você vai carregar um conjunto de dados do BigQuery na forma de postagens do Reddit em um cluster do Spark hospedado no Dataproc, extrair informações úteis e armazenar os dados processados como arquivos CSV compactados no Google Cloud Storage.
O diretor de ciência de dados da sua empresa quer que as equipes dele trabalhem em diferentes problemas de processamento de linguagem natural. Especificamente, eles estão interessados em analisar os dados no subreddit "r/food". Você criará um pipeline para um despejo de dados começando com um preenchimento de janeiro de 2017 a agosto de 2019.
4. Como acessar o BigQuery pela API BigQuery Storage
A extração de dados do BigQuery usando o método da API tabledata.list pode ser demorada e ineficiente à medida que a quantidade de dados aumenta. Esse método retorna uma lista de objetos JSON e requer a leitura sequencial de uma página por vez para ler um conjunto de dados inteiro.
A API BigQuery Storage traz melhorias significativas para o acesso a dados no BigQuery usando um protocolo baseado em RPC. Ele é compatível com leituras e gravações de dados em paralelo e diferentes formatos de serialização, como Apache Avro e Apache Arrow. Isso se traduz em uma melhora significativa no desempenho, especialmente em conjuntos de dados maiores.
Neste codelab, você usará o spark-bigquery-connector para ler e gravar dados entre o BigQuery e o Spark.
5. Como criar um projeto
Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um novo projeto:
Em seguida, você precisará ativar o faturamento no console do Cloud para usar os recursos do Google Cloud.
A execução deste codelab não deve custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou deixá-los em execução. A última seção deste codelab vai ajudar você a limpar seu projeto.
Novos usuários do Google Cloud Platform estão qualificados para um teste sem custo financeiro de US$300.
6. Como configurar seu ambiente
Agora você concluirá a configuração do ambiente da seguinte forma:
- Como ativar as APIs Compute Engine, Dataproc e BigQuery Storage
- Como definir as configurações do projeto
- Como criar um cluster do Dataproc
- Como criar um bucket do Google Cloud Storage
Como ativar APIs e configurar o ambiente
Pressione o botão no canto superior direito do console do Cloud para abrir o Cloud Shell.
Depois que o Cloud Shell for carregado, execute os seguintes comandos para ativar as APIs Compute Engine, Dataproc e BigQuery Storage:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
Defina o ID do projeto. Para encontrá-lo, acesse a página de seleção do projeto e procure seu projeto. Ele pode não ser igual ao nome do seu projeto.
Execute o seguinte comando para definir o project id:
gcloud config set project <project_id>
Defina a região do seu projeto escolhendo uma desta lista. Por exemplo, us-central1
.
gcloud config set dataproc/region <region>
Escolha um nome para o cluster do Dataproc e crie uma variável de ambiente para ele.
CLUSTER_NAME=<cluster_name>
Como criar um cluster do Dataproc
Crie um cluster do Dataproc executando o seguinte comando:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
Esse comando leva alguns minutos para ser concluído. Para detalhar o comando:
Isso iniciará a criação de um cluster do Dataproc com o nome que você forneceu anteriormente. O uso da API beta
ativará os recursos Beta do Dataproc, como o Gateway de componentes.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
Isso definirá o tipo de máquina a ser usado pelos workers.
--worker-machine-type n1-standard-8
Isso vai definir o número de workers que o cluster terá.
--num-workers 8
Isso definirá a versão de imagem do Dataproc.
--image-version 1.5-debian
Isso vai configurar as ações de inicialização que serão usadas no cluster. Aqui, você está incluindo a ação de inicialização pip.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
Esses são os metadados a serem incluídos no cluster. Aqui, você está fornecendo metadados para a ação de inicialização pip
.
--metadata 'PIP_PACKAGES=google-cloud-storage'
Isso definirá os componentes opcionais que serão instalados no cluster.
--optional-components=ANACONDA
Isso ativa o gateway de componentes, que permite usar o Gateway de componentes do Dataproc para visualizar interfaces comuns, como Zeppelin, Jupyter ou o histórico do Spark.
--enable-component-gateway
Para uma introdução mais detalhada ao Dataproc, confira este codelab.
Como criar um bucket do Google Cloud Storage
Você precisará de um bucket do Google Cloud Storage para a saída do job. Determine um nome exclusivo para o bucket e execute o comando a seguir para criar um novo bucket. Como os nomes dos buckets são exclusivos para todos os usuários em todos os projetos do Google Cloud, talvez seja necessário tentar várias vezes com nomes diferentes. Um bucket será criado com sucesso se você não receber um ServiceException
.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. Análise exploratória de dados
Antes de realizar o pré-processamento, é importante saber mais sobre a natureza dos dados com os quais você está lidando. Para isso, você vai conhecer dois métodos de análise detalhada dos dados. Primeiro, você verá alguns dados brutos usando a interface da Web do BigQuery e, em seguida, calculará o número de postagens por subreddit usando o PySpark e o Dataproc.
Como usar a IU da Web do BigQuery
Comece usando a IU da Web do BigQuery para visualizar seus dados. No ícone de menu do console do Cloud, role para baixo e pressione "BigQuery". para abrir a IU da Web do BigQuery.
Em seguida, execute o comando a seguir no Editor de consultas da interface da Web do BigQuery. Isso retornará 10 linhas completas dos dados de janeiro de 2017:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
Você pode rolar a página para ver todas as colunas disponíveis, bem como alguns exemplos. Você verá duas colunas que representam o conteúdo textual de cada postagem: "título" e "selftext", sendo o último o corpo da postagem. Observe também outras colunas, como "created_utc" que é o horário universal em que uma postagem foi feita e "subreddit" que é o subreddit em que a postagem existe.
Como executar um job do PySpark
Execute os comandos a seguir no Cloud Shell para clonar o repositório com o exemplo de código e "cd" no diretório correto:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
É possível usar o PySpark para determinar o número de postagens existentes em cada subreddit. Você pode abrir o Cloud Editor e ler o script cloud-dataproc/codelabs/spark-bigquery
antes de executá-lo na próxima etapa:
Clique em "Abrir terminal". no Cloud Editor para voltar ao Cloud Shell e usar o comando a seguir para executar seu primeiro job do PySpark:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
Esse comando permite enviar jobs para o Dataproc pela API Jobs. Aqui você indica o tipo de job como pyspark
. É possível fornecer o nome do cluster, parâmetros opcionais e o nome do arquivo que contém o job. Aqui, você está fornecendo o parâmetro --jars
, que permite incluir o spark-bigquery-connector
com seu job. Também é possível definir os níveis de saída do registro usando --driver-log-levels root=FATAL
, que suprimirá toda a saída do registro, exceto erros. Os registros do Spark tendem a ter muito ruído.
A execução leva alguns minutos. A saída final vai ficar assim:
8. Conheça as IUs do Dataproc e do Spark
Ao executar jobs do Spark no Dataproc, você tem acesso a duas IUs para verificar o status dos jobs / clusters. A primeira é a interface do Dataproc, que pode ser acessada clicando no ícone do menu e rolando para baixo até o Dataproc. Aqui é possível ver a memória atual disponível, a memória pendente e o número de workers.
Também é possível clicar na guia "Jobs" para ver os jobs concluídos. Para ver detalhes do job, como os registros e a saída, clique no ID de um job específico.
Também é possível visualizar a interface do Spark. Na página do job, clique na seta para voltar e depois em "Interfaces da Web". Várias opções vão aparecer abaixo do gateway do componente. Muitos deles podem ser ativados por meio de componentes opcionais durante a configuração do cluster. Para este laboratório, clique em "Servidor de histórico do Spark.
A seguinte janela será aberta:
Todos os trabalhos concluídos serão mostrados aqui, e você poderá clicar em qualquer application_id para saber mais informações sobre o trabalho. Da mesma forma, você pode clicar em "Mostrar aplicativos incompletos" na parte de baixo da página de destino para ver todos os jobs em execução.
9. Como executar o job de preenchimento
Agora você executará um job que carrega dados na memória, extrai as informações necessárias e despeja a saída em um bucket do Google Cloud Storage. Você vai extrair as seções "title", "body" (texto bruto) e "carimbo de data/hora criado" para cada comentário do Reddit. Em seguida, você vai converter esses dados em um csv, compactá-los e carregá-los em um bucket com um URI de gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.
Você pode consultar o Cloud Editor novamente para ler o código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh
, que é um script de wrapper para executar o código em cloud-dataproc/codelabs/spark-bigquery/backfill.py
.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
Em breve, você verá várias mensagens de conclusão de job. O job pode levar até 15 minutos para ser concluído. Também é possível verificar novamente seu bucket de armazenamento para conferir a saída de dados bem-sucedida usando gsutil. Quando todos os jobs forem concluídos, execute o seguinte comando:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
Você verá esta resposta:
Parabéns, você concluiu o preenchimento dos seus dados de comentários do Reddit. Se você tiver interesse em como criar modelos com base nesses dados, prossiga para o codelab Spark-NLP.
10. Limpeza
Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:
- Exclua o bucket do Cloud Storage que você criou no ambiente
- Exclua o ambiente do Dataproc.
Se você criou um projeto apenas para este codelab, também é possível excluí-lo:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em Encerrar para excluí-lo.
Licença
Este trabalho está sob a licença Atribuição 3.0 Genérica da Creative Commons e a licença Apache 2.0.