1. ภาพรวม
Codelab นี้จะอธิบายถึงวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark ร่วมกับ Dataproc บน Google Cloud Platform เป็นกรณีการใช้งานทั่วไปในด้านวิทยาศาสตร์ข้อมูลและวิศวกรรมข้อมูลในการอ่านข้อมูลจากตำแหน่งพื้นที่เก็บข้อมูลหนึ่ง ดำเนินการเปลี่ยนรูปแบบพื้นที่เก็บข้อมูล และเขียนลงพื้นที่เก็บข้อมูลอีกตำแหน่งหนึ่ง การแปลงที่พบบ่อย ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จำเป็นออก และการเปลี่ยนประเภทไฟล์
ใน Codelab นี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc ที่มี PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit
2. ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)
ตามข้อมูลในเว็บไซต์ " Apache Spark เป็นเครื่องมือการวิเคราะห์แบบรวมสำหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งจะช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบขนานและในหน่วยความจำได้ ซึ่งทำให้สามารถประมวลผลแบบขนานขนาดใหญ่ในเครื่องและโหนดต่างๆ ได้ เครื่องมือนี้เปิดตัวครั้งแรกเมื่อปี 2014 เพื่ออัปเกรดเป็น MapReduce แบบดั้งเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กที่ได้รับความนิยมมากที่สุดสำหรับการคำนวณขนาดใหญ่ Apache Spark เขียนขึ้นใน Scala และมี API ใน Scala, Java, Python และ R ในเวลาต่อมา ซึ่งประกอบด้วยไลบรารีจำนวนมาก เช่น Spark SQL สำหรับดำเนินการค้นหา SQL กับข้อมูล, Spark Streaming สำหรับข้อมูลสตรีมมิง, MLlib สำหรับแมชชีนเลิร์นนิง และ GraphX สำหรับการประมวลผลกราฟ ซึ่งล้วนทำงานบน Apache Spark Engine
Spark สามารถดำเนินการโดยลำพังหรือใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อปรับขนาดก็ได้ คุณจะใช้ Dataproc สำหรับ Codelab นี้ซึ่งใช้ Yarn
แต่เดิมข้อมูลใน Spark จะถูกโหลดลงในหน่วยความจำที่เรียกว่า RDD หรือชุดข้อมูลที่มีการเผยแพร่ที่ยืดหยุ่น ตั้งแต่การพัฒนาใน Spark ได้มีการเพิ่มประเภทข้อมูลแบบคอลัมน์ใหม่ 2 ประเภท ได้แก่ ชุดข้อมูลที่เป็นการพิมพ์ และ Dataframe ที่ไม่มีการพิมพ์ พูดง่ายๆ ก็คือ RDD เหมาะสำหรับข้อมูลทุกประเภท ส่วนชุดข้อมูลและ DataFrame ได้รับการเพิ่มประสิทธิภาพเพื่อข้อมูลแบบตาราง เนื่องจากชุดข้อมูลใช้ได้เฉพาะกับ Java และ Scala API เราจึงจะใช้ PySpark Dataframe API สำหรับ Codelab นี้ สำหรับข้อมูลเพิ่มเติม โปรดดูเอกสารประกอบของ Apache Spark
3. กรณีการใช้งาน
วิศวกรข้อมูลมักต้องการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลได้อย่างง่ายดาย อย่างไรก็ตาม ข้อมูลมักสกปรกในตอนแรก (ใช้งานวิเคราะห์ในสถานะปัจจุบันได้ยาก) และต้องมีการล้างข้อมูลก่อนจึงจะใช้งานได้ ตัวอย่างคือข้อมูลที่คัดลอกมาจากเว็บซึ่งอาจมีการเข้ารหัสแปลกๆ หรือแท็ก HTML ที่ไม่เกี่ยวข้อง
ในห้องทดลองนี้ คุณจะต้องโหลดชุดข้อมูลจาก BigQuery ในรูปแบบของโพสต์ Reddit ไปยังคลัสเตอร์ Spark ที่โฮสต์บน Dataproc ดึงข้อมูลที่เป็นประโยชน์และจัดเก็บข้อมูลที่ประมวลผลเป็นไฟล์ CSV ที่เป็นไฟล์ ZIP ใน Google Cloud Storage
หัวหน้านักวิทยาศาสตร์ข้อมูลที่บริษัทของคุณสนใจที่จะให้ทีมแก้ปัญหาต่างๆ ในการประมวลผลภาษาธรรมชาติ โดยเฉพาะอย่างยิ่ง ลูกค้าต้องการวิเคราะห์ข้อมูลใน subreddit "r/food" คุณจะสร้างไปป์ไลน์สำหรับดัมพ์ข้อมูลโดยเริ่มต้นด้วยโฆษณาทดแทนตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019
4. การเข้าถึง BigQuery ผ่าน BigQuery Storage API
การดึงข้อมูลจาก BigQuery โดยใช้เมธอด API ของtabledata.list อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปรับขนาดข้อมูล วิธีนี้จะแสดงรายการออบเจ็กต์ JSON และจำเป็นต้องอ่านทีละหน้าตามลำดับเพื่ออ่านชุดข้อมูลทั้งหมด
BigQuery Storage API ปรับปรุงอย่างมากในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่อิงตาม RPC โปรโตคอลนี้รองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบอนุกรมต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง ผลลัพธ์นี้จะหมายถึงประสิทธิภาพที่ดีขึ้นอย่างมาก โดยเฉพาะอย่างยิ่งกับชุดข้อมูลขนาดใหญ่ขึ้น
ใน Codelab นี้ คุณจะใช้ spark-bigquery-connector สำหรับการอ่านและเขียนข้อมูลระหว่าง BigQuery และ Spark
5. การสร้างโปรเจ็กต์
ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com แล้วสร้างโปรเจ็กต์ใหม่ดังนี้
ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud
การใช้งาน Codelab นี้น่าจะมีค่าใช้จ่ายไม่เกิน 2-3 ดอลลาร์ แต่อาจมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรมากขึ้นหรือปล่อยให้ทำงานต่อไป ส่วนสุดท้ายของ Codelab นี้จะแนะนำคุณเกี่ยวกับการล้างข้อมูลโปรเจ็กต์
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรี$300
6. การตั้งค่าสภาพแวดล้อมของคุณ
ตอนนี้คุณจะเข้าสู่การตั้งค่าสภาพแวดล้อมโดยดำเนินการดังนี้
- การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
- การกําหนดการตั้งค่าโปรเจ็กต์
- การสร้างคลัสเตอร์ Dataproc
- การสร้างที่เก็บข้อมูล Google Cloud Storage
การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อมของคุณ
เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console
หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
ตั้งรหัสโปรเจ็กต์ของโปรเจ็กต์ โดยคุณไปที่หน้าการเลือกโปรเจ็กต์และค้นหาโปรเจ็กต์ได้ ซึ่งอาจไม่เหมือนกับชื่อโปรเจ็กต์
เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งรหัสโปรเจ็กต์
gcloud config set project <project_id>
ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1
gcloud config set dataproc/region <region>
เลือกชื่อสำหรับคลัสเตอร์ Dataproc แล้วสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์
CLUSTER_NAME=<cluster_name>
การสร้างคลัสเตอร์ Dataproc
สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คำสั่งต่อไปนี้
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
คำสั่งนี้จะใช้เวลาสักครู่เพื่อดำเนินการให้เสร็จสิ้น หากต้องการแจกแจงคำสั่ง ให้ทำดังนี้
การดำเนินการนี้จะเริ่มต้นการสร้างคลัสเตอร์ Dataproc ด้วยชื่อที่คุณระบุก่อนหน้านี้ การใช้ beta
API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น เกตเวย์คอมโพเนนต์
gcloud beta dataproc clusters create ${CLUSTER_NAME}
ซึ่งจะเป็นการตั้งค่าประเภทของเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน
--worker-machine-type n1-standard-8
การดำเนินการนี้จะกำหนดจำนวนผู้ปฏิบัติงานที่คลัสเตอร์จะมี
--num-workers 8
ซึ่งจะเป็นการตั้งค่าเวอร์ชันอิมเมจของ Dataproc
--image-version 1.5-debian
การดำเนินการนี้จะกำหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ คุณกำลังรวมการดำเนินการเริ่มต้น pip ไว้ที่นี่
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ คุณกำลังระบุข้อมูลเมตาสำหรับการดำเนินการเริ่มต้น pip
--metadata 'PIP_PACKAGES=google-cloud-storage'
การดำเนินการนี้จะตั้งค่าคอมโพเนนต์ที่ไม่บังคับให้ติดตั้งบนคลัสเตอร์
--optional-components=ANACONDA
การดำเนินการนี้จะเปิดใช้เกตเวย์คอมโพเนนต์ที่ให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark
--enable-component-gateway
โปรดดู Codelab สำหรับข้อมูลเบื้องต้นเกี่ยวกับ Dataproc แบบเจาะลึกมากขึ้น
การสร้างที่เก็บข้อมูล Google Cloud Storage
คุณจะต้องมีที่เก็บข้อมูล Google Cloud Storage สำหรับเอาต์พุตงาน ตั้งชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล และเรียกใช้คำสั่งต่อไปนี้เพื่อสร้างที่เก็บข้อมูลใหม่ ชื่อที่เก็บข้อมูลจะไม่ซ้ำกันในโปรเจ็กต์ Google Cloud ทั้งหมดสําหรับผู้ใช้ทุกคน คุณจึงอาจต้องพยายามทําเช่นนี้ 2-3 ครั้งโดยใช้ชื่ออื่น สร้างที่เก็บข้อมูลเรียบร้อยแล้วหากคุณไม่ได้รับ ServiceException
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. การวิเคราะห์ข้อมูลเชิงสำรวจ
ก่อนที่จะดำเนินการประมวลผลล่วงหน้า คุณควรดูข้อมูลเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่กำลังจัดการ ในการทำเช่นนี้ คุณจะได้สำรวจ 2 วิธีในการสำรวจข้อมูล ขั้นตอนแรกคือดูข้อมูลดิบโดยใช้ UI เว็บของ BigQuery จากนั้นคำนวณจำนวนโพสต์ต่อ Subreddit โดยใช้ PySpark และ Dataproc
การใช้ UI ทางเว็บของ BigQuery
เริ่มต้นโดยใช้ UI เว็บของ BigQuery เพื่อดูข้อมูลของคุณ จากไอคอนเมนูใน Cloud Console ให้เลื่อนลงและกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery
จากนั้นเรียกใช้คำสั่งต่อไปนี้ในตัวแก้ไขคำค้นหา UI บนเว็บของ BigQuery ซึ่งจะแสดงข้อมูล 10 แถวแบบเต็มตั้งแต่เดือนมกราคม 2017 ดังนี้
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
คุณสามารถเลื่อนดูหน้าต่างๆ เพื่อดูคอลัมน์ทั้งหมดที่มีและตัวอย่างบางคอลัมน์ได้ โดยเฉพาะอย่างยิ่ง คุณจะเห็นคอลัมน์ 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของแต่ละโพสต์ ซึ่งก็คือ "ชื่อ" และ "selftext" โดยข้อความหลังจะเป็นเนื้อหาของโพสต์ โปรดสังเกตคอลัมน์อื่นๆ เช่น "created_utc" ด้วย ซึ่งเป็นเวลาเขียนโพสต์และ "subreddit" ซึ่งเป็น Subreddit ที่มีโพสต์อยู่
การใช้งาน PySpark
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บด้วยโค้ดตัวอย่างและลง Cd ลงในไดเรกทอรีที่ถูกต้อง
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
คุณสามารถใช้ PySpark เพื่อระบุจำนวนโพสต์ที่มีอยู่สำหรับส่วนย่อยแต่ละรายการได้ คุณสามารถเปิด Cloud Editor แล้วอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery
ก่อนที่จะเรียกใช้ในขั้นตอนถัดไปได้ โดยทำดังนี้
คลิกลิงก์ "Open Terminal" ใน Cloud Editor เพื่อเปลี่ยนกลับไปยัง Cloud Shell และเรียกใช้คำสั่งต่อไปนี้เพื่อเรียกใช้งาน PySpark แรกของคุณ
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ คุณกำลังระบุประเภทงานเป็น pyspark
คุณระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงานได้ ในที่นี้ คุณกำลังระบุพารามิเตอร์ --jars
ซึ่งจะช่วยให้คุณรวม spark-bigquery-connector
เข้ากับงานได้ คุณยังสามารถตั้งค่าระดับเอาต์พุตของบันทึกโดยใช้ --driver-log-levels root=FATAL
ซึ่งจะระงับเอาต์พุตของบันทึกทั้งหมด ยกเว้นข้อผิดพลาด บันทึกของ Spark มักจะค่อนข้างเสียงดัง
การดำเนินการนี้อาจใช้เวลา 2-3 นาทีและเอาต์พุตสุดท้ายควรมีลักษณะดังนี้
8. สำรวจ Dataproc และ Spark UI
เมื่อเรียกใช้งาน Spark บน Dataproc คุณจะมีสิทธิ์เข้าถึง UI ทั้ง 2 รายการเพื่อตรวจสอบสถานะของงาน / คลัสเตอร์ ตัวเลือกแรกคือ Dataproc UI ซึ่งดูได้โดยคลิกไอคอนเมนูและเลื่อนลงไปที่ Dataproc คุณจะเห็นหน่วยความจำปัจจุบัน หน่วยความจำที่รอดำเนินการ และจำนวนคนทำงานที่นี่
นอกจากนี้ คุณสามารถคลิกแท็บ งาน เพื่อดูงานที่เสร็จแล้วได้ คุณสามารถดูรายละเอียดงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้น โดยคลิกที่รหัสงานสำหรับงานเฉพาะ
และยังดู UI ของ Spark ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ จากนั้นคลิกที่อินเทอร์เฟซเว็บ คุณจะเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ ซึ่งหลายรายการจะเปิดใช้ผ่านคอมโพเนนต์ที่ไม่บังคับได้เมื่อตั้งค่าคลัสเตอร์ สำหรับห้องทดลองนี้ ให้คลิกที่ "เซิร์ฟเวอร์ประวัติ Spark
ระบบจะเปิดหน้าต่างต่อไปนี้
งานที่เสร็จสมบูรณ์แล้วทั้งหมดจะแสดงที่นี่ และคุณสามารถคลิกที่ application_id ใดก็ได้เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับงาน ในทำนองเดียวกัน คุณสามารถคลิก "แสดงแอปพลิเคชันที่ไม่สมบูรณ์" ที่ด้านล่างของหน้า Landing Page เพื่อดูงานทั้งหมดที่กำลังดำเนินการอยู่
9. การเรียกใช้งานโฆษณาทดแทน
ตอนนี้คุณจะเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจำ ดึงข้อมูลที่จำเป็น และถ่ายโอนเอาต์พุตลงในที่เก็บข้อมูล Google Cloud Storage คุณจะต้องดึงข้อมูล "ชื่อ", "เนื้อหา" (ข้อมูลดิบ) และ "สร้างการประทับเวลาแล้ว" สำหรับความคิดเห็น reddit แต่ละรายการ จากนั้นคุณจะนำข้อมูลนี้ไปแปลงเป็น CSV บีบอัด และโหลดลงในที่เก็บข้อมูลที่มี URI เป็น gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz
คุณสามารถอ้างอิง Cloud Editor อีกครั้งเพื่ออ่านโค้ดสำหรับ cloud-dataproc/codelabs/spark-bigquery/backfill.sh
ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
คุณจะเห็นข้อความว่างานเสร็จสมบูรณ์จำนวนมากในไม่ช้า งานอาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังสามารถตรวจสอบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลอีกครั้งเพื่อยืนยันเอาต์พุตของข้อมูลที่สําเร็จโดยใช้ gsutil เมื่องานทั้งหมดเสร็จสิ้นแล้ว ให้เรียกใช้คำสั่งต่อไปนี้
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
คุณควรจะเห็นผลลัพธ์ต่อไปนี้
ขอแสดงความยินดี คุณดำเนินการทดแทนข้อมูลความคิดเห็น Reddit สำเร็จแล้ว หากสนใจวิธีสร้างโมเดลเพิ่มเติมจากข้อมูลนี้ โปรดไปที่ Codelab ของ Spark-NLP
10. ล้างข้อมูล
เพื่อหลีกเลี่ยงไม่ให้เกิดการเรียกเก็บเงินที่ไม่จำเป็นกับบัญชี GCP หลังจากการเริ่มต้นอย่างรวดเร็วนี้เสร็จสิ้นแล้ว ให้ทำดังนี้
- ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างขึ้น
- ลบสภาพแวดล้อม Dataproc
หากสร้างโปรเจ็กต์สำหรับ Codelab นี้โดยเฉพาะ คุณจะเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้ใบอนุญาตครีเอทีฟคอมมอนส์แบบระบุแหล่งที่มา 3.0 และใบอนุญาต Apache 2.0