Pre-elaborazione dei dati BigQuery con PySpark su Dataproc

1. Panoramica

Questo codelab illustra come creare una pipeline di elaborazione dati utilizzando Apache Spark con Dataproc su Google Cloud Platform. È un caso d'uso comune nel data science e nel data engineering quello di leggere i dati da una posizione di archiviazione, eseguire trasformazioni e scriverli in un'altra posizione di archiviazione. Le trasformazioni più comuni includono la modifica dei contenuti dei dati, l'eliminazione di informazioni non necessarie e la modifica dei tipi di file.

In questo codelab scoprirai Apache Spark, esegui una pipeline di esempio utilizzando Dataproc con PySpark (API Python di Apache Spark), BigQuery, Google Cloud Storage e dati di Reddit.

2. Introduzione ad Apache Spark (facoltativo)

Secondo il sito web, " Apache Spark è un motore di analisi unificato per l'elaborazione dei dati su larga scala." Consente di analizzare ed elaborare i dati in parallelo e in memoria, il che consente un calcolo parallelo di massa su più macchine e nodi diversi. Originariamente rilasciato nel 2014 come upgrade del tradizionale MapReduce, è ancora uno dei framework più popolari per l'esecuzione di calcoli su larga scala. Apache Spark è scritto in Scala e, successivamente, dispone di API in Scala, Java, Python e R. Contiene un'ampia gamma di librerie, come Spark SQL per l'esecuzione di query SQL sui dati, Spark Streaming per i flussi di dati, MLlib per il machine learning e GraphX per l'elaborazione dei grafici, tutte eseguite sul motore Apache Spark.

32add0b6a47bafbc.png

Spark può essere eseguito da solo o può utilizzare un servizio di gestione delle risorse come Yarn, Mesos o Kubernetes per la scalabilità. Utilizzerai Dataproc per questo codelab, in cui si usa Yarn.

I dati in Spark sono stati originariamente caricati in memoria in quello che viene chiamato RDD, o set di dati distribuito resiliente. Da allora, lo sviluppo su Spark ha incluso l'aggiunta di due nuovi tipi di dati in stile colonna: il set di dati, che viene digitato, e il Dataframe, che non è digitato. In generale, gli RDD sono ottimi per qualsiasi tipo di dati, mentre i set di dati e i DataFrame sono ottimizzati per i dati tabulari. Poiché i set di dati sono disponibili solo con le API Java e Scala, continueremo a utilizzare l'API PySpark Dataframe per questo codelab. Per saperne di più, consulta la documentazione di Apache Spark.

3. Caso d'uso

I data engineer spesso hanno bisogno che i dati siano facilmente accessibili ai data scientist. Tuttavia, i dati all'inizio sono spesso non elaborati (difficile da utilizzare per l'analisi nello stato attuale) e devono essere puliti prima di poter essere utilizzati a lungo. Un esempio sono i dati estratti dal Web che potrebbero contenere codifiche strane o tag HTML estranei.

In questo lab caricherai un set di dati da BigQuery sotto forma di post Reddit in un cluster Spark ospitato su Dataproc, estrarrai informazioni utili e archivierai i dati elaborati come file CSV compressi in Google Cloud Storage.

be2a4551ece63bfc.png

Il capo data scientist della tua azienda è interessato a far lavorare i suoi team su diversi problemi di elaborazione del linguaggio naturale. Nello specifico, è interessato ad analizzare i dati nel subreddit "r/food". Creerai una pipeline per un dump dei dati iniziando con un backfill da gennaio 2017 ad agosto 2019.

4. Accesso a BigQuery tramite l'API BigQuery Storage

Il pull dei dati da BigQuery con il metodo API tabledata.list può rivelarsi impegnativo in termini di tempo e poco efficiente se la quantità di dati viene scalata. Questo metodo restituisce un elenco di oggetti JSON e richiede la lettura in sequenza di una pagina alla volta per leggere un intero set di dati.

L'API BigQuery Storage apporta miglioramenti significativi all'accesso ai dati in BigQuery grazie all'utilizzo di un protocollo basato su RPC. Supporta le letture e le scritture dei dati in parallelo, nonché diversi formati di serializzazione, come Apache Avro e Apache Arrow. A livello generale, questo si traduce in un notevole miglioramento delle prestazioni, soprattutto su set di dati più ampi.

In questo codelab utilizzerai spark-bigquery-connector per leggere e scrivere dati tra BigQuery e Spark.

5. Creazione di un progetto

Accedi alla console della piattaforma Google Cloud all'indirizzo console.cloud.google.com e crea un nuovo progetto:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Successivamente, per utilizzare le risorse Google Cloud, dovrai abilitare la fatturazione nella console Cloud.

L'esecuzione di questo codelab non dovrebbe costare più di pochi dollari, ma potrebbe esserlo di più se decidi di utilizzare più risorse o se le lasci in esecuzione. L'ultima sezione di questo codelab ti guiderà nella pulizia del progetto.

I nuovi utenti della piattaforma Google Cloud hanno diritto a una prova senza costi di$300.

6. Configurazione dell'ambiente

Ora passerai alla configurazione dell'ambiente:

  • Abilitazione delle API Compute Engine, Dataproc e BigQuery Storage
  • Configurazione delle impostazioni del progetto
  • Creazione di un cluster Dataproc
  • Creazione di un bucket Google Cloud Storage

Abilitazione delle API e configurazione dell'ambiente

Apri Cloud Shell premendo il pulsante nell'angolo in alto a destra della console Cloud.

a10c47ee6ca41c54.png

Dopo il caricamento di Cloud Shell, esegui questi comandi per abilitare le API Compute Engine, Dataproc e BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Imposta l'ID progetto del tuo progetto. Per trovarlo, vai alla pagina di selezione del progetto e cercalo. Potrebbe non essere uguale al nome del progetto.

e682e8227aa3c781.png

76d45fb295728542.png

Esegui questo comando per impostare il tuo ID progetto:

gcloud config set project <project_id>

Imposta l'regione del tuo progetto scegliendone una dall'elenco qui. Un esempio potrebbe essere us-central1.

gcloud config set dataproc/region <region>

Scegli un nome per il cluster Dataproc e crea una variabile di ambiente.

CLUSTER_NAME=<cluster_name>

Creazione di un cluster Dataproc

Crea un cluster Dataproc eseguendo questo comando:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Il completamento di questo comando richiederà un paio di minuti. Per suddividere il comando:

Verrà avviata la creazione di un cluster Dataproc con il nome che hai fornito in precedenza. L'utilizzo dell'API beta abiliterà funzionalità beta di Dataproc come Gateway dei componenti.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Verrà impostato il tipo di macchina da utilizzare per i worker.

--worker-machine-type n1-standard-8

Questo imposterà il numero di worker che avrà il cluster.

--num-workers 8

Verrà impostata la versione immagine di Dataproc.

--image-version 1.5-debian

Questo configurerà le azioni di inizializzazione da utilizzare sul cluster. Qui includi l'azione di inizializzazione pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Questi sono i metadati da includere nel cluster. Qui fornisci i metadati per l'azione di inizializzazione di pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Questa operazione consente di impostare i Componenti facoltativi da installare sul cluster.

--optional-components=ANACONDA

In questo modo verrà attivato il gateway dei componenti, che consente di utilizzare il gateway dei componenti di Dataproc per visualizzare le UI comuni come Zeppelin, Jupyter o la cronologia Spark

--enable-component-gateway

Per un'introduzione più approfondita a Dataproc, consulta questo codelab.

Creazione di un bucket Google Cloud Storage

Per l'output del job avrai bisogno di un bucket Google Cloud Storage. Scegli un nome univoco per il tuo bucket ed esegui questo comando per creare un nuovo bucket. I nomi dei bucket sono univoci in tutti i progetti Google Cloud per tutti gli utenti, quindi potresti dover ripetere l'operazione più volte con nomi diversi. Se non ricevi un ServiceException, il bucket viene creato correttamente.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analisi esplorativa dati

Prima di eseguire la pre-elaborazione, dovresti saperne di più sulla natura dei dati che gestisci. Per farlo, esplorerai due metodi di esplorazione dei dati. Innanzitutto, visualizzerai alcuni dati non elaborati utilizzando l'interfaccia utente web di BigQuery, quindi calcolerai il numero di post per subreddit utilizzando PySpark e Dataproc.

Using the BigQuery Web UI

Inizia utilizzando l'interfaccia utente web di BigQuery per visualizzare i tuoi dati. Dall'icona del menu nella console Cloud, scorri verso il basso e premi "BigQuery" per aprire la UI web di BigQuery.

242a597d7045b4da.png

Quindi, esegui questo comando nell'editor di query dell'interfaccia utente web di BigQuery. Verranno restituite 10 righe complete di dati a partire da gennaio 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Puoi scorrere la pagina per visualizzare tutte le colonne disponibili e alcuni esempi. In particolare, vengono visualizzate due colonne che rappresentano i contenuti testuali di ciascun post: "title" e "selftext", quest'ultimo con il corpo del post. Nota anche altre colonne, ad esempio "create_utc" ossia l'ora utc in cui un post è stato creato e "subreddit" che è il subreddit in cui si trova il post.

Esecuzione di un job PySpark

Esegui questi comandi in Cloud Shell per clonare il repository con il codice di esempio e il cd nella directory corretta:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Puoi utilizzare PySpark per determinare il numero di post presenti per ogni subreddit. Puoi aprire Cloud Editor e leggere lo script cloud-dataproc/codelabs/spark-bigquery prima di eseguirlo nel passaggio successivo:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Fai clic su "Apri terminale" in Cloud Editor per tornare a Cloud Shell ed eseguire il comando seguente per eseguire il tuo primo job PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Questo comando consente di inviare job a Dataproc tramite l'API Jobs. Qui indichi il tipo di prestazione come pyspark. Puoi fornire il nome del cluster, i parametri facoltativi e il nome del file contenente il job. In questo caso fornisci il parametro --jars, che ti consente di includere spark-bigquery-connector nel tuo job. Puoi anche impostare i livelli di output dei log utilizzando --driver-log-levels root=FATAL. In questo modo verranno eliminati tutti gli output dei log, ad eccezione degli errori. I log di Spark tendono a essere piuttosto rumorosi.

L'esecuzione dovrebbe richiedere alcuni minuti e l'output finale dovrebbe essere simile al seguente:

6c185228db47bb18.png

8. Esplorazione delle UI di Dataproc e Spark

Quando esegui job Spark su Dataproc, hai accesso a due UI per controllare lo stato dei tuoi job / cluster. La prima è l'interfaccia utente di Dataproc, che puoi trovare facendo clic sull'icona del menu e scorrendo verso il basso fino a Dataproc. Qui puoi vedere la memoria corrente disponibile, la memoria in sospeso e il numero di worker.

6f2987346d15c8e2.png

Puoi anche fare clic sulla scheda Job per visualizzare i job completati. Puoi visualizzarne i dettagli, ad esempio i log e il relativo output, facendo clic sull'ID di un determinato job. 114d90129b0e4c88.png

1b2160f0f484594a.png

Puoi anche visualizzare la UI di Spark. Dalla pagina del job, fai clic sulla freccia indietro e quindi su Interfacce web. Dovresti vedere diverse opzioni sotto Gateway dei componenti. Molti di questi possono essere abilitati tramite Componenti facoltativi durante la configurazione del cluster. Per questo lab, fai clic su "Server cronologia Spark

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Dovrebbe aprirsi la seguente finestra:

8f6786760f994fe8.png

Tutti i job completati verranno visualizzati qui e puoi fare clic su qualsiasi application_id per saperne di più sul job. Analogamente, puoi fare clic su "Mostra applicazioni incomplete" in fondo alla pagina di destinazione per visualizzare tutti i job attualmente in esecuzione.

9. Esecuzione del job di backfill

Ora eseguirai un job che carica i dati in memoria, estrae le informazioni necessarie e scarica l'output in un bucket Google Cloud Storage. Estrai i campi "title" e "body" (testo non elaborato) e "timestamp creato" per ogni commento su Reddit. Dovrai quindi prendere questi dati, convertirli in un file CSV, comprimerli e caricarli in un bucket con URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Puoi fare nuovamente riferimento all'Editor Cloud per leggere il codice per cloud-dataproc/codelabs/spark-bigquery/backfill.sh, che è uno script wrapper per l'esecuzione del codice in cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

A breve dovresti vedere una serie di messaggi di completamento del job. Il completamento del job potrebbe richiedere fino a 15 minuti. Puoi anche controllare il bucket di archiviazione per verificare che l'output dei dati sia andato a buon fine utilizzando gsutil. Una volta completati tutti i job, esegui questo comando:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Dovresti vedere l'output seguente:

a7c3c7b2e82f9fca.png

Congratulazioni, hai completato correttamente un backfill per i dati dei tuoi commenti su Reddit. Se ti interessa come creare modelli sulla base di questi dati, passa al codelab Spark-NLP.

10. Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati inutilmente addebiti dopo il completamento di questa guida rapida:

  1. Elimina il bucket Cloud Storage relativo all'ambiente e che hai creato
  2. Elimina l'ambiente Dataproc.

Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:

  1. Nella console di Google Cloud, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic, e Apache 2.0.