1. ภาพรวม
ห้องทดลองนี้จะครอบคลุมวิธีตั้งค่าและใช้ Apache Spark และสมุดบันทึก Jupyter บน Cloud Dataproc
สมุดบันทึก Jupyter ใช้กันอย่างแพร่หลายในการวิเคราะห์ข้อมูลเชิงสำรวจและสร้างโมเดลแมชชีนเลิร์นนิง เนื่องจากให้คุณเรียกใช้โค้ดแบบอินเทอร์แอกทีฟและดูผลลัพธ์ได้ทันที
อย่างไรก็ตาม การตั้งค่าและการใช้ Apache Spark และ Jupyter Notebooks อาจมีความซับซ้อน
Cloud Dataproc ทำให้การดำเนินการนี้เป็นไปอย่างรวดเร็วและง่ายดายโดยให้คุณสร้างคลัสเตอร์ Dataproc ที่มี Apache Spark, คอมโพเนนต์ Jupyter และเกตเวย์คอมโพเนนต์ในเวลาประมาณ 90 วินาที
สิ่งที่คุณจะได้เรียนรู้
คุณจะได้เรียนรู้วิธีการต่อไปนี้ใน Codelab
- สร้างที่เก็บข้อมูล Google Cloud Storage สำหรับคลัสเตอร์
- สร้างคลัสเตอร์ Dataproc ที่มี Jupyter และเกตเวย์คอมโพเนนต์
- เข้าถึง UI เว็บ JupyterLab บน Dataproc
- สร้างสมุดบันทึกโดยใช้เครื่องมือเชื่อมต่อพื้นที่เก็บข้อมูล BigQuery ของ Spark
- เรียกใช้งาน Spark และพล็อตผลลัพธ์
ค่าใช้จ่ายรวมในการเรียกใช้ห้องทดลองบน Google Cloud นี้อยู่ที่ประมาณ $1 ดูรายละเอียดทั้งหมดเกี่ยวกับราคาของ Cloud Dataproc ได้ที่นี่
2. การสร้างโปรเจ็กต์
ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com แล้วสร้างโปรเจ็กต์ใหม่ดังนี้
ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud
การใช้งาน Codelab นี้น่าจะมีค่าใช้จ่ายไม่เกิน 2-3 ดอลลาร์ แต่อาจมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรมากขึ้นหรือปล่อยให้ทำงานต่อไป ส่วนสุดท้ายของ Codelab นี้จะแนะนำคุณเกี่ยวกับการล้างข้อมูลโปรเจ็กต์
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรี$300
3. กำลังตั้งค่าสภาพแวดล้อมของคุณ
ก่อนอื่น ให้เปิด Cloud Shell โดยการคลิกปุ่มที่มุมขวาบนของ Cloud Console
หลังจาก Cloud Shell โหลดแล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์จากขั้นตอนก่อนหน้า****
gcloud config set project <project_id>
คุณยังดูรหัสโปรเจ็กต์ได้โดยคลิกโปรเจ็กต์ที่ด้านซ้ายบนของ Cloud Console
ถัดไป ให้เปิดใช้ Dataproc, Compute Engine และ BigQuery Storage API
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
หรือดำเนินการใน Cloud Console ก็ได้ คลิกไอคอนเมนูที่ด้านซ้ายบนของหน้าจอ
เลือกตัวจัดการ API จากเมนูแบบเลื่อนลง
คลิกเปิดใช้ API และบริการ
ค้นหาและเปิดใช้ API ต่อไปนี้
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. สร้างที่เก็บข้อมูล GCS
สร้างที่เก็บข้อมูล Google Cloud Storage ในภูมิภาคที่ใกล้กับข้อมูลของคุณมากที่สุดและตั้งชื่อที่ไม่ซ้ำกัน
ชื่อนี้จะใช้สำหรับคลัสเตอร์ Dataproc
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
คุณควรจะเห็นเอาต์พุตต่อไปนี้
Creating gs://<your-bucket-name>/...
5. สร้างคลัสเตอร์ Dataproc ด้วย Jupyter และ เกตเวย์คอมโพเนนต์
กำลังสร้างคลัสเตอร์
ตั้งค่าตัวแปร env สำหรับคลัสเตอร์
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
จากนั้นเรียกใช้คำสั่ง gcloud นี้เพื่อสร้างคลัสเตอร์ที่มีคอมโพเนนต์ที่จำเป็นทั้งหมดในการทำงานร่วมกับ Jupyter บนคลัสเตอร์
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
คุณควรเห็นเอาต์พุตต่อไปนี้ขณะกำลังสร้างคลัสเตอร์
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
การสร้างคลัสเตอร์ควรใช้เวลาประมาณ 90 วินาที และเมื่อพร้อม คุณจะเข้าถึงคลัสเตอร์ได้จาก UI ของ Dataproc Cloud Console
ระหว่างที่รอ คุณจะอ่านต่อได้ด้านล่างเพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับแฟล็กที่ใช้ในคำสั่ง gcloud
คุณควรได้เอาต์พุตต่อไปนี้เมื่อสร้างคลัสเตอร์แล้ว
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
แฟล็กที่ใช้ในคำสั่งสร้าง gcloud dataproc
รายละเอียดแฟล็กที่ใช้ในคำสั่งสร้าง gcloud dataproc มีดังนี้
--region=${REGION}
ระบุภูมิภาคและโซนของที่จะสร้างคลัสเตอร์ ดูรายชื่อภูมิภาคที่พร้อมให้บริการที่นี่
--image-version=1.4
เวอร์ชันอิมเมจที่จะใช้ในคลัสเตอร์ ดูรายการเวอร์ชันที่พร้อมให้บริการที่นี่
--bucket=${BUCKET_NAME}
ระบุที่เก็บข้อมูล Google Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้เพื่อใช้กับคลัสเตอร์ หากคุณไม่ระบุที่เก็บข้อมูล GCS ระบบจะสร้างที่เก็บข้อมูลให้คุณ
อีกทั้งระบบจะบันทึกสมุดบันทึกของคุณไว้ที่นี่แม้ว่าคุณจะลบคลัสเตอร์ไปเนื่องจากที่เก็บข้อมูล GCS ยังไม่ถูกลบก็ตาม
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
ประเภทเครื่องที่ใช้สำหรับคลัสเตอร์ Dataproc ดูรายการประเภทเครื่องที่มีให้ใช้งานได้ที่นี่
โดยค่าเริ่มต้น ระบบจะสร้างโหนดหลัก 1 โหนดและโหนดผู้ปฏิบัติงาน 2 โหนดหากคุณไม่ได้ตั้งค่าแฟล็ก - จำนวนผู้ปฏิบัติงาน
--optional-components=ANACONDA,JUPYTER
การตั้งค่าเหล่านี้สำหรับคอมโพเนนต์ที่ไม่บังคับจะติดตั้งไลบรารีที่จำเป็นทั้งหมดสำหรับ Jupyter และ Anaconda (ซึ่งจำเป็นสำหรับสมุดบันทึก Jupyter) ในคลัสเตอร์
--enable-component-gateway
การเปิดใช้เกตเวย์คอมโพเนนต์จะสร้างลิงก์ App Engine โดยใช้ Apache Knox และ Inverting Proxy ซึ่งมอบการเข้าถึงที่ง่าย ปลอดภัย และผ่านการตรวจสอบสิทธิ์ไปยังอินเทอร์เฟซเว็บ Jupyter และ JupyterLab ซึ่งหมายความว่าคุณไม่จำเป็นต้องสร้างอุโมงค์ข้อมูล SSH อีกต่อไป
นอกจากนี้ ยังสร้างลิงก์สำหรับเครื่องมืออื่นๆ ในคลัสเตอร์ ซึ่งรวมถึง Yarn Resource Manager และ Spark History Server ซึ่งมีประโยชน์ในการดูประสิทธิภาพของงานและรูปแบบการใช้งานคลัสเตอร์
6. สร้างสมุดบันทึก Apache Spark
การเข้าถึงอินเทอร์เฟซเว็บ JupyterLab
เมื่อคลัสเตอร์พร้อมใช้งานแล้ว คุณจะค้นหาลิงก์เกตเวย์คอมโพเนนต์ไปยังอินเทอร์เฟซเว็บ JupyterLab ได้โดยไปที่คลัสเตอร์ Dataproc - Cloud Console คลิกคลัสเตอร์ที่คุณสร้างแล้วไปที่แท็บเว็บอินเทอร์เฟซ
คุณจะเห็นว่ามีสิทธิ์เข้าถึง Jupyter ซึ่งเป็นอินเทอร์เฟซสมุดบันทึกแบบคลาสสิกหรือ JupyterLab ซึ่งอธิบายว่าเป็น UI รุ่นถัดไปสำหรับ Project Jupyter
มีฟีเจอร์ UI ใหม่ๆ ที่ยอดเยี่ยมมากมายใน JupyterLab ดังนั้นหากคุณเพิ่งเริ่มใช้สมุดบันทึกหรือกำลังมองหาการปรับปรุงล่าสุด ขอแนะนำให้ใช้ JupyterLab แทนอินเทอร์เฟซ Jupyter แบบคลาสสิกตามเอกสารอย่างเป็นทางการ
สร้างสมุดบันทึกด้วยเคอร์เนล Python 3
จากแท็บ Launcher ให้คลิกไอคอนสมุดบันทึก Python 3 เพื่อสร้างสมุดบันทึกที่มีเคอร์เนล Python 3 (ไม่ใช่เคอร์เนล PySpark) ซึ่งให้คุณกำหนดค่า SparkSession ในสมุดบันทึกได้ และใส่ spark-bigquery-connector ที่จำเป็นสำหรับการใช้ BigQuery Storage API
เปลี่ยนชื่อสมุดบันทึก
คลิกขวาที่ชื่อสมุดบันทึกในแถบด้านข้างทางซ้ายหรือการนำทางด้านบน แล้วเปลี่ยนชื่อสมุดบันทึกเป็น "พื้นที่เก็บข้อมูลและ BigQuery และ Spark DataFrames.ipynb"
เรียกใช้โค้ด Spark ของคุณในสมุดบันทึก
ในสมุดบันทึกนี้ คุณจะใช้ spark-bigquery-connector ซึ่งเป็นเครื่องมือสำหรับการอ่านและเขียนข้อมูลระหว่าง BigQuery และ Spark โดยใช้ BigQuery Storage API
BigQuery Storage API ปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่ใช้ RPC โปรโตคอลนี้รองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการซีเรียลไลซ์ต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง ผลลัพธ์นี้จะหมายถึงประสิทธิภาพที่ดีขึ้นอย่างมาก โดยเฉพาะอย่างยิ่งกับชุดข้อมูลขนาดใหญ่ขึ้น
ในเซลล์แรก ให้ตรวจสอบเวอร์ชัน Scala ของคลัสเตอร์เพื่อให้คุณรวม Jar spark-bigquery-connector เวอร์ชันที่ถูกต้องได้
อินพุต [1]:
!scala -version
เอาต์พุต [1]: สร้างเซสชัน Spark และรวมแพ็กเกจ spark-bigquery-connector
หากเวอร์ชัน Scala ของคุณคือ 2.11 ให้ใช้แพ็กเกจต่อไปนี้
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
หากเวอร์ชัน Scala ของคุณคือ 2.12 ให้ใช้แพ็กเกจต่อไปนี้
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
อินพุต [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
เปิดใช้ repl.eagerEval
การดำเนินการนี้จะแสดงผลลัพธ์ของ DataFrames ในแต่ละขั้นตอนโดยไม่ต้องแสดง df.show() ใหม่ และยังช่วยปรับปรุงการจัดรูปแบบของเอาต์พุตด้วย
อินพุต [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
อ่านตาราง BigQuery ใน Spark DataFrame
สร้าง Spark DataFrame โดยการอ่านข้อมูลจากชุดข้อมูล BigQuery สาธารณะ การดำเนินการนี้จะใช้ spark-bigquery-connector และ BigQuery Storage API เพื่อโหลดข้อมูลลงในคลัสเตอร์ Spark
สร้าง Spark DataFrame และโหลดข้อมูลจากชุดข้อมูลสาธารณะของ BigQuery สำหรับการเปิดดูหน้าเว็บ Wikipedia คุณจะเห็นว่าคุณไม่ได้เรียกใช้การค้นหาของข้อมูลเมื่อใช้ spark-bigquery-connector เพื่อโหลดข้อมูลลงใน Spark ซึ่งเป็นที่ที่มีการประมวลผลข้อมูลเกิดขึ้น เมื่อเรียกใช้โค้ดนี้ โค้ดจะไม่โหลดตารางจริงเนื่องจากเป็นการประเมินแบบ Lazy Loading ใน Spark และการดำเนินการจะเกิดขึ้นในขั้นตอนถัดไป
อินพุต [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
เอาต์พุต [4]:
เลือกคอลัมน์ที่จำเป็นและใช้ตัวกรองโดยใช้ where()
ซึ่งเป็นชื่อแทนสำหรับ filter()
เมื่อเรียกใช้โค้ดนี้ จะทริกเกอร์การดำเนินการ Spark และอ่านข้อมูลจากพื้นที่เก็บข้อมูล BigQuery ในจุดนี้
อินพุต [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
เอาต์พุต [5]:
จัดกลุ่มตามชื่อและลำดับตามการดูหน้าเว็บเพื่อดูหน้ายอดนิยม
อินพุต [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
เอาต์พุต [6]:
7. ใช้ไลบรารีการพล็อต Python ในสมุดบันทึก
คุณสามารถใช้ไลบรารีการพล็อตแบบต่างๆ ที่มีอยู่ใน Python เพื่อพล็อตเอาต์พุตของงาน Spark ได้
แปลง Spark DataFrame เป็น Pandas DataFrame
แปลง Spark DataFrame เป็น Pandas DataFrame และตั้งค่า datehours เป็นดัชนี ซึ่งจะเป็นประโยชน์หากคุณต้องการทำงานกับข้อมูลใน Python โดยตรงและพล็อตข้อมูลโดยใช้ไลบรารีการพล็อต Python ที่มีอยู่มากมาย
อินพุต [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
เอาต์พุต [7]:
การพล็อตเฟรมข้อมูลของ Pandas
นำเข้าไลบรารี matplotlib ที่จำเป็นเพื่อแสดงพล็อตในสมุดบันทึก
อินพุต [8]:
import matplotlib.pyplot as plt
ใช้ฟังก์ชันพล็อตแพนด้าเพื่อสร้างแผนภูมิเส้นจาก Pandas DataFrame
อินพุต [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
เอาต์พุต [9]:
ตรวจสอบว่าสมุดบันทึกได้รับการบันทึกใน GCS
ตอนนี้คุณควรจะมีสมุดบันทึก Jupyter ชุดแรกที่เริ่มใช้งานบนคลัสเตอร์ Dataproc แล้ว ตั้งชื่อสมุดบันทึก และระบบจะบันทึกสมุดบันทึกลงในที่เก็บข้อมูล GCS ที่ใช้เมื่อสร้างคลัสเตอร์โดยอัตโนมัติ
คุณตรวจสอบได้โดยใช้คำสั่ง gsutil นี้ใน Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
คุณควรจะเห็นเอาต์พุตต่อไปนี้
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. เคล็ดลับการเพิ่มประสิทธิภาพ - แคชข้อมูลในหน่วยความจำ
อาจมีบางกรณีที่คุณต้องการข้อมูลไว้ในหน่วยความจำแทนที่จะอ่านจากพื้นที่เก็บข้อมูล BigQuery ทุกครั้ง
งานนี้จะอ่านข้อมูลจาก BigQuery และพุชตัวกรองไปยัง BigQuery จากนั้นการรวมจะได้รับการคำนวณใน Apache Spark
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
คุณสามารถแก้ไขงานด้านบนให้รวมแคชของตาราง และตอนนี้ Apache Spark จะนำตัวกรองในคอลัมน์ wiki ไปใช้ในหน่วยความจำ
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
คุณจึงสามารถกรองหาภาษาวิกิอื่นโดยใช้ข้อมูลที่แคชไว้แทนการอ่านข้อมูลจากพื้นที่เก็บข้อมูล BigQuery อีกครั้ง ซึ่งจะทำให้ทำงานได้รวดเร็วยิ่งขึ้น
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
คุณสามารถนำแคชออกได้โดยเรียกใช้
df_wiki_all.unpersist()
9. ตัวอย่างสมุดบันทึกสำหรับกรณีการใช้งานเพิ่มเติม
ที่เก็บ Cloud Dataproc GitHub ประกอบด้วยสมุดบันทึก Jupyter ที่มีรูปแบบ Apache Spark ทั่วไปสำหรับการโหลดข้อมูล บันทึกข้อมูล และพล็อตข้อมูลด้วยผลิตภัณฑ์ Google Cloud Platform และเครื่องมือโอเพนซอร์สต่างๆ ดังนี้
10. ล้างข้อมูล
เพื่อหลีกเลี่ยงการเรียกเก็บเงินที่ไม่จำเป็นในบัญชี GCP หลังจากการเริ่มต้นอย่างรวดเร็วนี้เสร็จสิ้นแล้ว ให้ทำดังนี้
- ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างขึ้น
- ลบสภาพแวดล้อม Dataproc
หากสร้างโปรเจ็กต์สำหรับ Codelab นี้โดยเฉพาะ คุณจะเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้ใบอนุญาตครีเอทีฟคอมมอนส์แบบระบุแหล่งที่มา 3.0 และใบอนุญาต Apache 2.0