Apache Spark และ Jupyter Notebooks บน Cloud Dataproc

1. ภาพรวม

ห้องทดลองนี้จะครอบคลุมวิธีตั้งค่าและใช้ Apache Spark และสมุดบันทึก Jupyter บน Cloud Dataproc

สมุดบันทึก Jupyter ใช้กันอย่างแพร่หลายในการวิเคราะห์ข้อมูลเชิงสำรวจและสร้างโมเดลแมชชีนเลิร์นนิง เนื่องจากให้คุณเรียกใช้โค้ดแบบอินเทอร์แอกทีฟและดูผลลัพธ์ได้ทันที

อย่างไรก็ตาม การตั้งค่าและการใช้ Apache Spark และ Jupyter Notebooks อาจมีความซับซ้อน

b9ed855863c57d6.png

Cloud Dataproc ทำให้การดำเนินการนี้เป็นไปอย่างรวดเร็วและง่ายดายโดยให้คุณสร้างคลัสเตอร์ Dataproc ที่มี Apache Spark, คอมโพเนนต์ Jupyter และเกตเวย์คอมโพเนนต์ในเวลาประมาณ 90 วินาที

สิ่งที่คุณจะได้เรียนรู้

คุณจะได้เรียนรู้วิธีการต่อไปนี้ใน Codelab

  • สร้างที่เก็บข้อมูล Google Cloud Storage สำหรับคลัสเตอร์
  • สร้างคลัสเตอร์ Dataproc ที่มี Jupyter และเกตเวย์คอมโพเนนต์
  • เข้าถึง UI เว็บ JupyterLab บน Dataproc
  • สร้างสมุดบันทึกโดยใช้เครื่องมือเชื่อมต่อพื้นที่เก็บข้อมูล BigQuery ของ Spark
  • เรียกใช้งาน Spark และพล็อตผลลัพธ์

ค่าใช้จ่ายรวมในการเรียกใช้ห้องทดลองบน Google Cloud นี้อยู่ที่ประมาณ $1 ดูรายละเอียดทั้งหมดเกี่ยวกับราคาของ Cloud Dataproc ได้ที่นี่

2. การสร้างโปรเจ็กต์

ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com แล้วสร้างโปรเจ็กต์ใหม่ดังนี้

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud

การใช้งาน Codelab นี้น่าจะมีค่าใช้จ่ายไม่เกิน 2-3 ดอลลาร์ แต่อาจมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรมากขึ้นหรือปล่อยให้ทำงานต่อไป ส่วนสุดท้ายของ Codelab นี้จะแนะนำคุณเกี่ยวกับการล้างข้อมูลโปรเจ็กต์

ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรี$300

3. กำลังตั้งค่าสภาพแวดล้อมของคุณ

ก่อนอื่น ให้เปิด Cloud Shell โดยการคลิกปุ่มที่มุมขวาบนของ Cloud Console

a10c47ee6ca41c54.png

หลังจาก Cloud Shell โหลดแล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์จากขั้นตอนก่อนหน้า****

gcloud config set project <project_id>

คุณยังดูรหัสโปรเจ็กต์ได้โดยคลิกโปรเจ็กต์ที่ด้านซ้ายบนของ Cloud Console

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

ถัดไป ให้เปิดใช้ Dataproc, Compute Engine และ BigQuery Storage API

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

หรือดำเนินการใน Cloud Console ก็ได้ คลิกไอคอนเมนูที่ด้านซ้ายบนของหน้าจอ

2bfc27ef9ba2ec7d.png

เลือกตัวจัดการ API จากเมนูแบบเลื่อนลง

408af5f32c4b7c25.png

คลิกเปิดใช้ API และบริการ

a9c0e84296a7ba5b.png

ค้นหาและเปิดใช้ API ต่อไปนี้

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. สร้างที่เก็บข้อมูล GCS

สร้างที่เก็บข้อมูล Google Cloud Storage ในภูมิภาคที่ใกล้กับข้อมูลของคุณมากที่สุดและตั้งชื่อที่ไม่ซ้ำกัน

ชื่อนี้จะใช้สำหรับคลัสเตอร์ Dataproc

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

คุณควรจะเห็นเอาต์พุตต่อไปนี้

Creating gs://<your-bucket-name>/...

5. สร้างคลัสเตอร์ Dataproc ด้วย Jupyter และ เกตเวย์คอมโพเนนต์

กำลังสร้างคลัสเตอร์

ตั้งค่าตัวแปร env สำหรับคลัสเตอร์

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

จากนั้นเรียกใช้คำสั่ง gcloud นี้เพื่อสร้างคลัสเตอร์ที่มีคอมโพเนนต์ที่จำเป็นทั้งหมดในการทำงานร่วมกับ Jupyter บนคลัสเตอร์

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

คุณควรเห็นเอาต์พุตต่อไปนี้ขณะกำลังสร้างคลัสเตอร์

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

การสร้างคลัสเตอร์ควรใช้เวลาประมาณ 90 วินาที และเมื่อพร้อม คุณจะเข้าถึงคลัสเตอร์ได้จาก UI ของ Dataproc Cloud Console

ระหว่างที่รอ คุณจะอ่านต่อได้ด้านล่างเพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับแฟล็กที่ใช้ในคำสั่ง gcloud

คุณควรได้เอาต์พุตต่อไปนี้เมื่อสร้างคลัสเตอร์แล้ว

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

แฟล็กที่ใช้ในคำสั่งสร้าง gcloud dataproc

รายละเอียดแฟล็กที่ใช้ในคำสั่งสร้าง gcloud dataproc มีดังนี้

--region=${REGION}

ระบุภูมิภาคและโซนของที่จะสร้างคลัสเตอร์ ดูรายชื่อภูมิภาคที่พร้อมให้บริการที่นี่

--image-version=1.4

เวอร์ชันอิมเมจที่จะใช้ในคลัสเตอร์ ดูรายการเวอร์ชันที่พร้อมให้บริการที่นี่

--bucket=${BUCKET_NAME}

ระบุที่เก็บข้อมูล Google Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้เพื่อใช้กับคลัสเตอร์ หากคุณไม่ระบุที่เก็บข้อมูล GCS ระบบจะสร้างที่เก็บข้อมูลให้คุณ

อีกทั้งระบบจะบันทึกสมุดบันทึกของคุณไว้ที่นี่แม้ว่าคุณจะลบคลัสเตอร์ไปเนื่องจากที่เก็บข้อมูล GCS ยังไม่ถูกลบก็ตาม

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

ประเภทเครื่องที่ใช้สำหรับคลัสเตอร์ Dataproc ดูรายการประเภทเครื่องที่มีให้ใช้งานได้ที่นี่

โดยค่าเริ่มต้น ระบบจะสร้างโหนดหลัก 1 โหนดและโหนดผู้ปฏิบัติงาน 2 โหนดหากคุณไม่ได้ตั้งค่าแฟล็ก - จำนวนผู้ปฏิบัติงาน

--optional-components=ANACONDA,JUPYTER

การตั้งค่าเหล่านี้สำหรับคอมโพเนนต์ที่ไม่บังคับจะติดตั้งไลบรารีที่จำเป็นทั้งหมดสำหรับ Jupyter และ Anaconda (ซึ่งจำเป็นสำหรับสมุดบันทึก Jupyter) ในคลัสเตอร์

--enable-component-gateway

การเปิดใช้เกตเวย์คอมโพเนนต์จะสร้างลิงก์ App Engine โดยใช้ Apache Knox และ Inverting Proxy ซึ่งมอบการเข้าถึงที่ง่าย ปลอดภัย และผ่านการตรวจสอบสิทธิ์ไปยังอินเทอร์เฟซเว็บ Jupyter และ JupyterLab ซึ่งหมายความว่าคุณไม่จำเป็นต้องสร้างอุโมงค์ข้อมูล SSH อีกต่อไป

นอกจากนี้ ยังสร้างลิงก์สำหรับเครื่องมืออื่นๆ ในคลัสเตอร์ ซึ่งรวมถึง Yarn Resource Manager และ Spark History Server ซึ่งมีประโยชน์ในการดูประสิทธิภาพของงานและรูปแบบการใช้งานคลัสเตอร์

6. สร้างสมุดบันทึก Apache Spark

การเข้าถึงอินเทอร์เฟซเว็บ JupyterLab

เมื่อคลัสเตอร์พร้อมใช้งานแล้ว คุณจะค้นหาลิงก์เกตเวย์คอมโพเนนต์ไปยังอินเทอร์เฟซเว็บ JupyterLab ได้โดยไปที่คลัสเตอร์ Dataproc - Cloud Console คลิกคลัสเตอร์ที่คุณสร้างแล้วไปที่แท็บเว็บอินเทอร์เฟซ

afc40202d555de47.png

คุณจะเห็นว่ามีสิทธิ์เข้าถึง Jupyter ซึ่งเป็นอินเทอร์เฟซสมุดบันทึกแบบคลาสสิกหรือ JupyterLab ซึ่งอธิบายว่าเป็น UI รุ่นถัดไปสำหรับ Project Jupyter

มีฟีเจอร์ UI ใหม่ๆ ที่ยอดเยี่ยมมากมายใน JupyterLab ดังนั้นหากคุณเพิ่งเริ่มใช้สมุดบันทึกหรือกำลังมองหาการปรับปรุงล่าสุด ขอแนะนำให้ใช้ JupyterLab แทนอินเทอร์เฟซ Jupyter แบบคลาสสิกตามเอกสารอย่างเป็นทางการ

สร้างสมุดบันทึกด้วยเคอร์เนล Python 3

a463623f2ebf0518.png

จากแท็บ Launcher ให้คลิกไอคอนสมุดบันทึก Python 3 เพื่อสร้างสมุดบันทึกที่มีเคอร์เนล Python 3 (ไม่ใช่เคอร์เนล PySpark) ซึ่งให้คุณกำหนดค่า SparkSession ในสมุดบันทึกได้ และใส่ spark-bigquery-connector ที่จำเป็นสำหรับการใช้ BigQuery Storage API

เปลี่ยนชื่อสมุดบันทึก

196a3276ed07e1f3.png

คลิกขวาที่ชื่อสมุดบันทึกในแถบด้านข้างทางซ้ายหรือการนำทางด้านบน แล้วเปลี่ยนชื่อสมุดบันทึกเป็น "พื้นที่เก็บข้อมูลและ BigQuery และ Spark DataFrames.ipynb"

เรียกใช้โค้ด Spark ของคุณในสมุดบันทึก

fbac38062e5bb9cf.png

ในสมุดบันทึกนี้ คุณจะใช้ spark-bigquery-connector ซึ่งเป็นเครื่องมือสำหรับการอ่านและเขียนข้อมูลระหว่าง BigQuery และ Spark โดยใช้ BigQuery Storage API

BigQuery Storage API ปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่ใช้ RPC โปรโตคอลนี้รองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการซีเรียลไลซ์ต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง ผลลัพธ์นี้จะหมายถึงประสิทธิภาพที่ดีขึ้นอย่างมาก โดยเฉพาะอย่างยิ่งกับชุดข้อมูลขนาดใหญ่ขึ้น

ในเซลล์แรก ให้ตรวจสอบเวอร์ชัน Scala ของคลัสเตอร์เพื่อให้คุณรวม Jar spark-bigquery-connector เวอร์ชันที่ถูกต้องได้

อินพุต [1]:

!scala -version

เอาต์พุต [1]:f580e442576b8b1f.png สร้างเซสชัน Spark และรวมแพ็กเกจ spark-bigquery-connector

หากเวอร์ชัน Scala ของคุณคือ 2.11 ให้ใช้แพ็กเกจต่อไปนี้

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

หากเวอร์ชัน Scala ของคุณคือ 2.12 ให้ใช้แพ็กเกจต่อไปนี้

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

อินพุต [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

เปิดใช้ repl.eagerEval

การดำเนินการนี้จะแสดงผลลัพธ์ของ DataFrames ในแต่ละขั้นตอนโดยไม่ต้องแสดง df.show() ใหม่ และยังช่วยปรับปรุงการจัดรูปแบบของเอาต์พุตด้วย

อินพุต [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

อ่านตาราง BigQuery ใน Spark DataFrame

สร้าง Spark DataFrame โดยการอ่านข้อมูลจากชุดข้อมูล BigQuery สาธารณะ การดำเนินการนี้จะใช้ spark-bigquery-connector และ BigQuery Storage API เพื่อโหลดข้อมูลลงในคลัสเตอร์ Spark

สร้าง Spark DataFrame และโหลดข้อมูลจากชุดข้อมูลสาธารณะของ BigQuery สำหรับการเปิดดูหน้าเว็บ Wikipedia คุณจะเห็นว่าคุณไม่ได้เรียกใช้การค้นหาของข้อมูลเมื่อใช้ spark-bigquery-connector เพื่อโหลดข้อมูลลงใน Spark ซึ่งเป็นที่ที่มีการประมวลผลข้อมูลเกิดขึ้น เมื่อเรียกใช้โค้ดนี้ โค้ดจะไม่โหลดตารางจริงเนื่องจากเป็นการประเมินแบบ Lazy Loading ใน Spark และการดำเนินการจะเกิดขึ้นในขั้นตอนถัดไป

อินพุต [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

เอาต์พุต [4]:

c107a33f6fc30ca.png

เลือกคอลัมน์ที่จำเป็นและใช้ตัวกรองโดยใช้ where() ซึ่งเป็นชื่อแทนสำหรับ filter()

เมื่อเรียกใช้โค้ดนี้ จะทริกเกอร์การดำเนินการ Spark และอ่านข้อมูลจากพื้นที่เก็บข้อมูล BigQuery ในจุดนี้

อินพุต [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

เอาต์พุต [5]:

ad363cbe510d625a.png

จัดกลุ่มตามชื่อและลำดับตามการดูหน้าเว็บเพื่อดูหน้ายอดนิยม

อินพุต [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

เอาต์พุต [6]:f718abd05afc0f4.png

7. ใช้ไลบรารีการพล็อต Python ในสมุดบันทึก

คุณสามารถใช้ไลบรารีการพล็อตแบบต่างๆ ที่มีอยู่ใน Python เพื่อพล็อตเอาต์พุตของงาน Spark ได้

แปลง Spark DataFrame เป็น Pandas DataFrame

แปลง Spark DataFrame เป็น Pandas DataFrame และตั้งค่า datehours เป็นดัชนี ซึ่งจะเป็นประโยชน์หากคุณต้องการทำงานกับข้อมูลใน Python โดยตรงและพล็อตข้อมูลโดยใช้ไลบรารีการพล็อต Python ที่มีอยู่มากมาย

อินพุต [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

เอาต์พุต [7]:

3df2aaa2351f028d.png

การพล็อตเฟรมข้อมูลของ Pandas

นำเข้าไลบรารี matplotlib ที่จำเป็นเพื่อแสดงพล็อตในสมุดบันทึก

อินพุต [8]:

import matplotlib.pyplot as plt

ใช้ฟังก์ชันพล็อตแพนด้าเพื่อสร้างแผนภูมิเส้นจาก Pandas DataFrame

อินพุต [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

เอาต์พุต [9]:bade7042c3033594.png

ตรวจสอบว่าสมุดบันทึกได้รับการบันทึกใน GCS

ตอนนี้คุณควรจะมีสมุดบันทึก Jupyter ชุดแรกที่เริ่มใช้งานบนคลัสเตอร์ Dataproc แล้ว ตั้งชื่อสมุดบันทึก และระบบจะบันทึกสมุดบันทึกลงในที่เก็บข้อมูล GCS ที่ใช้เมื่อสร้างคลัสเตอร์โดยอัตโนมัติ

คุณตรวจสอบได้โดยใช้คำสั่ง gsutil นี้ใน Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

คุณควรจะเห็นเอาต์พุตต่อไปนี้

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. เคล็ดลับการเพิ่มประสิทธิภาพ - แคชข้อมูลในหน่วยความจำ

อาจมีบางกรณีที่คุณต้องการข้อมูลไว้ในหน่วยความจำแทนที่จะอ่านจากพื้นที่เก็บข้อมูล BigQuery ทุกครั้ง

งานนี้จะอ่านข้อมูลจาก BigQuery และพุชตัวกรองไปยัง BigQuery จากนั้นการรวมจะได้รับการคำนวณใน Apache Spark

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

คุณสามารถแก้ไขงานด้านบนให้รวมแคชของตาราง และตอนนี้ Apache Spark จะนำตัวกรองในคอลัมน์ wiki ไปใช้ในหน่วยความจำ

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

คุณจึงสามารถกรองหาภาษาวิกิอื่นโดยใช้ข้อมูลที่แคชไว้แทนการอ่านข้อมูลจากพื้นที่เก็บข้อมูล BigQuery อีกครั้ง ซึ่งจะทำให้ทำงานได้รวดเร็วยิ่งขึ้น

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

คุณสามารถนำแคชออกได้โดยเรียกใช้

df_wiki_all.unpersist()

9. ตัวอย่างสมุดบันทึกสำหรับกรณีการใช้งานเพิ่มเติม

ที่เก็บ Cloud Dataproc GitHub ประกอบด้วยสมุดบันทึก Jupyter ที่มีรูปแบบ Apache Spark ทั่วไปสำหรับการโหลดข้อมูล บันทึกข้อมูล และพล็อตข้อมูลด้วยผลิตภัณฑ์ Google Cloud Platform และเครื่องมือโอเพนซอร์สต่างๆ ดังนี้

10. ล้างข้อมูล

เพื่อหลีกเลี่ยงการเรียกเก็บเงินที่ไม่จำเป็นในบัญชี GCP หลังจากการเริ่มต้นอย่างรวดเร็วนี้เสร็จสิ้นแล้ว ให้ทำดังนี้

  1. ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างขึ้น
  2. ลบสภาพแวดล้อม Dataproc

หากสร้างโปรเจ็กต์สำหรับ Codelab นี้โดยเฉพาะ คุณจะเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์

ใบอนุญาต

ผลงานนี้ได้รับอนุญาตภายใต้ใบอนุญาตครีเอทีฟคอมมอนส์แบบระบุแหล่งที่มา 3.0 และใบอนุญาต Apache 2.0