Pre-elaborazione dei dati di BigQuery con PySpark su Dataproc

1. Panoramica

Questo codelab illustra come creare una pipeline di elaborazione dei dati utilizzando Apache Spark con Dataproc sulla Google Cloud Platform. È un caso d'uso comune in data science e data engineering leggere i dati da una posizione di archiviazione, applicare trasformazioni e scriverli in un'altra posizione di archiviazione. Le trasformazioni comuni includono la modifica dei contenuti dei dati, l'eliminazione di informazioni non necessarie e la modifica dei tipi di file.

In questo codelab imparerai ad Apache Spark, a eseguire una pipeline di esempio utilizzando Dataproc con PySpark (l'API Python di Apache Spark), BigQuery, Google Cloud Storage e i dati di Reddit.

2. Introduzione ad Apache Spark (facoltativo)

Secondo il sito web, "Apache Spark è un motore di analisi unificato per l'elaborazione di dati su larga scala". Ti consente di analizzare ed elaborare i dati in parallelo e in memoria, il che consente un calcolo parallelo di grandi dimensioni su più macchine e nodi diversi. È stato originariamente rilasciato nel 2014 come upgrade al MapReduce tradizionale ed è ancora uno dei framework più utilizzati per eseguire calcoli su larga scala. Apache Spark è scritto in Scala e di conseguenza ha API in Scala, Java, Python e R. Contiene una serie di librerie come Spark SQL per eseguire query SQL sui dati, Spark Streaming per i dati in streaming, MLlib per il machine learning e GraphX per l'elaborazione di grafici, tutte in esecuzione sul motore Apache Spark.

32add0b6a47bafbc.png

Spark può essere eseguito autonomamente o può sfruttare un servizio di gestione delle risorse come Yarn, Mesos o Kubernetes per la scalabilità. Per questo codelab utilizzerai Dataproc, che utilizza Yarn.

I dati in Spark sono stati originariamente caricati in memoria in un cosiddetto RDD, ovvero un set di dati distribuito resiliente. Da allora, lo sviluppo di Spark ha incluso l'aggiunta di due nuovi tipi di dati in stile colonnare: il set di dati, che è tipizzato, e il dataframe, che non è tipizzato. In termini generali, gli RDD sono ottimi per qualsiasi tipo di dati, mentre i set di dati e i dataframe sono ottimizzati per i dati tabulari. Poiché i set di dati sono disponibili solo con le API Java e Scala, per questo codelab utilizzeremo l'API Dataframe di PySpark. Per saperne di più, consulta la documentazione di Apache Spark.

3. Caso d'uso

Spesso i data engineer hanno bisogno che i dati siano facilmente accessibili ai data scientist. Tuttavia, i dati sono spesso inizialmente sporchi (difficili da utilizzare per l'analisi nel loro stato attuale) e devono essere puliti prima di poter essere di grande utilità. Un esempio sono i dati estratti dal web che potrebbero contenere codifiche strane o tag HTML estranei.

In questo lab caricherai un insieme di dati da BigQuery sotto forma di post di Reddit in un cluster Spark ospitato su Dataproc, estrarrai informazioni utili e archivierà i dati elaborati come file CSV compressi in Google Cloud Storage.

be2a4551ece63bfc.png

Il chief data scientist della tua azienda è interessato a far lavorare i suoi team su diversi problemi di elaborazione del linguaggio naturale. In particolare, è interessato ad analizzare i dati del subreddit "r/food". Creerai una pipeline per un dump di dati a partire da un backfill da gennaio 2017 ad agosto 2019.

4. Accesso a BigQuery tramite l'API BigQuery Storage

L'estrazione dei dati da BigQuery utilizzando il metodo dell'API tabledata.list può richiedere molto tempo e non essere efficiente man mano che la quantità di dati aumenta. Questo metodo restituisce un elenco di oggetti JSON e richiede la lettura sequenziale di una pagina alla volta per leggere un intero set di dati.

L'API BigQuery Storage apporta miglioramenti significativi all'accesso ai dati in BigQuery utilizzando un protocollo basato su RPC. Supporta le letture e le scritture dei dati in parallelo, nonché diversi formati di serializzazione come Apache Avro e Apache Arrow. A livello generale, ciò si traduce in un miglioramento significativo delle prestazioni, in particolare su set di dati più grandi.

In questo codelab utilizzerai spark-bigquery-connector per leggere e scrivere dati tra BigQuery e Spark.

5. Creazione di un progetto

Accedi alla console della piattaforma Google Cloud all'indirizzo console.cloud.google.com e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Successivamente, dovrai abilitare la fatturazione nella console Cloud per poter utilizzare le risorse Google Cloud.

L'esecuzione di questo codelab non dovrebbe costarti più di qualche dollaro, ma potrebbe essere più cara se decidi di utilizzare più risorse o se le lasci in esecuzione. L'ultima sezione di questo codelab ti guiderà nella pulizia del progetto.

I nuovi utenti della piattaforma Google Cloud possono beneficiare di una prova senza costi di 300$.

6. Configurazione dell'ambiente

Ora devi configurare l'ambiente:

  • Abilitazione delle API Compute Engine, Dataproc e BigQuery Storage
  • Configurazione delle impostazioni del progetto
  • Creazione di un cluster Dataproc
  • Creazione di un bucket Google Cloud Storage

Abilitazione delle API e configurazione dell'ambiente

Apri Cloud Shell premendo il pulsante nell'angolo in alto a destra di Cloud Console.

a10c47ee6ca41c54.png

Dopo il caricamento di Cloud Shell, esegui i seguenti comandi per abilitare le API Compute Engine, Dataproc e BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Imposta l'ID progetto del progetto. Puoi trovarlo nella pagina di selezione dei progetti cercando il progetto. Potrebbe non essere uguale al nome del progetto.

e682e8227aa3c781.png

76d45fb295728542.png

Esegui il seguente comando per impostare l'ID progetto:

gcloud config set project <project_id>

Imposta la regione del progetto scegliendone una dall'elenco qui. Un esempio potrebbe essere us-central1.

gcloud config set dataproc/region <region>

Scegli un nome per il cluster Dataproc e crea una variabile di ambiente.

CLUSTER_NAME=<cluster_name>

Creazione di un cluster Dataproc

Crea un cluster Dataproc eseguendo il seguente comando:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Il completamento del comando richiederà un paio di minuti. Per analizzare il comando:

Verrà avviata la creazione di un cluster Dataproc con il nome fornito in precedenza. L'utilizzo dell'API beta abiliterà le funzionalità beta di Dataproc, come il gateway dei componenti.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Verrà impostato il tipo di macchina da utilizzare per i worker.

--worker-machine-type n1-standard-8

In questo modo verrà impostato il numero di worker del cluster.

--num-workers 8

Verrà impostata la versione dell'immagine di Dataproc.

--image-version 1.5-debian

Verranno configurate le azioni di inizializzazione da utilizzare nel cluster. Qui includi l'azione di inizializzazione pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Questi sono i metadati da includere nel cluster. Qui fornisci i metadati per l'azione di inizializzazione pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Verranno impostati i componenti facoltativi da installare sul cluster.

--optional-components=ANACONDA

In questo modo verrà attivato il gateway dei componenti, che ti consente di utilizzare il gateway dei componenti di Dataproc per visualizzare interfacce utente comuni come Zeppelin, Jupyter o la cronologia di Spark.

--enable-component-gateway

Per un'introduzione più approfondita a Dataproc, consulta questo codelab.

Creazione di un bucket Google Cloud Storage

Avrai bisogno di un bucket Google Cloud Storage per l'output del job. Scegli un nome univoco per il bucket ed esegui il seguente comando per crearlo. I nomi dei bucket sono univoci in tutti i progetti Google Cloud per tutti gli utenti, pertanto potrebbe essere necessario tentare questa operazione alcune volte con nomi diversi. Un bucket viene creato correttamente se non ricevi un messaggio ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analisi esplorativa dati

Prima di eseguire la preelaborazione, devi scoprire di più sulla natura dei dati con cui hai a che fare. A questo scopo, esaminerai due metodi di esplorazione dei dati. Innanzitutto, visualizzerai alcuni dati non elaborati utilizzando l'interfaccia utente web di BigQuery, quindi calcolerai il numero di post per subreddit utilizzando PySpark e Dataproc.

Using the BigQuery Web UI

Per iniziare, utilizza l'interfaccia utente web di BigQuery per visualizzare i dati. Dall'icona del menu in Cloud Console, scorri verso il basso e premi "BigQuery" per aprire l'interfaccia utente web di BigQuery.

242a597d7045b4da.png

Quindi, esegui il seguente comando nell'editor di query dell'UI web di BigQuery. Verranno restituite 10 righe complete dei dati di gennaio 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Puoi scorrere la pagina per visualizzare tutte le colonne disponibili e alcuni esempi. In particolare, vedrai due colonne che rappresentano i contenuti testuali di ogni post: "title" e"selftext ", quest'ultima corrispondente al corpo del post. Notare anche altre colonne come "created_utc", che indica l'ora UTC in cui è stato creato un post, e "subreddit", che indica il subreddit in cui si trova il post.

Esecuzione di un job PySpark

Esegui i comandi seguenti in Cloud Shell per clonare il repository con il codice di esempio ed eseguire il cd nella directory corretta:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Puoi utilizzare PySpark per determinare il numero di post esistenti per ogni subreddit. Puoi aprire Cloud Editor e leggere lo script cloud-dataproc/codelabs/spark-bigquery prima di eseguirlo nel passaggio successivo:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Fai clic sul pulsante "Apri terminale" in Cloud Editor per tornare a Cloud Shell ed eseguire il seguente comando per eseguire il tuo primo job PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Questo comando consente di inviare job a Dataproc tramite l'API Jobs. Qui indichi il tipo di job come pyspark. Puoi fornire il nome del cluster, i parametri facoltativi e il nome del file contenente il job. Qui fornisci il parametro --jars che ti consente di includere spark-bigquery-connector nel tuo job. Puoi anche impostare i livelli di output dei log utilizzando --driver-log-levels root=FATAL, che elimina tutto l'output dei log, ad eccezione degli errori. I log di Spark tendono ad essere piuttosto rumorosi.

L'esecuzione dovrebbe richiedere alcuni minuti e l'output finale dovrebbe avere il seguente aspetto:

6c185228db47bb18.png

8. Esplorazione delle UI di Dataproc e Spark

Quando esegui job Spark su Dataproc, hai accesso a due interfacce utente per controllare lo stato dei job / cluster. La prima è l'interfaccia utente di Dataproc, che puoi trovare facendo clic sull'icona del menu e scorrendo verso il basso fino a Dataproc. Qui puoi vedere la memoria attualmente disponibile, la memoria in attesa e il numero di worker.

6f2987346d15c8e2.png

Puoi anche fare clic sulla scheda Job per visualizzare i job completati. Puoi visualizzare i dettagli dei job, ad esempio i log e l'output, facendo clic sull'ID di un job specifico. 114d90129b0e4c88.png

1b2160f0f484594a.png

Puoi anche visualizzare l'interfaccia utente di Spark. Nella pagina del job, fai clic sulla freccia Indietro e poi su Interfacce web. Dovresti vedere diverse opzioni in Gateway dei componenti. Molti di questi possono essere attivati tramite Componenti facoltativi durante la configurazione del cluster. Per questo lab, fai clic su "Server di cronologia Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Dovrebbe aprirsi la seguente finestra:

8f6786760f994fe8.png

Qui vengono visualizzati tutti i job completati e puoi fare clic su qualsiasi application_id per visualizzare ulteriori informazioni sul job. Analogamente, puoi fare clic su "Mostra richieste incomplete" in fondo alla pagina di destinazione per visualizzare tutti i job in esecuzione.

9. Eseguire il job di backfill

Ora esegui un job che carica i dati in memoria, estrae le informazioni necessarie e scarica l'output in un bucket Google Cloud Storage. Estrarrai "title", "body" (testo non elaborato) e "timestamp created" per ogni commento di Reddit. Poi, dovrai prendere questi dati, convertirli in un file CSV, comprimerli e caricarli in un bucket con un URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Puoi fare di nuovo riferimento all'editor Cloud per leggere il codice di cloud-dataproc/codelabs/spark-bigquery/backfill.sh, uno script wrapper per eseguire il codice in cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

A breve dovresti vedere una serie di messaggi di completamento del job. Il completamento del job potrebbe richiedere fino a 15 minuti. Puoi anche controllare il bucket di archiviazione per verificare l'output dei dati utilizzando gsutil. Al termine di tutti i job, esegui il seguente comando:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Dovresti vedere l'output seguente:

a7c3c7b2e82f9fca.png

Complimenti, hai completato correttamente un backfill per i dati dei commenti di Reddit. Se ti interessa scoprire come creare modelli in base a questi dati, vai al codelab Spark-NLP.

10. Esegui la pulizia

Per evitare di incorrere in costi non necessari sul tuo account Google Cloud dopo aver completato questa guida rapida:

  1. Elimina il bucket Cloud Storage per l'ambiente che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:

  1. Nella console di Google Cloud, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto e fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic e di una licenza Apache 2.0.