Notebooks do Apache Spark e Jupyter no Cloud Dataproc

1. Visão geral

Neste laboratório, você vai aprender a configurar e usar o Apache Spark e os notebooks do Jupyter no Cloud Dataproc.

Os notebooks Jupyter são amplamente usados para análise exploratória de dados e criação de modelos de machine learning, porque permitem que você execute seu código de maneira interativa e veja imediatamente os resultados.

No entanto, configurar e usar o Apache Spark e os Notebooks do Jupyter pode ser complicado.

b9ed855863c57d6.png

O Cloud Dataproc facilita e agiliza a criação de um cluster do Dataproc com o Apache Spark, o componente Jupyter e o gateway de componentes em cerca de 90 segundos.

O que você vai aprender

Neste codelab, você aprende a:

  • Criar um bucket do Google Cloud Storage para o cluster
  • Criar um cluster do Dataproc com o Jupyter e o Gateway de Componentes
  • Acessar a IU da Web do JupyterLab no Dataproc
  • Criar um notebook usando o conector de armazenamento do BigQuery do Spark
  • executar um job do Spark e plotar os resultados.

O custo total da execução deste laboratório no Google Cloud é de aproximadamente US $1. Você pode encontrar todos os detalhes sobre os preços do Cloud Dataproc aqui.

2. Como criar um projeto

Faça login no console do Google Cloud Platform em console.cloud.google.com e crie um novo projeto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Em seguida, você precisará ativar o faturamento no console do Cloud para usar os recursos do Google Cloud.

A execução deste codelab não deve custar mais do que alguns dólares, mas pode ser mais se você decidir usar mais recursos ou deixá-los em execução. A última seção deste codelab vai ajudar você a limpar seu projeto.

Novos usuários do Google Cloud Platform estão qualificados para um teste sem custo financeiro de US$300.

3. Como configurar o ambiente

Primeiro, clique no botão no canto superior direito do console do Cloud para abrir o Cloud Shell:

a10c47ee6ca41c54.png

Depois que o Cloud Shell for carregado, execute o seguinte comando para definir o ID do projeto da etapa anterior**:**

gcloud config set project <project_id>

Para encontrar o ID do projeto, clique no seu projeto no canto superior esquerdo do console do Cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Em seguida, ative as APIs Dataproc, Compute Engine e BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Outra opção é fazer isso no Console do Cloud. Clique no ícone de menu no canto superior esquerdo da tela.

2bfc27ef9ba2ec7d.png

Selecione "API Manager" no menu suspenso.

408af5f32c4b7c25.png

Clique em Ativar APIs e serviços.

a9c0e84296a7ba5b.png

Pesquise e ative as seguintes APIs:

  • API Compute Engine
  • API Dataproc
  • API BigQuery
  • API BigQuery Storage

4. Criar um bucket do GCS

Crie um bucket do Google Cloud Storage na região mais próxima dos seus dados e dê um nome exclusivo a ele.

Isso será usado para o cluster do Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

A saída a seguir será exibida

Creating gs://<your-bucket-name>/...

5. Crie seu cluster do Dataproc com o Jupyter Gateway de componentes

Criando o cluster

Defina as variáveis de ambiente do cluster

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Em seguida, execute este comando gcloud para criar seu cluster com todos os componentes necessários para trabalhar com o Jupyter no cluster.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

A saída a seguir será exibida enquanto o cluster está sendo criado.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

A criação do cluster leva cerca de 90 segundos. Quando ele estiver pronto, você poderá acessá-lo pela interface do console do Cloud do Dataproc.

Enquanto espera, continue lendo para saber mais sobre as flags usadas no comando gcloud.

Você verá a seguinte saída assim que o cluster for criado:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Sinalizações usadas no comando gcloud dataproc create

Este é um detalhamento das sinalizações usadas no comando gcloud dataproc create

--region=${REGION}

Especifica a região e a zona em que o cluster será criado. Consulte a lista das regiões disponíveis.

--image-version=1.4

A versão da imagem a ser usada no cluster. Consulte a lista das versões disponíveis aqui.

--bucket=${BUCKET_NAME}

Especifique o bucket do Google Cloud Storage criado anteriormente para usar no cluster. Se você não fornecer um bucket do GCS, ele será criado.

Também é aqui que seus notebooks serão salvos mesmo se você excluir o cluster, já que o bucket do GCS não é excluído.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Os tipos de máquina a serem usados no cluster do Dataproc. Confira uma lista de tipos de máquinas disponíveis aqui.

Por padrão, se você não definir a sinalização –num-workers, um nó mestre e dois nós de trabalho serão criados.

--optional-components=ANACONDA,JUPYTER

Definir esses valores para componentes opcionais instalará todas as bibliotecas necessárias para Jupyter e Anaconda (necessárias para notebooks do Jupyter) no cluster.

--enable-component-gateway

Ativar o Gateway de componentes cria um link do App Engine usando o Apache Knox e o Inverting Proxy, que oferece acesso fácil, seguro e autenticado às interfaces da Web do Jupyter e do JupyterLab, o que significa que você não precisa mais criar túneis SSH.

Ele também cria links para outras ferramentas do cluster, incluindo o Yarn Resource Manager e o Spark History Server, que são úteis para conferir o desempenho dos jobs e os padrões de uso do cluster.

6. Criar um notebook Apache Spark

Como acessar a interface da Web do JupyterLab

Quando o cluster estiver pronto, você vai encontrar o link do Gateway de componentes para a interface da Web do JupyterLab acessando Clusters do Dataproc: console do Cloud, clicando no cluster que você criou e acessando a guia "Interfaces da Web".

afc40202d555de47.png

Você notará que tem acesso ao Jupyter, que é a interface clássica de notebook, ou JupyterLab, descrito como a interface de última geração para o Projeto Jupyter.

O JupyterLab tem muitos recursos de interface novos e incríveis. Por isso, se você não tem experiência com notebooks ou está procurando as melhorias mais recentes, recomendamos usar o JupyterLab, porque ele vai substituir a interface clássica do Jupyter conforme a documentação oficial.

Criar um notebook com um kernel do Python 3

a463623f2ebf0518.png

Na guia de acesso rápido, clique no ícone do notebook Python 3 para criar um notebook com um kernel do Python 3 (não o kernel do PySpark) para configurar a SparkSession no notebook e incluir o spark-bigquery-connector necessário para usar a API BigQuery Storage.

Renomear o notebook

196a3276ed07e1f3.png

Clique com o botão direito do mouse no nome do notebook na barra lateral à esquerda ou na barra de navegação superior e renomeie o notebook como "BigQuery Storage & DataFrames.ipynb do Spark"

Executar o código Spark no notebook

fbac38062e5bb9cf.png

Neste notebook, você vai usar o spark-bigquery-connector, uma ferramenta para ler e gravar dados entre o BigQuery e o Spark com a API BigQuery Storage.

A API BigQuery Storage traz melhorias significativas para o acesso a dados no BigQuery usando um protocolo baseado em RPC. Ele é compatível com leituras e gravações de dados em paralelo e diferentes formatos de serialização, como Apache Avro e Apache Arrow. Isso se traduz em uma melhora significativa no desempenho, especialmente em conjuntos de dados maiores.

Na primeira célula, verifique a versão do Scala do cluster para incluir a versão correta do jar do spark-bigquery-connector.

Entrada [1]:

!scala -version

Saída [1]:f580e442576b8b1f.png Crie uma sessão Spark e inclua o pacote spark-bigquery-connector.

Se a versão do Scala for 2.11, use o pacote a seguir.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Se a versão do Scala for 2.12, use o pacote a seguir.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Entrada [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Ativar repl.eagerEval

Isso gera os resultados dos DataFrames em cada etapa sem a nova necessidade de mostrar df.show(), além de melhorar a formatação da saída.

Entrada [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Leia a tabela do BigQuery no Spark DataFrame

Criar um DataFrame do Spark lendo os dados de um conjunto de dados público do BigQuery. Isso usa o spark-bigquery-connector e a API BigQuery Storage para carregar os dados no cluster do Spark.

Criar um DataFrame do Spark e carregar dados do conjunto de dados público do BigQuery para exibições de página da Wikipédia. Você vai perceber que não está executando uma consulta nos dados porque está usando o spark-bigquery-connector para carregar os dados no Spark, onde o processamento dos dados vai ocorrer. Quando esse código é executado, ele não carrega a tabela de fato porque é uma avaliação lenta no Spark, e a execução ocorre na próxima etapa.

Entrada [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Saída [4]:

c107a33f6fc30ca.png

Selecione as colunas obrigatórias e aplique um filtro usando where(), que é um alias de filter().

Quando esse código é executado, ele aciona uma ação do Spark e, nesse momento, os dados são lidos do armazenamento do BigQuery.

Entrada [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Saída [5]:

ad363cbe510d625a.png

Agrupe por título e ordene por visualizações de página para ver as principais páginas

Entrada [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Saída [6]:f718abd05afc0f4.png

7. Usar bibliotecas de plotagem do Python no notebook

É possível usar as várias bibliotecas de plotagem disponíveis em Python para plotar a saída dos jobs do Spark.

Converter o DataFrame do Spark em um DataFrame do Pandas

Converter o DataFrame do Spark em DataFrame do Pandas e definir a datehour como o índice. Isso é útil se você quer trabalhar com os dados diretamente em Python e plotar os dados usando as muitas bibliotecas de plotagem disponíveis do Python.

Entrada [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Saída [7]:

3df2aaa2351f028d.png

Como representar DataFrame do Pandas

Importe a biblioteca matplotlib, necessária para exibir os gráficos no notebook

Entrada [8]:

import matplotlib.pyplot as plt

Use a função Panda plot para criar um gráfico de linhas a partir do DataFrame do Pandas.

Entrada [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Saída [9]:bade7042c3033594.png

Verificar se o notebook foi salvo no GCS

Seu primeiro notebook do Jupyter já deve estar em execução no cluster do Dataproc. Dê um nome ao notebook. Ele será salvo automaticamente no bucket do GCS usado ao criar o cluster.

É possível verificar isso usando este comando gsutil no Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

A saída a seguir será exibida

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Dica de otimização: armazenar dados em cache na memória

Pode haver cenários em que você queira os dados na memória em vez de sempre fazer leituras no armazenamento do BigQuery.

Esse job vai ler os dados do BigQuery e enviar o filtro para o BigQuery. A agregação será calculada no Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

É possível modificar o job acima para incluir um cache da tabela. Agora o filtro na coluna wiki será aplicado na memória pelo Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Depois, é possível filtrar por outra linguagem wiki usando os dados armazenados em cache em vez de ler dados do armazenamento do BigQuery novamente e, portanto, a execução será muito mais rápida.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Para remover o cache, execute

df_wiki_all.unpersist()

9. Notebooks de exemplo para mais casos de uso

O repositório do Cloud Dataproc no GitHub apresenta notebooks do Jupyter com padrões comuns do Apache Spark para carregar e salvar dados, além de criar gráficos com diversos produtos do Google Cloud Platform e ferramentas de código aberto:

10. Limpar

Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste guia de início rápido:

  1. Exclua o bucket do Cloud Storage que você criou no ambiente
  2. Exclua o ambiente do Dataproc.

Se você criou um projeto apenas para este codelab, também é possível excluí-lo:

  1. No Console do GCP, acesse a página Projetos.
  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir.
  3. Na caixa, digite o ID do projeto e clique em Encerrar para excluí-lo.

Licença

Este trabalho está sob a licença Atribuição 3.0 Genérica da Creative Commons e a licença Apache 2.0.