1. Ringkasan
Lab ini akan membahas cara menyiapkan dan menggunakan Apache Spark dan notebook Jupyter di Cloud Dataproc.
Jupyter notebooks banyak digunakan untuk analisis data eksploratif dan membangun model machine learning karena memungkinkan Anda menjalankan kode secara interaktif dan langsung melihat hasil Anda.
Namun, menyiapkan dan menggunakan Apache Spark dan Jupyter Notebooks bisa jadi rumit.
Cloud Dataproc mempercepat dan mempermudah proses ini dengan memungkinkan Anda membuat Cluster Dataproc dengan Apache Spark, komponen Jupyter, dan Gateway Komponen dalam waktu sekitar 90 detik.
Yang akan Anda pelajari
Dalam codelab ini, Anda akan mempelajari cara:
- Membuat bucket Google Cloud Storage untuk cluster Anda
- Buat Cluster Dataproc dengan Jupyter dan Gateway Komponen,
- Mengakses UI web JupyterLab di Dataproc
- Membuat Notebook menggunakan konektor Spark BigQuery Storage
- Menjalankan tugas Spark dan merencanakan hasilnya.
Total biaya untuk menjalankan lab ini di Google Cloud adalah sekitar $1. Detail selengkapnya tentang harga Cloud Dataproc dapat ditemukan di sini.
2. Membuat project
Login ke konsol Google Cloud Platform di console.cloud.google.com dan buat project baru:
Selanjutnya, Anda harus mengaktifkan penagihan di Cloud Console untuk menggunakan resource Google Cloud.
Menjalankan operasi dalam codelab ini seharusnya tidak menghabiskan biaya lebih dari beberapa dolar, tetapi bisa lebih mahal jika Anda memutuskan untuk menggunakan lebih banyak resource atau jika Anda membiarkannya berjalan. Bagian terakhir dari codelab ini akan memandu Anda menyelesaikan project.
Pengguna baru Google Cloud Platform memenuhi syarat untuk mendapatkan uji coba gratis senilai$300.
3. Menyiapkan lingkungan Anda
Pertama, buka Cloud Shell dengan mengklik tombol di pojok kanan atas Cloud Console:
Setelah Cloud Shell dimuat, jalankan perintah berikut untuk menetapkan project ID dari langkah sebelumnya**:**
gcloud config set project <project_id>
Project ID juga dapat ditemukan dengan mengklik project Anda di kiri atas konsol cloud:
Selanjutnya, aktifkan Dataproc, Compute Engine, dan BigQuery Storage API.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Atau, Anda dapat melakukannya di Cloud Console. Klik ikon menu di kiri atas layar.
Pilih Pengelola API dari menu drop-down.
Klik Enable APIs and Services.
Telusuri dan aktifkan API berikut:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. Membuat bucket GCS
Buat bucket Google Cloud Storage di region yang paling dekat dengan data Anda dan beri nama unik.
Ini akan digunakan untuk cluster Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Anda akan melihat output berikut
Creating gs://<your-bucket-name>/...
5. Membuat Cluster Dataproc dengan Jupyter & Gateway Komponen
Membuat cluster Anda
Menetapkan variabel env untuk cluster Anda
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Kemudian, jalankan perintah gcloud ini untuk membuat cluster dengan semua komponen yang diperlukan agar dapat digunakan dengan Jupyter di cluster Anda.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Anda akan melihat output berikut saat cluster sedang dibuat
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Perlu waktu sekitar 90 detik untuk membuat cluster dan setelah siap, Anda akan dapat mengakses cluster dari UI konsol Cloud Dataproc.
Sambil menunggu, Anda dapat melanjutkan bacaan di bawah untuk mempelajari lebih lanjut flag yang digunakan dalam perintah gcloud.
Output Anda akan terlihat seperti berikut setelah cluster dibuat:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Tanda yang digunakan dalam perintah gcloud dataproc create
Berikut adalah perincian tanda yang digunakan dalam perintah gcloud dataproc create
--region=${REGION}
Menentukan region dan zona tempat cluster akan dibuat. Anda dapat melihat daftar wilayah yang tersedia di sini.
--image-version=1.4
Versi image yang akan digunakan di cluster Anda. Anda dapat melihat daftar versi yang tersedia di sini.
--bucket=${BUCKET_NAME}
Tentukan bucket Google Cloud Storage yang Anda buat sebelumnya untuk digunakan pada cluster. Jika tidak menyediakan bucket GCS, bucket tersebut akan dibuat untuk Anda.
Di sini, notebook juga akan disimpan meskipun Anda menghapus cluster karena bucket GCS tidak dihapus.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Jenis mesin yang akan digunakan untuk cluster Dataproc Anda. Anda dapat melihat daftar jenis mesin yang tersedia di sini.
Secara default, 1 node master dan 2 worker node akan dibuat jika Anda tidak menyetel flag –num-worker
--optional-components=ANACONDA,JUPYTER
Menetapkan nilai ini untuk komponen opsional akan menginstal semua library yang diperlukan untuk Jupyter dan Anaconda (yang diperlukan untuk notebook Jupyter) di cluster Anda.
--enable-component-gateway
Mengaktifkan Component Gateway akan membuat link App Engine menggunakan Apache Knox dan Inverting Proxy yang memberikan akses yang mudah, aman, serta terautentikasi ke antarmuka web Jupyter dan JupyterLab yang berarti Anda tidak perlu lagi membuat tunnel SSH.
Kode ini juga akan membuat link untuk alat lain di cluster, termasuk Yarn Resource Manager dan Spark History Server yang berguna untuk melihat performa tugas dan pola penggunaan cluster Anda.
6. Membuat notebook Apache Spark
Mengakses antarmuka web JupyterLab
Setelah cluster siap, Anda dapat menemukan link Gateway Komponen ke antarmuka web JupyterLab dengan membuka Cluster Dataproc - Konsol Cloud, mengklik cluster yang telah dibuat, lalu membuka tab Antarmuka Web.
Anda akan melihat bahwa Anda memiliki akses ke Jupyter yang merupakan antarmuka {i>notebook<i} klasik atau JupyterLab yang digambarkan sebagai UI generasi berikutnya untuk Project Jupyter.
Ada banyak fitur UI baru yang bagus di JupyterLab dan jika Anda baru menggunakan notebook atau mencari peningkatan terbaru, sebaiknya gunakan JupyterLab karena pada akhirnya akan menggantikan antarmuka Jupyter klasik sesuai dengan dokumen resmi.
Membuat notebook dengan kernel Python 3
Dari tab peluncur, klik ikon notebook Python 3 untuk membuat notebook dengan kernel Python 3 (bukan kernel PySpark) yang memungkinkan Anda mengonfigurasi SparkSession di notebook dan menyertakan spark-bigquery-connector yang diperlukan untuk menggunakan BigQuery Storage API.
Mengganti nama notebook
Klik kanan nama notebook di sidebar sebelah kiri atau navigasi atas dan ganti nama notebook menjadi "BigQuery Storage & Spark DataFrame.ipynb"
Menjalankan kode Spark di notebook
Di notebook ini, Anda akan menggunakan spark-bigquery-connector yang merupakan alat untuk membaca dan menulis data antara BigQuery dan Spark menggunakan BigQuery Storage API.
BigQuery Storage API menghadirkan peningkatan yang signifikan dalam mengakses data di BigQuery dengan menggunakan protokol berbasis RPC. Platform ini mendukung pembacaan dan penulisan data secara paralel serta berbagai format serialisasi seperti Apache Avro dan Apache Arrow. Pada tingkat tinggi, hal ini berarti peningkatan performa secara signifikan, terutama pada set data yang lebih besar.
Di sel pertama, periksa versi cluster Anda Scala sehingga Anda dapat menyertakan versi jar spark-bigquery-connector yang tepat.
Input [1]:
!scala -version
Output [1]: Buat sesi Spark dan sertakan paket konektor spark-bigquery-.
Jika versi Scala Anda adalah 2.11, gunakan paket berikut.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Jika versi Scala Anda adalah 2.12, gunakan paket berikut.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Input [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Mengaktifkan repl.eagerEval
Ini akan menampilkan hasil DataFrame di setiap langkah tanpa perlu menampilkan df.show() dan juga meningkatkan pemformatan output.
Input [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Membaca tabel BigQuery ke dalam Spark DataFrame
Membuat Spark DataFrame dengan membaca data dari set data BigQuery publik. Untuk itu, Anda akan menggunakan spark-bigquery-connector dan BigQuery Storage API untuk memuat data ke cluster Spark.
Buat Spark DataFrame dan muat data dari set data publik BigQuery untuk kunjungan halaman Wikipedia. Anda akan mendapati bahwa Anda tidak menjalankan kueri pada data saat menggunakan spark-bigquery-connector untuk memuat data ke Spark tempat pemrosesan data akan terjadi. Ketika dijalankan, kode ini tidak akan memuat tabel karena merupakan evaluasi lambat di Spark dan eksekusi akan terjadi di langkah berikutnya.
Input [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Output [4]:
Pilih kolom yang wajib diisi dan terapkan filter menggunakan where()
yang merupakan alias untuk filter()
.
Ketika dijalankan, kode ini akan memicu tindakan Spark dan pada tahap ini data dibaca dari BigQuery Storage.
Input [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Output [5]:
Kelompokkan menurut judul dan urutkan berdasarkan tayangan halaman untuk melihat halaman teratas
Input [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Output [6]:
7. Menggunakan library Python di notebook
Anda dapat memanfaatkan berbagai library pemetaan yang tersedia di Python untuk memetakan output tugas Spark Anda.
Mengonversi Spark DataFrame ke DataFrame Pandas
Konversi Spark DataFrame ke Pandas DataFrame dan setel datehour sebagai indeks. Ini berguna jika Anda ingin bekerja dengan data secara langsung di Python dan memetakan data menggunakan banyak library pemetaan Python yang tersedia.
Input [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Output [7]:
Merencanakan Dataframe Pandas
Impor library matplotlib yang diperlukan untuk menampilkan plot di notebook
Masukan [8]:
import matplotlib.pyplot as plt
Gunakan fungsi plot Pandas untuk membuat diagram garis dari DataFrame Pandas.
Masukan [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Output [9]:
Memastikan notebook disimpan di GCS
Sekarang Anda seharusnya sudah menyiapkan notebook Jupyter pertama di cluster Dataproc. Beri nama notebook Anda yang akan otomatis disimpan ke bucket GCS yang digunakan saat membuat cluster.
Anda dapat memeriksanya menggunakan perintah gsutil ini di Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Anda akan melihat output berikut
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Tips pengoptimalan - Meng-cache data dalam memori
Mungkin ada beberapa skenario di mana Anda ingin data berada dalam memori daripada membaca dari BigQuery Storage setiap saat.
Tugas ini akan membaca data dari BigQuery dan mengirim filter ke BigQuery. Kemudian, agregasi akan dikomputasi di Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Anda dapat memodifikasi tugas di atas untuk menyertakan cache tabel dan sekarang filter pada kolom wiki akan diterapkan di memori oleh Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Anda kemudian dapat memfilter bahasa wiki lain menggunakan data yang di-cache daripada membaca data dari penyimpanan BigQuery lagi dan karenanya akan berjalan jauh lebih cepat.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Anda dapat menghapus {i>cache<i} dengan menjalankan
df_wiki_all.unpersist()
9. Notebook contoh untuk kasus penggunaan lainnya
Repo GitHub Cloud Dataproc menampilkan notebook Jupyter dengan pola Apache Spark umum untuk memuat data, menyimpan data, dan memetakan data Anda dengan berbagai produk dan alat open source Google Cloud Platform:
10. Pembersihan
Agar tidak menimbulkan tagihan yang tidak perlu pada akun GCP Anda setelah menyelesaikan panduan memulai ini:
- Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
- Menghapus lingkungan Dataproc.
Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut secara opsional:
- Di GCP Console, buka halaman Project.
- Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
- Pada kotak, ketik project ID, lalu klik Shut down untuk menghapus project.
Lisensi
Karya ini dilisensikan berdasarkan Lisensi Generik Creative Commons Attribution 3.0 dan Apache 2.0.