Notebooks do Apache Spark e Jupyter no Cloud Dataproc

1. Visão geral

Neste laboratório, você vai aprender a configurar e usar o Apache Spark e os notebooks Jupyter no Cloud Dataproc.

Os notebooks Jupyter são amplamente usados para análise de dados exploratória e criação de modelos de aprendizado de máquina, já que permitem executar o código de maneira interativa e ver os resultados imediatamente.

No entanto, configurar e usar o Apache Spark e os notebooks Jupyter pode ser complicado.

b9ed855863c57d6.png

O Cloud Dataproc torna isso rápido e fácil, permitindo que você crie um cluster do Dataproc com o Apache Spark, o componente Jupyter e o Gateway de componentes em cerca de 90 segundos.

O que você vai aprender

Neste codelab, você aprende a:

  • Crie um bucket do Cloud Storage para seu cluster
  • Crie um cluster do Dataproc com o Jupyter e o Gateway de componentes.
  • Acessar a interface da Web do JupyterLab no Dataproc
  • Criar um notebook usando o conector de armazenamento do BigQuery do Spark
  • Executar um job do Spark e criar um gráfico com os resultados.

O custo total da execução deste laboratório no Google Cloud é de aproximadamente US $1. Confira todos os detalhes sobre os preços do Cloud Dataproc aqui.

2. Como criar um projeto

Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um projeto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Em seguida, será necessário ativar o faturamento no console do Cloud para usar os recursos do Google Cloud.

A execução deste codelab não deve custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou deixá-los em execução. A última seção deste codelab vai mostrar como limpar seu projeto.

Novos usuários do Google Cloud Platform estão qualificados para uma avaliação sem custo financeiro de US$300.

3. Como configurar o ambiente

Primeiro, abra o Cloud Shell clicando no botão no canto superior direito do console do Cloud:

a10c47ee6ca41c54.png

Depois que o Cloud Shell for carregado, execute o comando a seguir para definir o ID do projeto da etapa anterior**:**

gcloud config set project <project_id>

O ID do projeto também pode ser encontrado clicando no projeto no canto superior esquerdo do console do Cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Em seguida, ative as APIs Dataproc, Compute Engine e BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Como alternativa, isso pode ser feito no Console do Cloud. Clique no ícone de menu no canto superior esquerdo da tela.

2bfc27ef9ba2ec7d.png

Selecione "API Manager" no menu suspenso.

408af5f32c4b7c25.png

Clique em Ativar APIs e serviços.

a9c0e84296a7ba5b.png

Procure e ative as seguintes APIs:

  • API Compute Engine
  • API Dataproc
  • API BigQuery
  • API BigQuery Storage

4. Criar um bucket do GCS

Crie um bucket do Cloud Storage na região mais próxima aos seus dados e dê a ele um nome exclusivo.

Ele será usado para o cluster do Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Você verá esta resposta

Creating gs://<your-bucket-name>/...

5. Criar um cluster do Dataproc com o Jupyter e o Gateway de componentes

Como criar o cluster

Definir as variáveis de ambiente do cluster

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Em seguida, execute este comando gcloud para criar o cluster com todos os componentes necessários para trabalhar com o Jupyter nele.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Você vai ver a seguinte saída enquanto o cluster é criado

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

A criação do cluster leva cerca de 90 segundos. Depois que ele estiver pronto, você poderá acessar o cluster na interface do console do Cloud Dataproc.

Enquanto espera, continue lendo abaixo para saber mais sobre as flags usadas no comando gcloud.

Você vai receber a seguinte saída quando o cluster for criado:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flags usadas no comando gcloud dataproc create

Confira uma análise das flags usadas no comando gcloud dataproc create

--region=${REGION}

Especifica a região e a zona em que o cluster será criado. Confira a lista de regiões disponíveis aqui.

--image-version=1.4

A versão da imagem a ser usada no cluster. Confira a lista de versões disponíveis aqui.

--bucket=${BUCKET_NAME}

Especifique o bucket do Cloud Storage criado anteriormente para usar no cluster. Se você não fornecer um bucket do GCS, ele será criado para você.

É aqui também que seus notebooks serão salvos, mesmo que você exclua o cluster, já que o bucket do GCS não é excluído.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Os tipos de máquinas a serem usados no cluster do Dataproc. Confira uma lista dos tipos de máquina disponíveis aqui.

Por padrão, um nó mestre e dois nós de trabalho são criados se você não definir a flag "–num-workers".

--optional-components=ANACONDA,JUPYTER

Ao definir esses valores para componentes opcionais, todas as bibliotecas necessárias para o Jupyter e o Anaconda (obrigatório para notebooks Jupyter) serão instaladas no cluster.

--enable-component-gateway

Ao ativar o Component Gateway, um link do App Engine é criado usando o Apache Knox e o Inverting Proxy, o que facilita o acesso seguro e autenticado às interfaces da Web do Jupyter e do JupyterLab. Isso significa que não é mais necessário criar túneis SSH.

Ele também cria links para outras ferramentas no cluster, incluindo o gerenciador de recursos do Yarn e o servidor de histórico do Spark, que são úteis para conferir o desempenho dos seus jobs e padrões de uso do cluster.

6. Criar um notebook do Apache Spark

Como acessar a interface da Web do JupyterLab

Quando o cluster estiver pronto, acesse Clusters do Dataproc - Console do Cloud, clique no cluster criado e acesse a guia "Interfaces da Web" para encontrar o link do Gateway de componentes para a interface da Web do JupyterLab.

afc40202d555de47.png

Você vai notar que tem acesso ao Jupyter, que é a interface clássica de notebook, ou ao JupyterLab, descrito como a interface de próxima geração para o Projeto Jupyter.

O JupyterLab tem muitos recursos novos e excelentes de interface do usuário. Por isso, se você não tem experiência com notebooks ou quer conhecer as melhorias mais recentes, recomendamos usar o JupyterLab, já que ele vai substituir a interface clássica do Jupyter, de acordo com a documentação oficial.

Criar um notebook com um kernel Python 3

a463623f2ebf0518.png

Na guia "Launcher", clique no ícone do notebook Python 3 para criar um notebook com um kernel Python 3 (não o kernel PySpark). Isso permite configurar o SparkSession no notebook e incluir o spark-bigquery-connector necessário para usar a API BigQuery Storage.

Renomear o notebook

196a3276ed07e1f3.png

Clique com o botão direito do mouse no nome do notebook na barra lateral à esquerda ou na navegação superior e renomeie o notebook como "BigQuery Storage & Spark DataFrames.ipynb".

Executar o código do Spark no notebook

fbac38062e5bb9cf.png

Neste notebook, você vai usar o spark-bigquery-connector, uma ferramenta para ler e gravar dados entre o BigQuery e o Spark usando a API BigQuery Storage.

A API BigQuery Storage traz melhorias significativas para o acesso a dados no BigQuery usando um protocolo baseado em RPC. Ele oferece suporte a leituras e gravações de dados em paralelo, além de diferentes formatos de serialização, como Apache Avro e Apache Arrow. Em um nível alto, isso se traduz em um desempenho significativamente melhor, especialmente em conjuntos de dados maiores.

Na primeira célula, verifique a versão do Scala do seu cluster para incluir a versão correta do jar spark-bigquery-connector.

Entrada [1]:

!scala -version

Saída [1]:f580e442576b8b1f.png Crie uma sessão do Spark e inclua o pacote spark-bigquery-connector.

Se a versão do Scala for 2.11, use o seguinte pacote.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Se a versão do Scala for 2.12, use o seguinte pacote.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Entrada [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Ativar repl.eagerEval

Isso vai gerar os resultados dos DataFrames em cada etapa sem a nova necessidade de mostrar df.show() e também melhora a formatação da saída.

Entrada [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Ler a tabela do BigQuery em um DataFrame do Spark

Crie um DataFrame do Spark lendo dados de um conjunto de dados público do BigQuery. Isso usa o spark-bigquery-connector e a API BigQuery Storage para carregar os dados no cluster do Spark.

Crie um DataFrame do Spark e carregue dados do conjunto de dados público do BigQuery para visualizações de página da Wikipédia. Você vai notar que não está executando uma consulta nos dados, já que está usando o spark-bigquery-connector para carregar os dados no Spark, onde o processamento deles vai ocorrer. Quando esse código é executado, ele não carrega a tabela, porque é uma avaliação lenta no Spark, e a execução ocorre na próxima etapa.

Entrada [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Saída [4]:

c107a33f6fc30ca.png

Selecione as colunas necessárias e aplique um filtro usando where(), que é um alias de filter().

Quando esse código é executado, ele aciona uma ação do Spark, e os dados são lidos do BigQuery Storage nesse momento.

Entrada [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Saída [5]:

ad363cbe510d625a.png

Agrupe por título e ordene por visualizações de página para ver as páginas principais

Entrada [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Saída [6]:f718abd05afc0f4.png

7. Usar bibliotecas de geração de gráficos do Python no notebook

Você pode usar as várias bibliotecas de geração de gráficos disponíveis em Python para representar a saída dos seus jobs do Spark.

Converter DataFrame do Spark em DataFrame do Pandas

Converta o DataFrame do Spark em um DataFrame do Pandas e defina datehour como o índice. Isso é útil se você quiser trabalhar com os dados diretamente em Python e criar gráficos usando as várias bibliotecas de criação de gráficos disponíveis em Python.

Entrada [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Saída [7]:

3df2aaa2351f028d.png

Como criar gráficos de DataFrames do Pandas

Importe a biblioteca matplotlib, que é necessária para mostrar os gráficos no notebook.

Entrada [8]:

import matplotlib.pyplot as plt

Use a função de plotagem do Pandas para criar um gráfico de linhas com o DataFrame do Pandas.

Entrada [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Saída [9]:bade7042c3033594.png

Verifique se o notebook foi salvo no GCS

Agora você tem seu primeiro notebook Jupyter em execução no cluster do Dataproc. Dê um nome ao notebook, que será salvo automaticamente no bucket do GCS usado ao criar o cluster.

Para verificar, use este comando gsutil no Cloud Shell:

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Você verá esta resposta

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Dica de otimização: armazenar dados em cache na memória

Em alguns casos, talvez você queira os dados na memória em vez de ler do armazenamento do BigQuery sempre.

Esse job vai ler os dados do BigQuery e enviar o filtro para o BigQuery. A agregação será calculada no Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Você pode modificar o job acima para incluir um cache da tabela. Agora, o filtro na coluna "wiki" será aplicado na memória pelo Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Em seguida, é possível filtrar outro idioma da wiki usando os dados armazenados em cache em vez de ler os dados do armazenamento do BigQuery novamente, o que torna o processo muito mais rápido.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Para remover o cache, execute

df_wiki_all.unpersist()

9. Notebooks de exemplo para mais casos de uso

O repositório do Cloud Dataproc no GitHub (em inglês) tem notebooks Jupyter com padrões comuns do Apache Spark para carregar, salvar e representar dados com vários produtos do Google Cloud Platform e ferramentas de código aberto:

10. Limpar

Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:

  1. Exclua o bucket do Cloud Storage do ambiente que você criou.
  2. Exclua o ambiente do Dataproc.

Se você criou um projeto apenas para este codelab, também é possível excluir o projeto:

  1. No Console do GCP, acesse a página Projetos.
  2. Na lista de projetos, selecione um e clique em Excluir.
  3. Na caixa, digite o ID do projeto e clique em Encerrar para excluí-lo.

Licença

Este trabalho está licenciado sob uma Licença Creative Commons Atribuição 3.0 Genérica e uma licença Apache 2.0.