1. ภาพรวม
โค้ดแล็บนี้จะอธิบายวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark กับ Dataproc ใน Google Cloud Platform การอ่านข้อมูลจากตำแหน่งพื้นที่เก็บข้อมูลหนึ่ง เปลี่ยนรูปแบบข้อมูล และเขียนลงในตำแหน่งพื้นที่เก็บข้อมูลอื่นเป็นกรณีการใช้งานที่พบได้ทั่วไปในวิทยาการข้อมูลและวิศวกรข้อมูล การเปลี่ยนรูปแบบที่พบได้ทั่วไป ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จําเป็นออก และการเปลี่ยนประเภทไฟล์
ในโค้ดแล็บนี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc พร้อม PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit
2. ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)
เว็บไซต์ระบุว่า "Apache Spark เป็นเครื่องมือวิเคราะห์แบบรวมสําหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบขนานและในหน่วยความจําได้ ซึ่งจะทําให้สามารถคํานวณแบบขนานจํานวนมากในเครื่องและโหนดต่างๆ ได้ เฟรมเวิร์กนี้เปิดตัวครั้งแรกในปี 2014 เป็นการอัปเกรด MapReduce แบบดั้งเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กยอดนิยมสําหรับการประมวลผลขนาดใหญ่ Apache Spark เขียนด้วย Scala และมี API ใน Scala, Java, Python และ R โดยประกอบด้วยไลบรารีมากมาย เช่น Spark SQL สําหรับการค้นหา SQL ในข้อมูล, Spark Streaming สําหรับสตรีมมิงข้อมูล, MLlib สําหรับแมชชีนเลิร์นนิง และ GraphX สําหรับการประมวลผลกราฟ ซึ่งทั้งหมดนี้ทํางานบนเครื่องมือ Apache Spark
Spark สามารถทํางานได้ด้วยตัวเองหรือจะใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อปรับขนาดก็ได้ คุณจะใช้ Dataproc สําหรับโค้ดแล็บนี้ ซึ่งใช้ Yarn
เดิมข้อมูลใน Spark จะโหลดลงในหน่วยความจําเป็น RDD หรือชุดข้อมูลที่กระจายแบบยืดหยุ่น การพัฒนาใน Spark มีการเพิ่มประเภทข้อมูลแบบคอลัมน์ใหม่ 2 ประเภท ได้แก่ ชุดข้อมูลที่มีการจัดประเภทและ Dataframe ที่ไม่มีการจัดประเภท กล่าวโดยคร่าวๆ คือ RDD เหมาะสําหรับข้อมูลทุกประเภท ส่วนชุดข้อมูลและ DataFrame เพิ่มประสิทธิภาพสําหรับข้อมูลตาราง เนื่องจากชุดข้อมูลใช้ได้กับ Java และ Scala API เท่านั้น เราจะใช้ PySpark Dataframe API สําหรับโค้ดแล็บนี้ ดูข้อมูลเพิ่มเติมได้ที่เอกสารประกอบของ Apache Spark
3. กรณีการใช้งาน
วิศวกรข้อมูลมักต้องการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลได้ง่ายๆ อย่างไรก็ตาม ข้อมูลมักจะไม่สะอาดในตอนแรก (ใช้งานวิเคราะห์ได้ยากในสถานะปัจจุบัน) และต้องได้รับการทำความสะอาดก่อนจึงจะมีประโยชน์มาก ตัวอย่างของกรณีนี้คือข้อมูลที่คัดลอกมาจากเว็บซึ่งอาจมีการเข้ารหัสที่แปลกประหลาดหรือแท็ก HTML ที่ไม่เกี่ยวข้อง
ในบทนี้ คุณจะโหลดชุดข้อมูลจาก BigQuery ในรูปแบบโพสต์ Reddit ไปยังคลัสเตอร์ Spark ที่โฮสต์ใน Dataproc, ดึงข้อมูลที่เป็นประโยชน์ และจัดเก็บข้อมูลที่ประมวลผลแล้วเป็นไฟล์ CSV ที่บีบอัดใน Google Cloud Storage
นักวิทยาศาสตร์ข้อมูลระดับอาวุโสของบริษัทคุณสนใจให้ทีมแก้ปัญหาการประมวลผลภาษาที่เป็นธรรมชาติ โดยเฉพาะอย่างยิ่ง ลูกค้ารายนี้สนใจวิเคราะห์ข้อมูลในฟอรัม "r/food" คุณจะต้องสร้างไปป์ไลน์สําหรับการถ่ายโอนข้อมูลโดยเริ่มจากการทดแทนข้อมูลตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019
4. การเข้าถึง BigQuery ผ่าน BigQuery Storage API
การดึงข้อมูลจาก BigQuery โดยใช้เมธอด tabledata.list API อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปริมาณข้อมูลเพิ่มขึ้น เมธอดนี้จะแสดงรายการออบเจ็กต์ JSON และต้องอ่านทีละหน้าตามลําดับเพื่ออ่านชุดข้อมูลทั้งหมด
BigQuery Storage API มีการปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการแปลงเป็นอนุกรมรูปแบบต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะช่วยให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะกับชุดข้อมูลขนาดใหญ่
ในโค้ดแล็บนี้ คุณจะใช้ spark-bigquery-connector ในการอ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark
5. การสร้างโปรเจ็กต์
ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com และสร้างโปรเจ็กต์ใหม่โดยทำดังนี้
ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud
ค่าใช้จ่ายในการทํา Codelab นี้ไม่ควรเกิน 200 บาท แต่อาจมากกว่านั้นหากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยไว้ให้ทํางาน ส่วนสุดท้ายของ Codelab นี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300
6. การตั้งค่าสภาพแวดล้อม
ตอนนี้คุณจะต้องตั้งค่าสภาพแวดล้อมโดยทำดังนี้
- การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
- การกำหนดการตั้งค่าโปรเจ็กต์
- การสร้างคลัสเตอร์ Dataproc
- การสร้างที่เก็บข้อมูล Google Cloud Storage
การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อม
เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console
หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
ตั้งค่ารหัสโปรเจ็กต์ของโปรเจ็กต์ โดยไปที่หน้าการเลือกโปรเจ็กต์แล้วค้นหาโปรเจ็กต์ ซึ่งอาจไม่ใช่ชื่อเดียวกับโปรเจ็กต์
เรียกใช้คําสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์
gcloud config set project <project_id>
ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1
gcloud config set dataproc/region <region>
เลือกชื่อสำหรับคลัสเตอร์ Dataproc และสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์
CLUSTER_NAME=<cluster_name>
การสร้างคลัสเตอร์ Dataproc
สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คําสั่งต่อไปนี้
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
คำสั่งนี้จะใช้เวลาสักครู่จึงจะเสร็จสมบูรณ์ รายละเอียดของคำสั่งมีดังนี้
ซึ่งจะเป็นการเริ่มสร้างคลัสเตอร์ Dataproc ด้วยชื่อที่คุณระบุไว้ก่อนหน้านี้ การใช้ beta
API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น Component Gateway
gcloud beta dataproc clusters create ${CLUSTER_NAME}
ซึ่งจะตั้งค่าประเภทเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน
--worker-machine-type n1-standard-8
ซึ่งจะเป็นการกำหนดจำนวนผู้ปฏิบัติงานของคลัสเตอร์
--num-workers 8
ซึ่งจะเป็นการกำหนดเวอร์ชันอิมเมจของ Dataproc
--image-version 1.5-debian
ซึ่งจะกําหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังรวมการดําเนินการเริ่มต้น pip
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
นี่คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังระบุข้อมูลเมตาสําหรับpip
การดำเนินการเริ่มต้น
--metadata 'PIP_PACKAGES=google-cloud-storage'
ซึ่งจะเป็นการตั้งค่าให้ติดตั้งคอมโพเนนต์ที่ไม่บังคับในคลัสเตอร์
--optional-components=ANACONDA
ซึ่งจะเปิดใช้เกตเวย์คอมโพเนนต์ที่ช่วยให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark ได้
--enable-component-gateway
ดูข้อมูลเบื้องต้นที่ละเอียดยิ่งขึ้นเกี่ยวกับ Dataproc ได้ที่ codelab นี้
การสร้างที่เก็บข้อมูล Google Cloud Storage
คุณจะต้องมีที่เก็บข้อมูล Google Cloud Storage สำหรับเอาต์พุตของงาน กำหนดชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อสร้างที่เก็บข้อมูลใหม่ ชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันสำหรับผู้ใช้ทุกคนในโปรเจ็กต์ Google Cloud ทั้งหมด คุณจึงอาจต้องลองดำเนินการนี้หลายครั้งโดยใช้ชื่ออื่น ระบบจะสร้างที่เก็บข้อมูลสำเร็จหากคุณไม่ได้รับ ServiceException
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. การวิเคราะห์ข้อมูลเชิงสำรวจ
ก่อนทำการเตรียมข้อมูลเบื้องต้น คุณควรศึกษาเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่คุณจัดการ โดยคุณจะได้สํารวจวิธีการสํารวจข้อมูล 2 วิธี ก่อนอื่น คุณจะเห็นข้อมูลดิบบางส่วนโดยใช้เว็บ UI ของ BigQuery จากนั้นจะคํานวณจํานวนโพสต์ต่อแต่ละฟอรัมย่อยโดยใช้ PySpark และ Dataproc
การใช้เว็บ UI ของ BigQuery
เริ่มต้นด้วยการใช้เว็บ UI ของ BigQuery เพื่อดูข้อมูล จากไอคอนเมนูในคอนโซลระบบคลาวด์ ให้เลื่อนลงแล้วกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery
จากนั้นเรียกใช้คําสั่งต่อไปนี้ในเครื่องมือแก้ไขคําค้นหาของ UI ทางเว็บ BigQuery ซึ่งจะแสดงผลข้อมูล 10 แถวเต็มๆ ตั้งแต่เดือนมกราคม 2017
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
คุณสามารถเลื่อนหน้าเพื่อดูคอลัมน์ทั้งหมดที่ใช้ได้ รวมถึงดูตัวอย่างบางส่วน โดยเฉพาะอย่างยิ่ง คุณจะเห็น 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของโพสต์แต่ละรายการ ได้แก่ "title" และ "selftext" โดยคอลัมน์หลังคือเนื้อหาของโพสต์ และดูคอลัมน์อื่นๆ ด้วย เช่น "created_utc" ซึ่งเป็นเวลา UTC ที่สร้างโพสต์ และ "subreddit" ซึ่งเป็นชื่อของ subreddit ที่โพสต์อยู่
การดำเนินการกับงาน PySpark
เรียกใช้คําสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บที่มีโค้ดตัวอย่างและ cd ไปยังไดเรกทอรีที่ถูกต้อง
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
คุณสามารถใช้ PySpark เพื่อระบุจํานวนโพสต์ที่มีอยู่สําหรับแต่ละฟอรัมย่อยได้ คุณสามารถเปิด Cloud Editor และอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery
ก่อนที่จะเรียกใช้ในขั้นตอนถัดไป
คลิกปุ่ม "เปิดเทอร์มินัล" ในเครื่องมือแก้ไขบนระบบคลาวด์เพื่อเปลี่ยนกลับไปที่ Cloud Shell แล้วเรียกใช้คําสั่งต่อไปนี้เพื่อดําเนินการงาน PySpark แรก
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ ในส่วนนี้คุณกําลังระบุประเภทงานเป็น pyspark
คุณสามารถระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงาน ในส่วนนี้ คุณกําลังระบุพารามิเตอร์ --jars
ซึ่งช่วยให้คุณรวม spark-bigquery-connector
ไว้ในงานได้ นอกจากนี้ คุณยังตั้งค่าระดับเอาต์พุตบันทึกได้โดยใช้ --driver-log-levels root=FATAL
ซึ่งจะระงับเอาต์พุตบันทึกทั้งหมดยกเว้นข้อผิดพลาด บันทึกของ Spark มีแนวโน้มที่จะค่อนข้างมีข้อมูลมากเกินไป
การดำเนินการนี้จะใช้เวลา 2-3 นาที และเอาต์พุตสุดท้ายควรมีลักษณะดังนี้
8. การสำรวจ UI ของ Dataproc และ Spark
เมื่อเรียกใช้งาน Spark ใน Dataproc คุณจะมีสิทธิ์เข้าถึง UI 2 แบบเพื่อตรวจสอบสถานะงาน / คลัสเตอร์ รายการแรกคือ UI ของ Dataproc ซึ่งคุณจะเห็นได้โดยคลิกไอคอนเมนูแล้วเลื่อนลงไปที่ Dataproc ในส่วนนี้ คุณจะเห็นหน่วยความจำที่ใช้ได้ในปัจจุบัน รวมถึงหน่วยความจำที่รอดำเนินการและจำนวนผู้ปฏิบัติงาน
นอกจากนี้ คุณยังคลิกแท็บงานเพื่อดูงานที่เสร็จแล้วได้ด้วย คุณดูรายละเอียดงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้นได้โดยคลิกรหัสงานของงานที่ต้องการ
นอกจากนี้ คุณยังดู UI ของ Spark ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ แล้วคลิก "อินเทอร์เฟซเว็บ" คุณควรเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ คุณสามารถเปิดใช้ฟีเจอร์เหล่านี้ได้หลายรายการผ่านคอมโพเนนต์ที่ไม่บังคับเมื่อตั้งค่าคลัสเตอร์ สําหรับห้องทดลองนี้ ให้คลิก "เซิร์ฟเวอร์ประวัติของ Spark
ซึ่งจะเป็นการเปิดหน้าต่างต่อไปนี้
งานที่ทำเสร็จแล้วทั้งหมดจะแสดงที่นี่ และคุณสามารถคลิก application_id ใดก็ได้เพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับงาน ในทํานองเดียวกัน คุณสามารถคลิก "แสดงคําขอที่ยังไม่เสร็จสมบูรณ์" ที่ด้านล่างสุดของหน้า Landing Page เพื่อดูงานที่ทํางานอยู่ทั้งหมด
9. เรียกใช้งานทดแทน
ตอนนี้คุณจะต้องเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจํา ดึงข้อมูลที่จําเป็น และส่งออกเอาต์พุตไปยังที่เก็บข้อมูล Google Cloud Storage คุณจะดึงข้อมูล "ชื่อ" "เนื้อหา" (ข้อความดิบ) และ "การประทับเวลาที่สร้าง" สำหรับความคิดเห็น Reddit แต่ละรายการ จากนั้นให้นำข้อมูลนี้ไปแปลงเป็น CSV, ใส่ลงในไฟล์ ZIP และโหลดลงในที่เก็บข้อมูลที่มี URI ของ gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz
คุณสามารถไปที่ Cloud Editor อีกครั้งเพื่ออ่านโค้ดของ cloud-dataproc/codelabs/spark-bigquery/backfill.sh
ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
คุณควรเห็นข้อความแจ้งว่างานเสร็จสมบูรณ์หลายรายการในไม่ช้า การดำเนินการอาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังตรวจสอบที่เก็บข้อมูลอีกครั้งเพื่อยืนยันว่าการส่งออกข้อมูลสําเร็จโดยใช้ gsutil ได้ด้วย เมื่องานทั้งหมดเสร็จแล้ว ให้เรียกใช้คําสั่งต่อไปนี้
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
คุณควรเห็นเอาต์พุตต่อไปนี้
ยินดีด้วย คุณได้ทดแทนข้อมูลความคิดเห็น Reddit เรียบร้อยแล้ว หากสนใจวิธีสร้างโมเดลจากข้อมูลนี้ โปรดไปที่ Codelab Spark-NLP
10. ล้างข้อมูล
โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP โดยไม่จำเป็นหลังจากการเริ่มต้นใช้งานอย่างรวดเร็วนี้เสร็จสมบูรณ์
- ลบที่เก็บข้อมูล Cloud Storage ของสภาพแวดล้อมที่คุณสร้างขึ้น
- ลบสภาพแวดล้อม Dataproc
หากสร้างโปรเจ็กต์มาเพื่อโค้ดแล็บนี้โดยเฉพาะ คุณก็ลบโปรเจ็กต์ได้เช่นกันโดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0