1. Panoramica
Questo lab illustra come configurare e utilizzare Apache Spark e i blocchi note Jupyter su Cloud Dataproc.
I blocchi note Jupyter sono ampiamente utilizzati per l'analisi esplorativa dei dati e la creazione di modelli di machine learning, poiché consentono di eseguire in modo interattivo il codice e visualizzare immediatamente i risultati.
Tuttavia, la configurazione e l'utilizzo dei blocchi note Apache Spark e Jupyter può essere complicato.
Cloud Dataproc rende tutto rapido e semplice consentendo di creare un cluster Dataproc con Apache Spark, un componente Jupyter e un gateway dei componenti in circa 90 secondi.
Obiettivi didattici
In questo codelab, imparerai a:
- Crea un bucket Google Cloud Storage per il tuo cluster
- Creare un cluster Dataproc con Jupyter e un gateway dei componenti.
- Accedi alla UI web JupyterLab su Dataproc
- Creare un blocco note utilizzando il connettore Spark BigQuery Storage
- Esecuzione di un job Spark e tracciamento dei risultati.
Il costo totale per l'esecuzione di questo lab su Google Cloud è di circa 1 $. Puoi trovare tutti i dettagli sui prezzi di Cloud Dataproc qui.
2. Creare un progetto
Accedi alla console della piattaforma Google Cloud all'indirizzo console.cloud.google.com e crea un nuovo progetto:
Successivamente, per utilizzare le risorse Google Cloud, dovrai abilitare la fatturazione nella console Cloud.
L'esecuzione di questo codelab non dovrebbe costare più di pochi dollari, ma potrebbe esserlo di più se decidi di utilizzare più risorse o se le lasci in esecuzione. L'ultima sezione di questo codelab ti guiderà nella pulizia del progetto.
I nuovi utenti della piattaforma Google Cloud hanno diritto a una prova senza costi di$300.
3. Configurazione dell'ambiente
Per prima cosa, apri Cloud Shell facendo clic sul pulsante nell'angolo in alto a destra della console Cloud:
Al termine del caricamento di Cloud Shell, esegui questo comando per impostare l'ID progetto del passaggio precedente**:**
gcloud config set project <project_id>
L'ID progetto puoi trovare anche facendo clic sul tuo progetto in alto a sinistra nella console Cloud:
Successivamente, abilita le API Dataproc, Compute Engine e BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
In alternativa, questa operazione può essere eseguita nella console Cloud. Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.
Seleziona Gestore API dal menu a discesa.
Fai clic su Abilita API e servizi.
Cerca e abilita le API seguenti:
- API Compute Engine
- API Dataproc
- API BigQuery
- API BigQuery Storage
4. Crea un bucket GCS
Crea un bucket Google Cloud Storage nella regione più vicina ai tuoi dati e assegnagli un nome univoco.
Verrà utilizzato per il cluster Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Dovresti vedere l'output seguente
Creating gs://<your-bucket-name>/...
5. Creazione di un cluster Dataproc con Jupyter Gateway dei componenti
Creazione del cluster in corso...
Imposta le variabili env per il cluster
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Poi esegui questo comando gcloud per creare il tuo cluster con tutti i componenti necessari per lavorare con Jupyter sul tuo cluster.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Durante la creazione del cluster dovresti vedere il seguente output
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
La creazione del cluster dovrebbe richiedere circa 90 secondi e, una volta pronto, potrai accedere al cluster dall'interfaccia utente della console Cloud Dataproc.
Mentre aspetti, puoi continuare a leggere le informazioni riportate di seguito per saperne di più sui flag utilizzati nel comando gcloud.
Una volta creato il cluster, dovresti utilizzare il seguente output:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Flag utilizzati nel comando gcloud dataproc create
Ecco un'analisi dei flag utilizzati nel comando gcloud dataproc create
--region=${REGION}
Specifica la regione e la zona in cui verrà creato il cluster. Puoi consultare l'elenco delle aree geografiche disponibili qui.
--image-version=1.4
La versione dell'immagine da utilizzare nel cluster. Puoi consultare l'elenco delle versioni disponibili qui.
--bucket=${BUCKET_NAME}
Specifica il bucket Google Cloud Storage che hai creato in precedenza da utilizzare per il cluster. Se non fornisci un bucket GCS, verrà creato automaticamente.
I blocchi note verranno salvati anche se elimini il cluster perché il bucket GCS non viene eliminato.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
I tipi di macchina da utilizzare per il cluster Dataproc. Puoi visualizzare un elenco dei tipi di macchina disponibili qui.
Per impostazione predefinita, vengono creati 1 nodo master e 2 nodi worker se non imposti il flag –num-workers
--optional-components=ANACONDA,JUPYTER
L'impostazione di questi valori per i componenti facoltativi installerà tutte le librerie necessarie per Jupyter e Anaconda (necessarie per i blocchi note Jupyter) sul tuo cluster.
--enable-component-gateway
L'attivazione di Gateway dei componenti crea un collegamento ad App Engine utilizzando Apache Knox e Inverting Proxy, che fornisce un accesso facile, sicuro e autenticato alle interfacce web Jupyter e JupyterLab, il che significa che non è più necessario creare tunnel SSH.
Verrà inoltre creato un link per altri strumenti sul cluster, tra cui Yarn Resource Manager e Spark History Server, che sono utili per vedere le prestazioni dei job e i pattern di utilizzo del cluster.
6. crea un blocco note Apache Spark
Accesso all'interfaccia web JupyterLab
Quando il cluster è pronto, puoi trovare il link del gateway dei componenti all'interfaccia web JupyterLab andando a Cluster Dataproc - Console Cloud, facendo clic sul cluster che hai creato e andando alla scheda Interfacce web.
Noterai di avere accesso a Jupyter, l'interfaccia classica del blocco note, o JupyterLab, descritta come la UI di nuova generazione per Project Jupyter.
Esistono molte nuove fantastiche funzionalità UI in JupyterLab, quindi se non hai esperienza con i blocchi note o sei alla ricerca degli ultimi miglioramenti, ti consigliamo di utilizzare JupyterLab perché alla fine sostituirà la classica interfaccia di Jupyter secondo i documenti ufficiali.
crea un blocco note con un kernel Python 3
Dalla scheda Avvio app, fai clic sull'icona del blocco note Python 3 per creare un blocco note con un kernel Python 3 (non il kernel PySpark) che ti consenta di configurare SparkSession nel blocco note e includere spark-bigquery-connector necessario per utilizzare l'API BigQuery Storage.
Rinomina il blocco note
Fai clic con il pulsante destro del mouse sul nome del blocco note nella barra laterale a sinistra o nella barra di navigazione in alto e rinomina il blocco note in "BigQuery Storage & Spark DataFrames.ipynb"
Esegui il codice Spark nel blocco note
In questo blocco note, utilizzerai spark-bigquery-connector, uno strumento per leggere e scrivere dati tra BigQuery e Spark utilizzando l'API BigQuery Storage.
L'API BigQuery Storage apporta miglioramenti significativi all'accesso ai dati in BigQuery grazie all'utilizzo di un protocollo basato su RPC. Supporta le letture e le scritture dei dati in parallelo, nonché diversi formati di serializzazione, come Apache Avro e Apache Arrow. A livello generale, questo si traduce in un notevole miglioramento delle prestazioni, soprattutto su set di dati più ampi.
Nella prima cella, controlla la versione Scala del cluster in modo da poter includere la versione corretta del jar spark-bigquery-connector.
Input [1]:
!scala -version
Output [1]: crea una sessione Spark e includi il pacchetto spark-bigquery-connector.
Se la versione di Scala è la 2.11, utilizza il pacchetto seguente.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Se la versione di Scala è la 2.12, utilizza il pacchetto seguente.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Input [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Attiva repl.eagerEval
In questo modo verranno restituiti i risultati dei DataFrames in ogni passaggio senza che sia necessario mostrare df.show(). Inoltre, la formattazione dell'output verrà migliorata.
Input [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Lettura della tabella BigQuery in Spark DataFrame
Creare un DataFrame Spark leggendo i dati di un set di dati pubblico di BigQuery. Utilizza quindi spark-bigquery-connector e l'API BigQuery Storage per caricare i dati nel cluster Spark.
Crea un DataFrame Spark e carica i dati dal set di dati pubblico BigQuery per le visualizzazioni di pagina di Wikipedia. Noterai che non esegui una query sui dati perché stai utilizzando spark-bigquery-connector per caricare i dati in Spark, dove verrà eseguita l'elaborazione dei dati. Quando viene eseguito, questo codice non carica effettivamente la tabella perché si tratta di una valutazione lazy in Spark e l'esecuzione verrà eseguita nel passaggio successivo.
Input [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Output [4]:
Seleziona le colonne obbligatorie e applica un filtro utilizzando where()
, che è un alias di filter()
.
Quando questo codice viene eseguito, attiva un'azione Spark e a questo punto i dati vengono letti da BigQuery Storage.
Input [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Output [5]:
Raggruppa per titolo e ordina per visualizzazioni di pagina al fine di vedere le pagine principali
Input [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Output [6]:
7. Usa le librerie di tracciamento Python nel blocco note
Puoi utilizzare le varie librerie di tracciamento disponibili in Python per tracciare l'output dei tuoi job Spark.
Converti Spark DataFrame in Pandas DataFrame
Converti il DataFrame Spark in DataFrame Pandas e imposta datehour come indice. Ciò è utile se vuoi lavorare con i dati direttamente in Python e tracciare i dati utilizzando le molte librerie di tracciamento Python disponibili.
Input [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Output [7]:
Tracciare i dataframe Pandas
Importa la libreria matplotlib necessaria per visualizzare i grafici nel blocco note
Input [8]:
import matplotlib.pyplot as plt
Utilizza la funzione di grafico Pandas per creare un grafico a linee dal DataFrame Pandas.
Input [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Output [9]:
Verifica che il blocco note sia stato salvato in GCS
A questo punto dovresti avere il tuo primo blocco note Jupyter in esecuzione sul cluster Dataproc. Assegna un nome al blocco note, che verrà salvato automaticamente nel bucket GCS utilizzato durante la creazione del cluster.
Puoi verificarlo utilizzando questo comando gsutil in Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Dovresti vedere l'output seguente
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Suggerimento per l'ottimizzazione: memorizza i dati nella cache in memoria
Potrebbero verificarsi scenari in cui potresti voler mantenere i dati in memoria anziché leggerli ogni volta da BigQuery Storage.
Questo job leggerà i dati da BigQuery ed eseguirà il push del filtro a BigQuery. L'aggregazione verrà quindi calcolata in Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Puoi modificare il job sopra per includere una cache della tabella e ora il filtro nella colonna wiki verrà applicato in memoria da Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Puoi quindi filtrare i dati in base a un'altra lingua wiki utilizzando i dati memorizzati nella cache invece di leggere di nuovo i dati dallo spazio di archiviazione di BigQuery, il che a sua volta verrà eseguito molto più velocemente.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Puoi rimuovere la cache eseguendo
df_wiki_all.unpersist()
9. Blocchi note di esempio per altri casi d'uso
Il repository GitHub di Cloud Dataproc presenta blocchi note Jupyter con pattern Apache Spark comuni per caricare, salvare e tracciare i dati con vari prodotti e strumenti open source della piattaforma Google Cloud:
10. Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati inutilmente addebiti dopo il completamento di questa guida rapida:
- Elimina il bucket Cloud Storage relativo all'ambiente e che hai creato
- Elimina l'ambiente Dataproc.
Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:
- Nella console di Google Cloud, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.
Licenza
Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic, e Apache 2.0.