Apache Spark dan Jupyter Notebooks di Cloud Dataproc

1. Ringkasan

Lab ini akan membahas cara menyiapkan dan menggunakan Apache Spark dan notebook Jupyter di Cloud Dataproc.

Notebook Jupyter banyak digunakan untuk analisis data eksploratif dan membangun model machine learning karena memungkinkan Anda menjalankan kode secara interaktif dan langsung melihat hasilnya.

Namun, penyiapan dan penggunaan Apache Spark dan Jupyter Notebook bisa jadi rumit.

b9ed855863c57d6.png

Cloud Dataproc membuat proses ini menjadi cepat dan mudah dengan memungkinkan Anda membuat Cluster Dataproc dengan Apache Spark, komponen Jupyter, dan Component Gateway dalam waktu sekitar 90 detik.

Yang akan Anda pelajari

Dalam codelab ini, Anda akan mempelajari cara:

  • Membuat bucket Google Cloud Storage untuk cluster Anda
  • Membuat Cluster Dataproc dengan Jupyter dan Component Gateway,
  • Mengakses UI web JupyterLab di Dataproc
  • Membuat Notebook yang menggunakan konektor Spark BigQuery Storage
  • Menjalankan tugas Spark dan memetakan hasilnya.

Total biaya untuk menjalankan lab ini di Google Cloud adalah sekitar $1. Detail lengkap tentang harga Cloud Dataproc dapat ditemukan di sini.

2. Membuat project

Login ke Konsol Google Cloud Platform di console.cloud.google.com dan buat project baru:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Selanjutnya, Anda harus mengaktifkan penagihan di Konsol Cloud untuk menggunakan resource Google Cloud.

Menjalankan operasi dalam codelab ini tidak akan menghabiskan biaya lebih dari beberapa dolar, tetapi bisa lebih jika Anda memutuskan untuk menggunakan lebih banyak resource atau jika Anda membiarkannya berjalan. Bagian terakhir codelab ini akan memandu Anda membersihkan project.

Pengguna baru Google Cloud Platform memenuhi syarat untuk mendapatkan uji coba gratis senilai$300.

3. Menyiapkan lingkungan Anda

Pertama, buka Cloud Shell dengan mengklik tombol di pojok kanan atas konsol cloud:

a10c47ee6ca41c54.png

Setelah Cloud Shell dimuat, jalankan perintah berikut untuk menetapkan project ID dari langkah sebelumnya**:**

gcloud config set project <project_id>

Project ID juga dapat ditemukan dengan mengklik project Anda di kiri atas konsol cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Selanjutnya, aktifkan Dataproc API, Compute Engine API, dan BigQuery Storage API.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Atau, hal ini dapat dilakukan di Konsol Cloud. Klik ikon menu di kiri atas layar.

2bfc27ef9ba2ec7d.png

Pilih API Manager dari menu drop-down.

408af5f32c4b7c25.png

Klik Enable APIs and Services.

a9c0e84296a7ba5b.png

Telusuri dan aktifkan API berikut:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. Buat bucket GCS

Buat bucket Google Cloud Storage di region yang paling dekat dengan data Anda dan beri nama unik.

Hal ini akan digunakan untuk cluster Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Anda akan melihat output berikut:

Creating gs://<your-bucket-name>/...

5. Membuat Cluster Dataproc dengan Jupyter & Component Gateway

Membuat cluster

Tetapkan variabel lingkungan untuk cluster Anda

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Kemudian, jalankan perintah gcloud ini untuk membuat cluster dengan semua komponen yang diperlukan agar dapat menggunakan Jupyter di cluster Anda.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Anda akan melihat output berikut saat cluster Anda dibuat

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Proses pembuatan cluster akan memakan waktu sekitar 90 detik dan setelah siap, Anda dapat mengakses cluster dari UI konsol Cloud Dataproc.

Sambil menunggu, Anda dapat melanjutkan membaca di bawah untuk mempelajari lebih lanjut flag yang digunakan dalam perintah gcloud.

Anda akan melihat output berikut setelah cluster dibuat:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flag yang digunakan dalam perintah gcloud dataproc create

Berikut adalah perincian flag yang digunakan dalam perintah gcloud dataproc create

--region=${REGION}

Menentukan region dan zona tempat cluster akan dibuat. Anda dapat melihat daftar region yang menyediakan fitur ini di sini.

--image-version=1.4

Versi image yang akan digunakan di cluster Anda. Anda dapat melihat daftar versi yang tersedia di sini.

--bucket=${BUCKET_NAME}

Tentukan bucket Google Cloud Storage yang Anda buat sebelumnya untuk digunakan pada cluster. Jika Anda tidak menyediakan bucket GCS, bucket tersebut akan dibuat untuk Anda.

Di sini juga tempat notebook Anda akan disimpan meskipun Anda menghapus cluster karena bucket GCS tidak dihapus.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Jenis mesin yang akan digunakan untuk cluster Dataproc Anda. Anda dapat melihat daftar jenis mesin yang tersedia di sini.

Secara default, 1 node master dan 2 node pekerja dibuat jika Anda tidak menetapkan flag –num-workers

--optional-components=ANACONDA,JUPYTER

Menetapkan nilai ini untuk komponen opsional akan menginstal semua library yang diperlukan untuk Jupyter dan Anaconda (yang diperlukan untuk notebook Jupyter) di cluster Anda.

--enable-component-gateway

Mengaktifkan Component Gateway akan membuat link App Engine menggunakan Apache Knox dan Inverting Proxy yang memberikan akses mudah, aman, dan terautentikasi ke antarmuka web Jupyter dan JupyterLab, sehingga Anda tidak perlu lagi membuat tunnel SSH.

Skrip ini juga akan membuat link untuk alat lain di cluster, termasuk Yarn Resource Manager dan Spark History Server yang berguna untuk melihat performa tugas dan pola penggunaan cluster Anda.

6. Membuat notebook Apache Spark

Mengakses antarmuka web JupyterLab

Setelah cluster siap, Anda dapat menemukan link Component Gateway ke antarmuka web JupyterLab dengan membuka Dataproc Clusters - Cloud console, mengklik cluster yang Anda buat, lalu membuka tab Web Interfaces.

afc40202d555de47.png

Anda akan melihat bahwa Anda memiliki akses ke Jupyter yang merupakan antarmuka notebook klasik atau JupyterLab yang dideskripsikan sebagai UI generasi berikutnya untuk Project Jupyter.

Ada banyak fitur UI baru yang hebat di JupyterLab, jadi jika Anda baru menggunakan notebook atau mencari peningkatan terbaru, sebaiknya gunakan JupyterLab karena pada akhirnya akan menggantikan antarmuka Jupyter klasik menurut dokumen resmi.

Membuat notebook dengan kernel Python 3

a463623f2ebf0518.png

Dari tab peluncur, klik ikon notebook Python 3 untuk membuat notebook dengan kernel Python 3 (bukan kernel PySpark) yang memungkinkan Anda mengonfigurasi SparkSession di notebook dan menyertakan spark-bigquery-connector yang diperlukan untuk menggunakan BigQuery Storage API.

Mengganti nama notebook

196a3276ed07e1f3.png

Klik kanan nama notebook di sidebar di sebelah kiri atau navigasi atas, lalu ganti nama notebook menjadi "BigQuery Storage & Spark DataFrames.ipynb"

Menjalankan kode Spark di notebook

fbac38062e5bb9cf.png

Dalam notebook ini, Anda akan menggunakan spark-bigquery-connector yang merupakan alat untuk membaca dan menulis data antara BigQuery dan Spark menggunakan BigQuery Storage API.

BigQuery Storage API memberikan peningkatan signifikan pada akses data di BigQuery dengan menggunakan protokol berbasis RPC. Layanan ini mendukung pembacaan dan penulisan data secara paralel serta berbagai format serialisasi seperti Apache Avro dan Apache Arrow. Secara umum, hal ini menghasilkan peningkatan performa yang signifikan, terutama pada set data yang lebih besar.

Di sel pertama, periksa versi Scala cluster Anda sehingga Anda dapat menyertakan versi jar spark-bigquery-connector yang benar.

Input [1]:

!scala -version

Output [1]:f580e442576b8b1f.png Buat sesi Spark dan sertakan paket spark-bigquery-connector.

Jika versi Scala Anda adalah 2.11, gunakan paket berikut.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Jika versi Scala Anda adalah 2.12, gunakan paket berikut.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Input [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Aktifkan repl.eagerEval

Hal ini akan menampilkan hasil DataFrame di setiap langkah tanpa perlu menampilkan df.show() dan juga meningkatkan format output.

Input [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Membaca tabel BigQuery ke dalam DataFrame Spark

Buat DataFrame Spark dengan membaca data dari set data BigQuery publik. Hal ini menggunakan spark-bigquery-connector dan BigQuery Storage API untuk memuat data ke dalam cluster Spark.

Buat DataFrame Spark dan muat data dari set data publik BigQuery untuk penayangan halaman Wikipedia. Anda akan melihat bahwa Anda tidak menjalankan kueri pada data karena Anda menggunakan spark-bigquery-connector untuk memuat data ke Spark tempat pemrosesan data akan terjadi. Saat dijalankan, kode ini tidak akan memuat tabel karena merupakan evaluasi lambat di Spark dan eksekusi akan terjadi pada langkah berikutnya.

Input [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Output [4]:

c107a33f6fc30ca.png

Pilih kolom yang diperlukan dan terapkan filter menggunakan where() yang merupakan alias untuk filter().

Saat kode ini dijalankan, tindakan Spark akan dipicu dan data akan dibaca dari BigQuery Storage pada saat ini.

Input [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Output [5]:

ad363cbe510d625a.png

Kelompokkan menurut judul dan urutkan menurut tayangan halaman untuk melihat halaman teratas

Input [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Output [6]:f718abd05afc0f4.png

7. Menggunakan library plotting Python di notebook

Anda dapat memanfaatkan berbagai library plotting yang tersedia di Python untuk memplot output tugas Spark Anda.

Mengonversi DataFrame Spark menjadi DataFrame Pandas

Konversi DataFrame Spark ke DataFrame Pandas dan tetapkan datehour sebagai indeks. Hal ini berguna jika Anda ingin langsung mengolah data di Python dan memplot data menggunakan banyak library plotting Python yang tersedia.

Input [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Output [7]:

3df2aaa2351f028d.png

Membuat Plot Pandas DataFrame

Impor library matplotlib yang diperlukan untuk menampilkan plot di notebook

Input [8]:

import matplotlib.pyplot as plt

Gunakan fungsi plot Pandas untuk membuat diagram garis dari Pandas DataFrame.

Input [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Output [9]:bade7042c3033594.png

Periksa apakah notebook disimpan di GCS

Sekarang Anda akan memiliki notebook Jupyter pertama yang aktif dan berjalan di cluster Dataproc. Beri nama notebook Anda dan notebook tersebut akan disimpan otomatis ke bucket GCS yang digunakan saat membuat cluster.

Anda dapat memeriksanya menggunakan perintah gsutil ini di cloud shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Anda akan melihat output berikut:

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Tips pengoptimalan - Menyimpan data dalam cache di memori

Mungkin ada skenario saat Anda menginginkan data dalam memori, bukan membaca dari BigQuery Storage setiap saat.

Tugas ini akan membaca data dari BigQuery dan mengirimkan filter ke BigQuery. Agregasi kemudian akan dihitung di Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Anda dapat mengubah tugas di atas untuk menyertakan cache tabel dan sekarang filter pada kolom wiki akan diterapkan dalam memori oleh Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Kemudian, Anda dapat memfilter bahasa wiki lain menggunakan data yang di-cache, bukan membaca data dari penyimpanan BigQuery lagi, sehingga akan berjalan jauh lebih cepat.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Anda dapat menghapus cache dengan menjalankan

df_wiki_all.unpersist()

9. Contoh notebook untuk kasus penggunaan lainnya

Repositori GitHub Cloud Dataproc menampilkan notebook Jupyter dengan pola Apache Spark umum untuk memuat data, menyimpan data, dan memplot data Anda dengan berbagai produk Google Cloud Platform dan alat open source:

10. Pembersihan

Agar tidak menimbulkan biaya yang tidak perlu pada akun GCP Anda setelah menyelesaikan panduan memulai ini:

  1. Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
  2. Hapus lingkungan Dataproc.

Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut jika mau:

  1. Di Konsol GCP, buka halaman Projects.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Hapus.
  3. Di kotak, ketik project ID, lalu klik Shut down untuk menghapus project.

Lisensi

Karya ini dilisensikan berdasarkan Lisensi Umum Creative Commons Attribution 3.0, dan lisensi Apache 2.0.