Apache Spark e Jupyter Notebooks su Cloud Dataproc

1. Panoramica

Questo lab illustrerà come configurare e utilizzare Apache Spark e i blocchi note Jupyter su Cloud Dataproc.

I notebook Jupyter sono ampiamente utilizzati per l'analisi esplorativa dei dati e la creazione di modelli di machine learning, in quanto consentono di eseguire il codice in modo interattivo e visualizzare immediatamente i risultati.

Tuttavia, la configurazione e l'utilizzo di Apache Spark e Jupyter Notebooks possono essere complicati.

b9ed855863c57d6.png

Cloud Dataproc rende questa operazione rapida e semplice consentendoti di creare un cluster Dataproc con Apache Spark, il componente Jupyter e il componente gateway in circa 90 secondi.

Obiettivi didattici

In questo codelab imparerai a:

  • Crea un bucket Google Cloud Storage per il cluster
  • Crea un cluster Dataproc con Jupyter e il gateway dei componenti.
  • Accedere all'interfaccia utente web di JupyterLab su Dataproc
  • Crea un notebook che utilizzi il connettore Spark BigQuery Storage
  • Esecuzione di un job Spark e tracciamento dei risultati.

Il costo totale per eseguire questo lab su Google Cloud è di circa 1 $. I dettagli completi sui prezzi di Cloud Dataproc sono disponibili qui.

2. Creare un progetto

Accedi alla console Google Cloud all'indirizzo console.cloud.google.com e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Successivamente, dovrai abilitare la fatturazione nella console Cloud per utilizzare le risorse Google Cloud.

L'esecuzione di questo codelab non dovrebbe costarti più di qualche dollaro, ma potrebbe essere più cara se decidi di utilizzare più risorse o se le lasci in esecuzione. L'ultima sezione di questo codelab ti guiderà nella pulizia del progetto.

I nuovi utenti di Google Cloud Platform possono beneficiare di una prova senza costi di 300$.

3. Configurazione dell'ambiente

Innanzitutto, apri Cloud Shell facendo clic sul pulsante nell'angolo in alto a destra della console cloud:

a10c47ee6ca41c54.png

Dopo il caricamento di Cloud Shell, esegui questo comando per impostare l'ID progetto del passaggio precedente**:**

gcloud config set project <project_id>

Puoi trovare l'ID progetto anche facendo clic sul tuo progetto in alto a sinistra nella console cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Successivamente, abilita le API Dataproc, Compute Engine e BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

In alternativa, puoi farlo nella console Cloud. Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

2bfc27ef9ba2ec7d.png

Seleziona API Manager dal menu a discesa.

408af5f32c4b7c25.png

Fai clic su Abilita API e servizi.

a9c0e84296a7ba5b.png

Cerca e abilita le seguenti API:

  • API Compute Engine
  • API Dataproc
  • API BigQuery
  • API BigQuery Storage

4. Crea un bucket GCS

Crea un bucket Google Cloud Storage nella regione più vicina ai tuoi dati e assegnagli un nome univoco.

che verrà utilizzato per il cluster Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Dovresti vedere l'output seguente

Creating gs://<your-bucket-name>/...

5. Crea il cluster Dataproc con Jupyter e Component Gateway

Creazione del cluster

Imposta le variabili di ambiente per il cluster

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Quindi esegui questo comando gcloud per creare il cluster con tutti i componenti necessari per utilizzare Jupyter nel cluster.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Durante la creazione del cluster, dovresti visualizzare il seguente output

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

La creazione del cluster dovrebbe richiedere circa 90 secondi e, una volta pronto, potrai accedervi dall'interfaccia utente della console Cloud Dataproc.

Mentre aspetti, puoi continuare a leggere di seguito per scoprire di più sui flag utilizzati nel comando gcloud.

Una volta creato il cluster, dovresti visualizzare il seguente output:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flag utilizzati nel comando gcloud dataproc create

Di seguito è riportata una suddivisione dei flag utilizzati nel comando gcloud dataproc create

--region=${REGION}

Specifica la regione e la zona in cui verrà creato il cluster. Puoi consultare l'elenco delle regioni disponibili qui.

--image-version=1.4

Versione dell'immagine da utilizzare nel cluster. Puoi consultare l'elenco delle versioni disponibili qui.

--bucket=${BUCKET_NAME}

Specifica il bucket Google Cloud Storage che hai creato in precedenza da utilizzare per il cluster. Se non fornisci un bucket GCS, verrà creato per te.

Qui verranno salvati anche i tuoi notebook, anche se elimini il cluster, poiché il bucket GCS non viene eliminato.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

I tipi di macchina da utilizzare per il cluster Dataproc. Puoi consultare un elenco dei tipi di macchine disponibili qui.

Per impostazione predefinita, se non imposti il flag --num-workers, vengono creati 1 nodo master e 2 nodi worker.

--optional-components=ANACONDA,JUPYTER

L'impostazione di questi valori per i componenti facoltativi installerà tutte le librerie necessarie per Jupyter e Anaconda (necessario per i notebook Jupyter) sul cluster.

--enable-component-gateway

L'attivazione di Component Gateway crea un link App Engine utilizzando Apache Knox e Inverting Proxy, che fornisce un accesso facile, sicuro e autenticato alle interfacce web Jupyter e JupyterLab, il che significa che non è più necessario creare tunnel SSH.

Verranno creati anche link per altri strumenti sul cluster, tra cui Yarn Resource Manager e Spark History Server, utili per visualizzare il rendimento dei job e i pattern di utilizzo del cluster.

6. Crea un notebook Apache Spark

Accesso all'interfaccia web di JupyterLab

Una volta pronto il cluster, puoi trovare il link al gateway dei componenti all'interfaccia web di JupyterLab andando a Cluster Dataproc - Console Cloud, facendo clic sul cluster che hai creato e andando alla scheda Interfacce web.

afc40202d555de47.png

Noterai di avere accesso a Jupyter, che è l'interfaccia classica del notebook, o a JupyterLab, descritta come la UI di nuova generazione per Project Jupyter.

JupyterLab offre molte nuove funzionalità dell'interfaccia utente. Se non hai mai utilizzato i notebook o stai cercando gli ultimi miglioramenti, ti consigliamo di utilizzare JupyterLab, che alla fine sostituirà la classica interfaccia Jupyter, come indicato nella documentazione ufficiale.

Crea un notebook con un kernel Python 3

a463623f2ebf0518.png

Nella scheda del launcher, fai clic sull'icona del notebook Python 3 per creare un notebook con un kernel Python 3 (non il kernel PySpark) che ti consente di configurare SparkSession nel notebook e includere spark-bigquery-connector necessario per utilizzare l'API BigQuery Storage.

Rinomina il notebook

196a3276ed07e1f3.png

Fai clic con il tasto destro del mouse sul nome del blocco note nella barra laterale a sinistra o nella navigazione in alto e rinominalo "BigQuery Storage & Spark DataFrames.ipynb".

Esegui il codice Spark nel notebook

fbac38062e5bb9cf.png

In questo blocco note utilizzerai spark-bigquery-connector, uno strumento per leggere e scrivere dati tra BigQuery e Spark utilizzando l'API BigQuery Storage.

L'API BigQuery Storage introduce miglioramenti significativi all'accesso ai dati in BigQuery utilizzando un protocollo basato su RPC. Supporta letture e scritture di dati in parallelo, nonché diversi formati di serializzazione come Apache Avro e Apache Arrow. A livello generale, ciò si traduce in un miglioramento significativo delle prestazioni, soprattutto su set di dati più grandi.

Nella prima cella controlla la versione di Scala del cluster in modo da poter includere la versione corretta del file JAR spark-bigquery-connector.

Input [1]:

!scala -version

Output [1]:f580e442576b8b1f.png crea una sessione Spark e includi il pacchetto spark-bigquery-connector.

Se la tua versione di Scala è 2.11, utilizza il seguente pacchetto.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Se la tua versione di Scala è 2.12, utilizza il seguente pacchetto.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Input [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Attiva repl.eagerEval

In questo modo vengono visualizzati i risultati dei DataFrame in ogni passaggio senza la necessità di mostrare df.show() e viene migliorata anche la formattazione dell'output.

Input [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Leggi la tabella BigQuery in Spark DataFrame

Crea un DataFrame Spark leggendo i dati da un set di dati pubblico BigQuery. Utilizza spark-bigquery-connector e l'API BigQuery Storage per caricare i dati nel cluster Spark.

Crea un DataFrame Spark e carica i dati dal set di dati pubblico BigQuery per le visualizzazioni di pagina di Wikipedia. Noterai che non stai eseguendo una query sui dati perché stai utilizzando spark-bigquery-connector per caricare i dati in Spark, dove verrà eseguita l'elaborazione. Quando viene eseguito, questo codice non carica effettivamente la tabella perché si tratta di una valutazione pigra in Spark e l'esecuzione avverrà nel passaggio successivo.

Input [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Output [4]:

c107a33f6fc30ca.png

Seleziona le colonne richieste e applica un filtro utilizzando where(), che è un alias di filter().

Quando questo codice viene eseguito, viene attivata un'azione Spark e i dati vengono letti da BigQuery Storage a questo punto.

Input [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Output [5]:

ad363cbe510d625a.png

Raggruppa per titolo e ordina per visualizzazioni di pagina per visualizzare le pagine più visitate

Input [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Output [6]:f718abd05afc0f4.png

7. Utilizzare le librerie di tracciamento Python nel notebook

Puoi utilizzare le varie librerie di tracciamento disponibili in Python per tracciare l'output dei job Spark.

Converti Spark DataFrame in Pandas DataFrame

Converti lo Spark DataFrame in Pandas DataFrame e imposta datehour come indice. Ciò è utile se vuoi lavorare con i dati direttamente in Python e tracciarli utilizzando le numerose librerie di tracciamento Python disponibili.

Input [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Output [7]:

3df2aaa2351f028d.png

Tracciamento del DataFrame Pandas

Importa la libreria matplotlib, necessaria per visualizzare i grafici nel blocco note.

Input [8]:

import matplotlib.pyplot as plt

Utilizza la funzione di tracciamento di Pandas per creare un grafico a linee dal DataFrame Pandas.

Input [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Output [9]:bade7042c3033594.png

Controlla che il notebook sia stato salvato in GCS

Ora dovresti avere il tuo primo notebook Jupyter in esecuzione sul cluster Dataproc. Assegna un nome al notebook, che verrà salvato automaticamente nel bucket GCS utilizzato durante la creazione del cluster.

Puoi verificarlo utilizzando questo comando gsutil in Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Dovresti vedere l'output seguente

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Suggerimento per l'ottimizzazione: memorizza i dati nella cache

Potrebbero esserci scenari in cui vuoi che i dati siano in memoria anziché leggerli da BigQuery Storage ogni volta.

Questo job leggerà i dati da BigQuery e invierà il filtro a BigQuery. L'aggregazione verrà quindi calcolata in Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Puoi modificare il job precedente in modo da includere una cache della tabella e ora il filtro sulla colonna wiki verrà applicato in memoria da Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Puoi quindi filtrare per un'altra lingua wiki utilizzando i dati memorizzati nella cache anziché leggere di nuovo i dati dall'archiviazione BigQuery, in modo da velocizzare notevolmente l'operazione.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Puoi rimuovere la cache eseguendo

df_wiki_all.unpersist()

9. Notebook di esempio per altri casi d'uso

Il repository GitHub di Cloud Dataproc include notebook Jupyter con pattern Apache Spark comuni per caricare, salvare e tracciare i dati con vari prodotti Google Cloud e strumenti open source:

10. Esegui la pulizia

Per evitare di sostenere costi inutili per il tuo account GCP al termine di questa guida rapida:

  1. Elimina il bucket Cloud Storage per l'ambiente che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo facoltativamente:

  1. Nella console di GCP, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic e di una licenza Apache 2.0.