Apache Spark dan Jupyter Notebooks di Cloud Dataproc

1. Ringkasan

Lab ini akan membahas cara menyiapkan dan menggunakan Apache Spark dan notebook Jupyter di Cloud Dataproc.

Jupyter notebooks banyak digunakan untuk analisis data eksploratif dan membangun model machine learning karena memungkinkan Anda menjalankan kode secara interaktif dan langsung melihat hasil Anda.

Namun, menyiapkan dan menggunakan Apache Spark dan Jupyter Notebooks bisa jadi rumit.

b9ed855863c57d6.png

Cloud Dataproc mempercepat dan mempermudah proses ini dengan memungkinkan Anda membuat Cluster Dataproc dengan Apache Spark, komponen Jupyter, dan Gateway Komponen dalam waktu sekitar 90 detik.

Yang akan Anda pelajari

Dalam codelab ini, Anda akan mempelajari cara:

  • Membuat bucket Google Cloud Storage untuk cluster Anda
  • Buat Cluster Dataproc dengan Jupyter dan Gateway Komponen,
  • Mengakses UI web JupyterLab di Dataproc
  • Membuat Notebook menggunakan konektor Spark BigQuery Storage
  • Menjalankan tugas Spark dan merencanakan hasilnya.

Total biaya untuk menjalankan lab ini di Google Cloud adalah sekitar $1. Detail selengkapnya tentang harga Cloud Dataproc dapat ditemukan di sini.

2. Membuat project

Login ke konsol Google Cloud Platform di console.cloud.google.com dan buat project baru:

7e541d932b20c074.pngS

2deefc9295d114ea.pngS

a92a49afe05008a.png

Selanjutnya, Anda harus mengaktifkan penagihan di Cloud Console untuk menggunakan resource Google Cloud.

Menjalankan operasi dalam codelab ini seharusnya tidak menghabiskan biaya lebih dari beberapa dolar, tetapi bisa lebih mahal jika Anda memutuskan untuk menggunakan lebih banyak resource atau jika Anda membiarkannya berjalan. Bagian terakhir dari codelab ini akan memandu Anda menyelesaikan project.

Pengguna baru Google Cloud Platform memenuhi syarat untuk mendapatkan uji coba gratis senilai$300.

3. Menyiapkan lingkungan Anda

Pertama, buka Cloud Shell dengan mengklik tombol di pojok kanan atas Cloud Console:

a10c47ee6ca41c54.png

Setelah Cloud Shell dimuat, jalankan perintah berikut untuk menetapkan project ID dari langkah sebelumnya**:**

gcloud config set project <project_id>

Project ID juga dapat ditemukan dengan mengklik project Anda di kiri atas konsol cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Selanjutnya, aktifkan Dataproc, Compute Engine, dan BigQuery Storage API.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Atau, Anda dapat melakukannya di Cloud Console. Klik ikon menu di kiri atas layar.

2bfc27ef9ba2ec7d.pngS

Pilih Pengelola API dari menu drop-down.

408af5f32c4b7c25.pngS

Klik Enable APIs and Services.

a9c0e84296a7ba5b.png

Telusuri dan aktifkan API berikut:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. Membuat bucket GCS

Buat bucket Google Cloud Storage di region yang paling dekat dengan data Anda dan beri nama unik.

Ini akan digunakan untuk cluster Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Anda akan melihat output berikut

Creating gs://<your-bucket-name>/...

5. Membuat Cluster Dataproc dengan Jupyter & Gateway Komponen

Membuat cluster Anda

Menetapkan variabel env untuk cluster Anda

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Kemudian, jalankan perintah gcloud ini untuk membuat cluster dengan semua komponen yang diperlukan agar dapat digunakan dengan Jupyter di cluster Anda.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Anda akan melihat output berikut saat cluster sedang dibuat

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Perlu waktu sekitar 90 detik untuk membuat cluster dan setelah siap, Anda akan dapat mengakses cluster dari UI konsol Cloud Dataproc.

Sambil menunggu, Anda dapat melanjutkan bacaan di bawah untuk mempelajari lebih lanjut flag yang digunakan dalam perintah gcloud.

Output Anda akan terlihat seperti berikut setelah cluster dibuat:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Tanda yang digunakan dalam perintah gcloud dataproc create

Berikut adalah perincian tanda yang digunakan dalam perintah gcloud dataproc create

--region=${REGION}

Menentukan region dan zona tempat cluster akan dibuat. Anda dapat melihat daftar wilayah yang tersedia di sini.

--image-version=1.4

Versi image yang akan digunakan di cluster Anda. Anda dapat melihat daftar versi yang tersedia di sini.

--bucket=${BUCKET_NAME}

Tentukan bucket Google Cloud Storage yang Anda buat sebelumnya untuk digunakan pada cluster. Jika tidak menyediakan bucket GCS, bucket tersebut akan dibuat untuk Anda.

Di sini, notebook juga akan disimpan meskipun Anda menghapus cluster karena bucket GCS tidak dihapus.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Jenis mesin yang akan digunakan untuk cluster Dataproc Anda. Anda dapat melihat daftar jenis mesin yang tersedia di sini.

Secara default, 1 node master dan 2 worker node akan dibuat jika Anda tidak menyetel flag –num-worker

--optional-components=ANACONDA,JUPYTER

Menetapkan nilai ini untuk komponen opsional akan menginstal semua library yang diperlukan untuk Jupyter dan Anaconda (yang diperlukan untuk notebook Jupyter) di cluster Anda.

--enable-component-gateway

Mengaktifkan Component Gateway akan membuat link App Engine menggunakan Apache Knox dan Inverting Proxy yang memberikan akses yang mudah, aman, serta terautentikasi ke antarmuka web Jupyter dan JupyterLab yang berarti Anda tidak perlu lagi membuat tunnel SSH.

Kode ini juga akan membuat link untuk alat lain di cluster, termasuk Yarn Resource Manager dan Spark History Server yang berguna untuk melihat performa tugas dan pola penggunaan cluster Anda.

6. Membuat notebook Apache Spark

Mengakses antarmuka web JupyterLab

Setelah cluster siap, Anda dapat menemukan link Gateway Komponen ke antarmuka web JupyterLab dengan membuka Cluster Dataproc - Konsol Cloud, mengklik cluster yang telah dibuat, lalu membuka tab Antarmuka Web.

afc40202d555de47.png

Anda akan melihat bahwa Anda memiliki akses ke Jupyter yang merupakan antarmuka {i>notebook<i} klasik atau JupyterLab yang digambarkan sebagai UI generasi berikutnya untuk Project Jupyter.

Ada banyak fitur UI baru yang bagus di JupyterLab dan jika Anda baru menggunakan notebook atau mencari peningkatan terbaru, sebaiknya gunakan JupyterLab karena pada akhirnya akan menggantikan antarmuka Jupyter klasik sesuai dengan dokumen resmi.

Membuat notebook dengan kernel Python 3

a463623f2ebf0518.png

Dari tab peluncur, klik ikon notebook Python 3 untuk membuat notebook dengan kernel Python 3 (bukan kernel PySpark) yang memungkinkan Anda mengonfigurasi SparkSession di notebook dan menyertakan spark-bigquery-connector yang diperlukan untuk menggunakan BigQuery Storage API.

Mengganti nama notebook

196a3276ed07e1f3.pngS

Klik kanan nama notebook di sidebar sebelah kiri atau navigasi atas dan ganti nama notebook menjadi "BigQuery Storage & Spark DataFrame.ipynb"

Menjalankan kode Spark di notebook

fbac38062e5bb9cf.png

Di notebook ini, Anda akan menggunakan spark-bigquery-connector yang merupakan alat untuk membaca dan menulis data antara BigQuery dan Spark menggunakan BigQuery Storage API.

BigQuery Storage API menghadirkan peningkatan yang signifikan dalam mengakses data di BigQuery dengan menggunakan protokol berbasis RPC. Platform ini mendukung pembacaan dan penulisan data secara paralel serta berbagai format serialisasi seperti Apache Avro dan Apache Arrow. Pada tingkat tinggi, hal ini berarti peningkatan performa secara signifikan, terutama pada set data yang lebih besar.

Di sel pertama, periksa versi cluster Anda Scala sehingga Anda dapat menyertakan versi jar spark-bigquery-connector yang tepat.

Input [1]:

!scala -version

Output [1]:f580e442576b8b1f.png Buat sesi Spark dan sertakan paket konektor spark-bigquery-.

Jika versi Scala Anda adalah 2.11, gunakan paket berikut.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Jika versi Scala Anda adalah 2.12, gunakan paket berikut.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Input [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Mengaktifkan repl.eagerEval

Ini akan menampilkan hasil DataFrame di setiap langkah tanpa perlu menampilkan df.show() dan juga meningkatkan pemformatan output.

Input [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Membaca tabel BigQuery ke dalam Spark DataFrame

Membuat Spark DataFrame dengan membaca data dari set data BigQuery publik. Untuk itu, Anda akan menggunakan spark-bigquery-connector dan BigQuery Storage API untuk memuat data ke cluster Spark.

Buat Spark DataFrame dan muat data dari set data publik BigQuery untuk kunjungan halaman Wikipedia. Anda akan mendapati bahwa Anda tidak menjalankan kueri pada data saat menggunakan spark-bigquery-connector untuk memuat data ke Spark tempat pemrosesan data akan terjadi. Ketika dijalankan, kode ini tidak akan memuat tabel karena merupakan evaluasi lambat di Spark dan eksekusi akan terjadi di langkah berikutnya.

Input [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Output [4]:

c107a33f6fc30ca.png

Pilih kolom yang wajib diisi dan terapkan filter menggunakan where() yang merupakan alias untuk filter().

Ketika dijalankan, kode ini akan memicu tindakan Spark dan pada tahap ini data dibaca dari BigQuery Storage.

Input [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Output [5]:

ad363cbe510d625a.png

Kelompokkan menurut judul dan urutkan berdasarkan tayangan halaman untuk melihat halaman teratas

Input [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Output [6]:f718abd05afc0f4.png

7. Menggunakan library Python di notebook

Anda dapat memanfaatkan berbagai library pemetaan yang tersedia di Python untuk memetakan output tugas Spark Anda.

Mengonversi Spark DataFrame ke DataFrame Pandas

Konversi Spark DataFrame ke Pandas DataFrame dan setel datehour sebagai indeks. Ini berguna jika Anda ingin bekerja dengan data secara langsung di Python dan memetakan data menggunakan banyak library pemetaan Python yang tersedia.

Input [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Output [7]:

3df2aaa2351f028d.pngS

Merencanakan Dataframe Pandas

Impor library matplotlib yang diperlukan untuk menampilkan plot di notebook

Masukan [8]:

import matplotlib.pyplot as plt

Gunakan fungsi plot Pandas untuk membuat diagram garis dari DataFrame Pandas.

Masukan [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Output [9]:bade7042c3033594.png

Memastikan notebook disimpan di GCS

Sekarang Anda seharusnya sudah menyiapkan notebook Jupyter pertama di cluster Dataproc. Beri nama notebook Anda yang akan otomatis disimpan ke bucket GCS yang digunakan saat membuat cluster.

Anda dapat memeriksanya menggunakan perintah gsutil ini di Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Anda akan melihat output berikut

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Tips pengoptimalan - Meng-cache data dalam memori

Mungkin ada beberapa skenario di mana Anda ingin data berada dalam memori daripada membaca dari BigQuery Storage setiap saat.

Tugas ini akan membaca data dari BigQuery dan mengirim filter ke BigQuery. Kemudian, agregasi akan dikomputasi di Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Anda dapat memodifikasi tugas di atas untuk menyertakan cache tabel dan sekarang filter pada kolom wiki akan diterapkan di memori oleh Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Anda kemudian dapat memfilter bahasa wiki lain menggunakan data yang di-cache daripada membaca data dari penyimpanan BigQuery lagi dan karenanya akan berjalan jauh lebih cepat.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Anda dapat menghapus {i>cache<i} dengan menjalankan

df_wiki_all.unpersist()

9. Notebook contoh untuk kasus penggunaan lainnya

Repo GitHub Cloud Dataproc menampilkan notebook Jupyter dengan pola Apache Spark umum untuk memuat data, menyimpan data, dan memetakan data Anda dengan berbagai produk dan alat open source Google Cloud Platform:

10. Pembersihan

Agar tidak menimbulkan tagihan yang tidak perlu pada akun GCP Anda setelah menyelesaikan panduan memulai ini:

  1. Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
  2. Menghapus lingkungan Dataproc.

Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut secara opsional:

  1. Di GCP Console, buka halaman Project.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada kotak, ketik project ID, lalu klik Shut down untuk menghapus project.

Lisensi

Karya ini dilisensikan berdasarkan Lisensi Generik Creative Commons Attribution 3.0 dan Apache 2.0.