1. ภาพรวม
โค้ดแล็บนี้จะอธิบายวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark กับ Dataproc ใน Google Cloud Platform การอ่านข้อมูลจากที่เก็บข้อมูลหนึ่ง การแปลงข้อมูล และการเขียนข้อมูลลงในที่เก็บข้อมูลอื่นเป็นกรณีการใช้งานทั่วไปในสาขาวิทยาศาสตร์ข้อมูลและวิศวกรรมข้อมูล การแปลงข้อมูลที่พบบ่อย ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จำเป็นออก และการเปลี่ยนประเภทไฟล์
ในโค้ดแล็บนี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc กับ PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit
2. ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)
เว็บไซต์ระบุว่า "Apache Spark เป็นเครื่องมือวิเคราะห์แบบรวมสำหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบคู่ขนานและในหน่วยความจำได้ ซึ่งช่วยให้สามารถทำการคำนวณแบบคู่ขนานจำนวนมากในเครื่องและโหนดต่างๆ ได้ โดยเปิดตัวครั้งแรกในปี 2014 เพื่ออัปเกรด MapReduce แบบเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กที่ได้รับความนิยมมากที่สุดสำหรับการคำนวณขนาดใหญ่ Apache Spark เขียนด้วย Scala และมี API ใน Scala, Java, Python และ R ซึ่งมีไลบรารีมากมาย เช่น Spark SQL สำหรับการค้นหา SQL ในข้อมูล Spark Streaming สำหรับการสตรีมข้อมูล MLlib สำหรับแมชชีนเลิร์นนิง และ GraphX สำหรับการประมวลผลกราฟ ซึ่งทั้งหมดทำงานบนเครื่องมือ Apache Spark

Spark สามารถทำงานได้ด้วยตัวเองหรือใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อการปรับขนาด คุณจะใช้ Dataproc สำหรับโค้ดแล็บนี้ ซึ่งใช้ Yarn
เดิมทีระบบจะโหลดข้อมูลใน Spark ลงในหน่วยความจำในสิ่งที่เรียกว่า RDD หรือชุดข้อมูลแบบกระจายที่ยืดหยุ่น การพัฒนา Spark ในเวลาต่อมาได้รวมการเพิ่มประเภทข้อมูลใหม่ 2 ประเภทในรูปแบบคอลัมน์ ได้แก่ ชุดข้อมูลซึ่งเป็นแบบมีประเภท และ Dataframe ซึ่งเป็นแบบไม่มีประเภท กล่าวอย่างคร่าวๆ คือ RDD เหมาะสําหรับข้อมูลทุกประเภท ในขณะที่ Dataset และ DataFrame ได้รับการเพิ่มประสิทธิภาพสําหรับข้อมูลตาราง เนื่องจากชุดข้อมูลใช้ได้กับ Java และ Scala API เท่านั้น เราจึงจะใช้ PySpark Dataframe API สำหรับ Codelab นี้ ดูข้อมูลเพิ่มเติมได้ที่เอกสารประกอบของ Apache Spark
3. กรณีการใช้งาน
วิศวกรข้อมูลมักต้องการให้เข้าถึงข้อมูลได้ง่ายสำหรับนักวิทยาศาสตร์ข้อมูล อย่างไรก็ตาม ข้อมูลมักจะ "ไม่สะอาด" ในตอนแรก (ใช้สำหรับการวิเคราะห์ในสถานะปัจจุบันได้ยาก) และต้องทำความสะอาดก่อนจึงจะมีประโยชน์มาก ตัวอย่างของข้อมูลนี้คือข้อมูลที่คัดลอกจากเว็บ ซึ่งอาจมีการเข้ารหัสที่ผิดปกติหรือแท็ก HTML ที่ไม่เกี่ยวข้อง
ใน Lab นี้ คุณจะโหลดชุดข้อมูลจาก BigQuery ในรูปแบบโพสต์ Reddit ลงในคลัสเตอร์ Spark ที่โฮสต์ใน Dataproc, แยกข้อมูลที่เป็นประโยชน์ และจัดเก็บข้อมูลที่ประมวลผลแล้วเป็นไฟล์ CSV ที่บีบอัดใน Google Cloud Storage

หัวหน้านักวิทยาศาสตร์ด้านข้อมูลที่บริษัทของคุณสนใจที่จะให้ทีมทำงานเกี่ยวกับปัญหาการประมวลผลภาษาธรรมชาติที่แตกต่างกัน โดยเฉพาะอย่างยิ่ง พวกเขาต้องการวิเคราะห์ข้อมูลในซับเรดดิต "r/food" คุณจะสร้างไปป์ไลน์สำหรับการดัมพ์ข้อมูลโดยเริ่มจากการทดแทนข้อมูลย้อนหลังตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019
4. การเข้าถึง BigQuery ผ่าน BigQuery Storage API
การดึงข้อมูลจาก BigQuery โดยใช้เมธอด tabledata.list API อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปริมาณข้อมูลเพิ่มขึ้น เมธอดนี้จะแสดงรายการออบเจ็กต์ JSON และต้องอ่านทีละหน้าตามลำดับเพื่ออ่านทั้งชุดข้อมูล
BigQuery Storage API ช่วยปรับปรุงการเข้าถึงข้อมูลใน BigQuery ได้อย่างมากโดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลแบบขนาน รวมถึงรูปแบบการซีเรียลไลซ์ต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะส่งผลให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะในชุดข้อมูลขนาดใหญ่
ใน Codelab นี้ คุณจะได้ใช้ spark-bigquery-connector เพื่ออ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark
5. การสร้างโปรเจ็กต์
ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com แล้วสร้างโปรเจ็กต์ใหม่โดยทำดังนี้



จากนั้นคุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากรของ Google Cloud
การทำตาม Codelab นี้ไม่ควรมีค่าใช้จ่ายเกิน 2-3 ดอลลาร์ แต่ก็อาจมีค่าใช้จ่ายมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยให้ทรัพยากรทำงานต่อไป ส่วนสุดท้ายของโค้ดแล็บนี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300
6. การตั้งค่าสภาพแวดล้อม
ตอนนี้คุณจะไปที่การตั้งค่าสภาพแวดล้อมโดยทำดังนี้
- การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
- การกำหนดค่าการตั้งค่าโปรเจ็กต์
- การสร้างคลัสเตอร์ Dataproc
- การสร้าง Bucket ของ Google Cloud Storage
การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อม
เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console

หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
ตั้งค่ารหัสโปรเจ็กต์ของโปรเจ็กต์ คุณสามารถดูได้โดยไปที่หน้าการเลือกโปรเจ็กต์แล้วค้นหาโปรเจ็กต์ของคุณ ซึ่งอาจไม่ใช่ชื่อเดียวกับชื่อโปรเจ็กต์


เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์
gcloud config set project <project_id>
ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1
gcloud config set dataproc/region <region>
เลือกชื่อสำหรับคลัสเตอร์ Dataproc แล้วสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์
CLUSTER_NAME=<cluster_name>
การสร้างคลัสเตอร์ Dataproc
สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คำสั่งต่อไปนี้
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
คำสั่งนี้จะใช้เวลาสักครู่จึงจะเสร็จสมบูรณ์ โดยคำสั่งนี้มีรายละเอียดดังนี้
ซึ่งจะเป็นการเริ่มสร้างคลัสเตอร์ Dataproc ที่มีชื่อที่คุณระบุไว้ก่อนหน้านี้ การใช้ beta API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น Component Gateway
gcloud beta dataproc clusters create ${CLUSTER_NAME}
ซึ่งจะเป็นการตั้งค่าประเภทของเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน
--worker-machine-type n1-standard-8
ซึ่งจะเป็นการกำหนดจำนวนผู้ปฏิบัติงานที่คลัสเตอร์จะมี
--num-workers 8
ซึ่งจะเป็นการตั้งค่าเวอร์ชันอิมเมจของ Dataproc
--image-version 1.5-debian
ซึ่งจะเป็นการกำหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ ในที่นี้ คุณจะรวมการดำเนินการเริ่มต้น pip
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
นี่คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ ในที่นี้ คุณกำลังระบุข้อมูลเมตาสำหรับpipการดำเนินการเริ่มต้น
--metadata 'PIP_PACKAGES=google-cloud-storage'
ซึ่งจะเป็นการตั้งค่าให้ติดตั้งคอมโพเนนต์ที่ไม่บังคับในคลัสเตอร์
--optional-components=ANACONDA
ซึ่งจะเปิดใช้เกตเวย์คอมโพเนนต์ที่ช่วยให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark ได้
--enable-component-gateway
ดูข้อมูลเบื้องต้นเกี่ยวกับ Dataproc แบบเจาะลึกเพิ่มเติมได้ใน Codelab นี้
การสร้าง Bucket ของ Google Cloud Storage
คุณจะต้องมี Bucket ของ Google Cloud Storage สำหรับเอาต์พุตของงาน กำหนดชื่อที่ไม่ซ้ำกันสำหรับ Bucket แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อสร้าง Bucket ใหม่ ชื่อ Bucket จะไม่ซ้ำกันในโปรเจ็กต์ Google Cloud ทั้งหมดสำหรับผู้ใช้ทุกคน ดังนั้นคุณอาจต้องลองตั้งชื่อหลายครั้งด้วยชื่อที่แตกต่างกัน ระบบจะสร้างที่เก็บสำเร็จหากคุณไม่ได้รับ ServiceException
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. การวิเคราะห์ข้อมูลเบื้องต้น
ก่อนทำการประมวลผลล่วงหน้า คุณควรศึกษาเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่คุณกำลังจัดการ โดยคุณจะต้องสำรวจวิธีการสํารวจข้อมูล 2 วิธี ก่อนอื่น คุณจะดูข้อมูลดิบบางส่วนโดยใช้เว็บ UI ของ BigQuery จากนั้นจะคำนวณจำนวนโพสต์ต่อซับเรดดิตโดยใช้ PySpark และ Dataproc
การใช้เว็บ UI ของ BigQuery
เริ่มต้นด้วยการใช้เว็บ UI ของ BigQuery เพื่อดูข้อมูล จากไอคอนเมนูใน Cloud Console ให้เลื่อนลงแล้วกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery

จากนั้นเรียกใช้คำสั่งต่อไปนี้ในตัวแก้ไขคำค้นหาของ BigQuery Web UI ซึ่งจะแสดงผลข้อมูล 10 แถวเต็มจากเดือนมกราคม 2017 ดังนี้
select * from fh-bigquery.reddit_posts.2017_01 limit 10;

คุณเลื่อนดูหน้าเว็บเพื่อดูคอลัมน์ทั้งหมดที่มี รวมถึงตัวอย่างบางส่วนได้ โดยเฉพาะอย่างยิ่ง คุณจะเห็น 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของแต่ละโพสต์ ได้แก่ "title" และ "selftext" ซึ่งเป็นเนื้อหาของโพสต์ นอกจากนี้ ให้สังเกตคอลัมน์อื่นๆ เช่น "created_utc" ซึ่งเป็นเวลา UTC ที่มีการสร้างโพสต์ และ "subreddit" ซึ่งเป็น subreddit ที่โพสต์อยู่
การเรียกใช้ PySpark Job
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บที่มีโค้ดตัวอย่างและเปลี่ยนไดเรกทอรีไปยังไดเรกทอรีที่ถูกต้อง
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
คุณสามารถใช้ PySpark เพื่อกำหนดจำนวนโพสต์ที่มีอยู่ในแต่ละ Subreddit คุณสามารถเปิด Cloud Editor และอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery ก่อนที่จะเรียกใช้ในขั้นตอนถัดไปได้โดยทำดังนี้


คลิกปุ่ม "เปิดเทอร์มินัล" ใน Cloud Editor เพื่อเปลี่ยนกลับไปที่ Cloud Shell แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อเรียกใช้งาน PySpark งานแรก
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ ในที่นี้ คุณกำลังระบุประเภทงานเป็น pyspark คุณระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงานได้ ในที่นี้ คุณกำลังระบุพารามิเตอร์ --jars ซึ่งช่วยให้คุณรวม spark-bigquery-connector ไว้กับงานได้ นอกจากนี้ คุณยังตั้งค่าระดับเอาต์พุตของบันทึกได้โดยใช้ --driver-log-levels root=FATAL ซึ่งจะระงับเอาต์พุตของบันทึกทั้งหมด ยกเว้นข้อผิดพลาด โดยปกติแล้วบันทึกของ Spark จะมีข้อมูลจำนวนมาก
การดำเนินการนี้จะใช้เวลาไม่กี่นาที และเอาต์พุตสุดท้ายควรมีลักษณะดังนี้

8. การสำรวจ UI ของ Dataproc และ Spark
เมื่อเรียกใช้งาน Spark บน Dataproc คุณจะมีสิทธิ์เข้าถึง UI 2 รายการเพื่อตรวจสอบสถานะของงาน / คลัสเตอร์ ตัวแรกคือ UI ของ Dataproc ซึ่งคุณจะเห็นได้โดยคลิกไอคอนเมนูแล้วเลื่อนลงไปที่ Dataproc คุณจะเห็นหน่วยความจำปัจจุบันที่พร้อมใช้งาน รวมถึงหน่วยความจำที่รอดำเนินการและจำนวนผู้ปฏิบัติงาน

นอกจากนี้ คุณยังคลิกแท็บงานเพื่อดูงานที่เสร็จสมบูรณ์แล้วได้ด้วย คุณดูรายละเอียดของงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้นได้โดยคลิกรหัสงานของงานที่ต้องการ 

นอกจากนี้ คุณยังดู Spark UI ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ แล้วคลิกอินเทอร์เฟซเว็บ คุณควรเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ คุณเปิดใช้หลายรายการได้ผ่านคอมโพเนนต์ที่ไม่บังคับเมื่อตั้งค่าคลัสเตอร์ สำหรับ Lab นี้ ให้คลิก "Spark History Server"


ซึ่งควรจะเปิดหน้าต่างต่อไปนี้

งานทั้งหมดที่ทำเสร็จแล้วจะปรากฏที่นี่ และคุณสามารถคลิก application_id เพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับงานได้ ในทำนองเดียวกัน คุณสามารถคลิก "แสดงใบสมัครที่ไม่สมบูรณ์" ที่ด้านล่างสุดของหน้า Landing Page เพื่อดูงานทั้งหมดที่กำลังดำเนินการอยู่
9. เรียกใช้การเติมงาน
ตอนนี้คุณจะเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจำ แยกข้อมูลที่จำเป็น และส่งออกเอาต์พุตไปยัง Bucket ของ Google Cloud Storage คุณจะดึงข้อมูล "title", "body" (ข้อความดิบ) และ "timestamp created" สำหรับความคิดเห็นแต่ละรายการใน Reddit จากนั้นคุณจะนำข้อมูลนี้ไปแปลงเป็น CSV, บีบอัดเป็น ZIP และโหลดลงในที่เก็บข้อมูลที่มี URI เป็น gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz
คุณสามารถกลับไปที่ Cloud Editor เพื่ออ่านโค้ดสำหรับ cloud-dataproc/codelabs/spark-bigquery/backfill.sh ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
คุณควรเห็นข้อความการทำงานเสร็จจำนวนมากในไม่ช้า งานนี้อาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังตรวจสอบ Bucket ที่เก็บข้อมูลอีกครั้งเพื่อยืนยันว่าส่งออกข้อมูลสำเร็จโดยใช้ gsutil ได้ด้วย เมื่องานทั้งหมดเสร็จสิ้นแล้ว ให้เรียกใช้คำสั่งต่อไปนี้
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
คุณควรเห็นเอาต์พุตต่อไปนี้

ขอแสดงความยินดี คุณได้ทำการเติมข้อมูลย้อนหลังสำหรับข้อมูลความคิดเห็นใน Reddit เสร็จสมบูรณ์แล้ว หากสนใจวิธีสร้างโมเดลจากข้อมูลนี้ โปรดไปที่ Codelab ของ Spark-NLP
10. ล้างข้อมูล
โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินที่ไม่จำเป็นกับบัญชี GCP หลังจากที่ทำตามการเริ่มต้นอย่างรวดเร็วนี้เสร็จแล้ว
- ลบ Bucket ของ Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างไว้
- ลบสภาพแวดล้อม Dataproc
หากสร้างโปรเจ็กต์เพื่อใช้กับ Codelab นี้โดยเฉพาะ คุณจะลบโปรเจ็กต์ได้ด้วย (ไม่บังคับ) โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- พิมพ์รหัสโปรเจ็กต์ในช่อง แล้วคลิกปิดเพื่อลบโปรเจ็กต์
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์แบบระบุแหล่งที่มา 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0