Apache Spark และ Jupyter Notebooks บน Cloud Dataproc

1. ภาพรวม

แล็บนี้จะครอบคลุมวิธีตั้งค่าและใช้ Apache Spark และ Jupyter Notebook ใน Cloud Dataproc

สมุดบันทึก Jupyter มีการใช้งานอย่างแพร่หลายในการวิเคราะห์ข้อมูลเบื้องต้นและการสร้างโมเดลแมชชีนเลิร์นนิง เนื่องจากช่วยให้คุณเรียกใช้โค้ดแบบอินเทอร์แอกทีฟและดูผลลัพธ์ได้ทันที

อย่างไรก็ตาม การตั้งค่าและการใช้ Apache Spark และ Jupyter Notebook อาจซับซ้อน

b9ed855863c57d6.png

Cloud Dataproc ช่วยให้คุณทำสิ่งนี้ได้อย่างรวดเร็วและง่ายดายโดยให้คุณสร้างคลัสเตอร์ Dataproc ด้วย Apache Spark, คอมโพเนนต์ Jupyter และเกตเวย์คอมโพเนนต์ได้ภายในเวลาประมาณ 90 วินาที

สิ่งที่คุณจะได้เรียนรู้

ในโค้ดแล็บนี้ คุณจะได้เรียนรู้วิธีทำสิ่งต่อไปนี้

  • สร้าง Bucket ของ Google Cloud Storage สำหรับคลัสเตอร์
  • สร้างคลัสเตอร์ Dataproc ด้วย Jupyter และ Component Gateway
  • เข้าถึงเว็บ UI ของ JupyterLab ใน Dataproc
  • สร้าง Notebook โดยใช้ตัวเชื่อมต่อที่เก็บข้อมูล Spark BigQuery
  • เรียกใช้งาน Spark Job และพล็อตผลลัพธ์

ค่าใช้จ่ายทั้งหมดในการเรียกใช้ Lab นี้ใน Google Cloud อยู่ที่ประมาณ $1 ดูรายละเอียดทั้งหมดเกี่ยวกับราคาของ Cloud Dataproc ได้ที่นี่

2. การสร้างโปรเจ็กต์

ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com แล้วสร้างโปรเจ็กต์ใหม่โดยทำดังนี้

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

จากนั้นคุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากรของ Google Cloud

การทำตาม Codelab นี้ไม่ควรมีค่าใช้จ่ายเกิน 2-3 ดอลลาร์ แต่ก็อาจมีค่าใช้จ่ายมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยให้ทรัพยากรทำงานต่อไป ส่วนสุดท้ายของโค้ดแล็บนี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์

ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300

3. การตั้งค่าสภาพแวดล้อม

ก่อนอื่น ให้เปิด Cloud Shell โดยคลิกปุ่มที่มุมบนขวาของคอนโซลระบบคลาวด์

a10c47ee6ca41c54.png

หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์จากขั้นตอนก่อนหน้า**:**

gcloud config set project <project_id>

คุณยังดูรหัสโปรเจ็กต์ได้โดยคลิกโปรเจ็กต์ที่ด้านซ้ายบนของคอนโซลระบบคลาวด์

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

จากนั้นเปิดใช้ Dataproc, Compute Engine และ BigQuery Storage API

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

หรือจะทำใน Cloud Console ก็ได้ คลิกไอคอนเมนูที่ด้านซ้ายบนของหน้าจอ

2bfc27ef9ba2ec7d.png

เลือก API Manager จากเมนูแบบเลื่อนลง

408af5f32c4b7c25.png

คลิกเปิดใช้ API และบริการ

a9c0e84296a7ba5b.png

ค้นหาและเปิดใช้ API ต่อไปนี้

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. สร้างที่เก็บข้อมูล GCS

สร้าง Bucket ของ Google Cloud Storage ในภูมิภาคที่ใกล้กับข้อมูลของคุณมากที่สุดและตั้งชื่อที่ไม่ซ้ำกัน

ซึ่งจะใช้สำหรับคลัสเตอร์ Dataproc

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

คุณควรเห็นเอาต์พุตต่อไปนี้

Creating gs://<your-bucket-name>/...

5. สร้างคลัสเตอร์ Dataproc ด้วย Jupyter และ Component Gateway

การสร้างคลัสเตอร์

ตั้งค่าตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

จากนั้นเรียกใช้คำสั่ง gcloud นี้เพื่อสร้างคลัสเตอร์ที่มีคอมโพเนนต์ที่จำเป็นทั้งหมดในการทำงานกับ Jupyter ในคลัสเตอร์

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

คุณควรเห็นเอาต์พุตต่อไปนี้ขณะที่ระบบสร้างคลัสเตอร์

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

การสร้างคลัสเตอร์จะใช้เวลาประมาณ 90 วินาที และเมื่อคลัสเตอร์พร้อมแล้ว คุณจะเข้าถึงคลัสเตอร์ได้จากUI ของคอนโซล Dataproc ในระบบคลาวด์

ในระหว่างรอ คุณสามารถอ่านต่อด้านล่างเพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับ Flag ที่ใช้ในคำสั่ง gcloud

คุณควรเห็นเอาต์พุตต่อไปนี้เมื่อสร้างคลัสเตอร์แล้ว

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Flag ที่ใช้ในคำสั่ง gcloud dataproc create

ต่อไปนี้คือรายละเอียดของแฟล็กที่ใช้ในคำสั่ง gcloud dataproc create

--region=${REGION}

ระบุภูมิภาคและโซนที่จะสร้างคลัสเตอร์ คุณดูรายชื่อภูมิภาคที่พร้อมให้บริการได้ที่นี่

--image-version=1.4

เวอร์ชันอิมเมจที่จะใช้ในคลัสเตอร์ คุณดูรายการเวอร์ชันที่พร้อมให้บริการได้ที่นี่

--bucket=${BUCKET_NAME}

ระบุ Bucket ของ Google Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้เพื่อใช้กับคลัสเตอร์ หากคุณไม่ได้ระบุที่เก็บข้อมูล GCS ระบบจะสร้างให้คุณ

นอกจากนี้ ระบบจะบันทึก Notebook ของคุณไว้ที่นี่ด้วย แม้ว่าคุณจะลบคลาสเตอร์แล้วก็ตาม เนื่องจากระบบจะไม่ลบบัคเก็ต GCS

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

ประเภทเครื่องที่จะใช้สำหรับคลัสเตอร์ Dataproc ดูรายการประเภทเครื่องที่พร้อมใช้งานได้ที่นี่

โดยค่าเริ่มต้น ระบบจะสร้างโหนดหลัก 1 โหนดและโหนดผู้ปฏิบัติงาน 2 โหนดหากคุณไม่ได้ตั้งค่าแฟล็ก -num-workers

--optional-components=ANACONDA,JUPYTER

การตั้งค่าเหล่านี้สำหรับคอมโพเนนต์ที่ไม่บังคับจะติดตั้งไลบรารีที่จำเป็นทั้งหมดสำหรับ Jupyter และ Anaconda (ซึ่งจำเป็นสำหรับสมุดบันทึก Jupyter) ในคลัสเตอร์

--enable-component-gateway

การเปิดใช้ Component Gateway จะสร้างลิงก์ App Engine โดยใช้ Apache Knox และ Inverting Proxy ซึ่งช่วยให้เข้าถึงเว็บอินเทอร์เฟซของ Jupyter และ JupyterLab ได้อย่างง่ายดาย ปลอดภัย และมีการตรวจสอบสิทธิ์ ซึ่งหมายความว่าคุณไม่จำเป็นต้องสร้างอุโมงค์ SSH อีกต่อไป

นอกจากนี้ยังสร้างลิงก์สำหรับเครื่องมืออื่นๆ ในคลัสเตอร์ด้วย ซึ่งรวมถึง Yarn Resource Manager และ Spark History Server ซึ่งมีประโยชน์ในการดูประสิทธิภาพของงานและรูปแบบการใช้งานคลัสเตอร์

6. สร้าง Notebook ของ Apache Spark

การเข้าถึงอินเทอร์เฟซเว็บของ JupyterLab

เมื่อคลัสเตอร์พร้อมใช้งานแล้ว คุณจะดูลิงก์ Component Gateway ไปยังอินเทอร์เฟซเว็บ JupyterLab ได้โดยไปที่คลัสเตอร์ Dataproc - คอนโซล Cloud คลิกคลัสเตอร์ที่คุณสร้างขึ้น แล้วไปที่แท็บอินเทอร์เฟซเว็บ

afc40202d555de47.png

คุณจะเห็นว่าคุณมีสิทธิ์เข้าถึง Jupyter ซึ่งเป็นอินเทอร์เฟซสมุดบันทึกแบบคลาสสิก หรือ JupyterLab ซึ่งอธิบายว่าเป็น UI รุ่นถัดไปสำหรับโปรเจ็กต์ Jupyter

JupyterLab มีฟีเจอร์ UI ใหม่ๆ ที่ยอดเยี่ยมมากมาย ดังนั้นหากคุณเพิ่งเริ่มใช้ Notebook หรือกำลังมองหาการปรับปรุงล่าสุด เราขอแนะนำให้ใช้ JupyterLab เนื่องจากในที่สุดแล้ว JupyterLab จะมาแทนที่อินเทอร์เฟซ Jupyter แบบคลาสสิกตามเอกสารอย่างเป็นทางการ

สร้าง Notebook ด้วยเคอร์เนล Python 3

a463623f2ebf0518.png

จากแท็บ Launcher ให้คลิกไอคอน Notebook ของ Python 3 เพื่อสร้าง Notebook ที่มีเคอร์เนล Python 3 (ไม่ใช่เคอร์เนล PySpark) ซึ่งจะช่วยให้คุณกำหนดค่า SparkSession ใน Notebook และรวม spark-bigquery-connector ที่จำเป็นต่อการใช้ BigQuery Storage API ได้

เปลี่ยนชื่อ Notebook

196a3276ed07e1f3.png

คลิกขวาที่ชื่อ Notebook ในแถบด้านข้างทางด้านซ้ายหรือการนำทางด้านบน แล้วเปลี่ยนชื่อ Notebook เป็น "BigQuery Storage & Spark DataFrames.ipynb"

เรียกใช้โค้ด Spark ใน Notebook

fbac38062e5bb9cf.png

ใน Notebook นี้ คุณจะใช้ spark-bigquery-connector ซึ่งเป็นเครื่องมือสำหรับอ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark โดยใช้ BigQuery Storage API

BigQuery Storage API ช่วยปรับปรุงการเข้าถึงข้อมูลใน BigQuery ได้อย่างมากโดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลแบบขนาน รวมถึงรูปแบบการซีเรียลไลซ์ต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะส่งผลให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะในชุดข้อมูลขนาดใหญ่

ในเซลล์แรก ให้ตรวจสอบเวอร์ชัน Scala ของคลัสเตอร์เพื่อให้คุณรวมไฟล์ spark-bigquery-connector jar เวอร์ชันที่ถูกต้องได้

อินพุต [1]:

!scala -version

เอาต์พุต [1]:f580e442576b8b1f.png สร้างเซสชัน Spark และรวมแพ็กเกจ spark-bigquery-connector

หากใช้ Scala เวอร์ชัน 2.11 ให้ใช้แพ็กเกจต่อไปนี้

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

หาก Scala เวอร์ชันของคุณคือ 2.12 ให้ใช้แพ็กเกจต่อไปนี้

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

อินพุต [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

เปิดใช้ repl.eagerEval

ซึ่งจะแสดงผลลัพธ์ของ DataFrame ในแต่ละขั้นตอนโดยไม่ต้องแสดง df.show() และยังปรับปรุงการจัดรูปแบบเอาต์พุตด้วย

อินพุต [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

อ่านตาราง BigQuery ลงใน Spark DataFrame

สร้าง Spark DataFrame โดยการอ่านข้อมูลจากชุดข้อมูล BigQuery สาธารณะ ซึ่งจะใช้ spark-bigquery-connector และ BigQuery Storage API เพื่อโหลดข้อมูลลงในคลัสเตอร์ Spark

สร้าง Spark DataFrame และโหลดข้อมูลจากชุดข้อมูลสาธารณะ BigQuery สำหรับการดูหน้า Wikipedia คุณจะเห็นว่าคุณไม่ได้เรียกใช้การค้นหาในข้อมูลเนื่องจากคุณใช้ spark-bigquery-connector เพื่อโหลดข้อมูลลงใน Spark ซึ่งจะมีการประมวลผลข้อมูล เมื่อเรียกใช้โค้ดนี้ ระบบจะไม่โหลดตารางจริงเนื่องจากเป็นการประเมินแบบเลซีใน Spark และการดำเนินการจะเกิดขึ้นในขั้นตอนถัดไป

อินพุต [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

เอาต์พุต [4]:

c107a33f6fc30ca.png

เลือกคอลัมน์ที่ต้องการและใช้ตัวกรองโดยใช้ where() ซึ่งเป็นชื่อแทนของ filter()

เมื่อเรียกใช้โค้ดนี้ ระบบจะทริกเกอร์การดำเนินการ Spark และอ่านข้อมูลจาก BigQuery Storage ณ จุดนี้

อินพุต [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

เอาต์พุต [5]:

ad363cbe510d625a.png

จัดกลุ่มตามชื่อและจัดเรียงตามการดูหน้าเว็บเพื่อดูหน้าเว็บยอดนิยม

อินพุต [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

เอาต์พุต [6]:f718abd05afc0f4.png

7. ใช้ไลบรารีการพล็อต Python ใน Notebook

คุณสามารถใช้ประโยชน์จากไลบรารีการพล็อตต่างๆ ที่มีอยู่ใน Python เพื่อพล็อตเอาต์พุตของงาน Spark ได้

แปลง Spark DataFrame เป็น Pandas DataFrame

แปลง Spark DataFrame เป็น Pandas DataFrame และตั้งค่า datehour เป็นดัชนี ซึ่งจะเป็นประโยชน์หากคุณต้องการทำงานกับข้อมูลใน Python โดยตรงและพล็อตข้อมูลโดยใช้ไลบรารีการพล็อต Python ที่มีอยู่มากมาย

อินพุต [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

เอาต์พุต [7]:

3df2aaa2351f028d.png

การพล็อต Pandas Dataframe

นำเข้าไลบรารี matplotlib ซึ่งจำเป็นต่อการแสดงพล็อตใน Notebook

อินพุต [8]:

import matplotlib.pyplot as plt

ใช้ฟังก์ชันการพล็อตของ Pandas เพื่อสร้างแผนภูมิเส้นจาก Pandas DataFrame

อินพุต [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

เอาต์พุต [9]:bade7042c3033594.png

ตรวจสอบว่าได้บันทึก Notebook ใน GCS แล้ว

ตอนนี้คุณควรมีสมุดบันทึก Jupyter แรกที่ทำงานบนคลัสเตอร์ Dataproc ตั้งชื่อ Notebook แล้วระบบจะบันทึก Notebook ลงในที่เก็บข้อมูล GCS ที่ใช้เมื่อสร้างคลัสเตอร์โดยอัตโนมัติ

คุณตรวจสอบได้โดยใช้คำสั่ง gsutil นี้ใน Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

คุณควรเห็นเอาต์พุตต่อไปนี้

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. เคล็ดลับการเพิ่มประสิทธิภาพ - แคชข้อมูลในหน่วยความจำ

อาจมีบางกรณีที่คุณต้องการให้ข้อมูลอยู่ในหน่วยความจำแทนที่จะอ่านจากที่เก็บข้อมูล BigQuery ทุกครั้ง

งานนี้จะอ่านข้อมูลจาก BigQuery และส่งตัวกรองไปยัง BigQuery จากนั้นระบบจะคำนวณการรวบรวมข้อมูลใน Apache Spark

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

คุณสามารถแก้ไขงานข้างต้นให้รวมแคชของตารางได้ และตอนนี้ Apache Spark จะใช้ตัวกรองในคอลัมน์วิกิในหน่วยความจำ

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

จากนั้นคุณจะกรองภาษาของวิกิอื่นได้โดยใช้ข้อมูลที่แคชไว้แทนที่จะอ่านข้อมูลจากพื้นที่เก็บข้อมูล BigQuery อีกครั้ง จึงทำให้การทำงานเร็วขึ้นมาก

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

คุณนำแคชออกได้โดยเรียกใช้

df_wiki_all.unpersist()

9. ตัวอย่างสมุดบันทึกสำหรับกรณีการใช้งานเพิ่มเติม

ที่เก็บ GitHub ของ Cloud Dataproc มีสมุดบันทึก Jupyter ที่มีรูปแบบ Apache Spark ทั่วไปสำหรับการโหลดข้อมูล การบันทึกข้อมูล และการพล็อตข้อมูลด้วยผลิตภัณฑ์ Google Cloud Platform และเครื่องมือโอเพนซอร์สต่างๆ

10. ล้างข้อมูล

โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินที่ไม่จำเป็นกับบัญชี GCP หลังจากที่ทำตามการเริ่มต้นอย่างรวดเร็วนี้เสร็จแล้ว

  1. ลบ Bucket ของ Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างไว้
  2. ลบสภาพแวดล้อม Dataproc

หากสร้างโปรเจ็กต์เพื่อใช้กับ Codelab นี้โดยเฉพาะ คุณจะลบโปรเจ็กต์ได้ด้วย (ไม่บังคับ) โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. พิมพ์รหัสโปรเจ็กต์ในช่อง แล้วคลิกปิดเพื่อลบโปรเจ็กต์

ใบอนุญาต

ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์แบบระบุแหล่งที่มา 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0