ใหม่และแม่นยำ: การรวมสตรีมใน ELT

1. บทนำ

ภาพรวม

กรอบการทำงานสำหรับการวิเคราะห์สตรีมมิงมีความสำคัญมากขึ้นเรื่อยๆ ในระบบคลังข้อมูลร่วมสมัย โดยผู้ใช้แบบธุรกิจ ความต้องการการวิเคราะห์แบบเรียลไทม์ยังคงไม่หยุดนิ่ง เราได้สร้างความก้าวหน้าครั้งใหญ่เพื่อปรับปรุงความใหม่ของข้อมูลภายในคลังสินค้าและเพื่อรองรับการวิเคราะห์สตรีมมิงโดยทั่วไป แต่วิศวกรข้อมูลยังต้องเผชิญกับความท้าทายเมื่อนำแหล่งที่มาของสตรีมมิงเหล่านี้มาปรับใช้กับสถาปัตยกรรมคลังข้อมูลของตน

ในบล็อกนี้ เราพูดถึงปัญหาที่พบบ่อยที่สุดบางประการที่วิศวกรข้อมูลต้องเผชิญในการแก้ปัญหากรณีการใช้งานเหล่านี้ เราสรุปแนวคิดในการออกแบบและรูปแบบทางสถาปัตยกรรมบางส่วนเพื่อการรวบรวมข้อมูลสตรีมมิงอย่างมีประสิทธิภาพโดยใช้ BigQuery

ความใหม่และความถูกต้องของข้อมูล

แบบใหม่หมายความว่าเวลาในการตอบสนองของข้อมูลแบบรวมนั้นต่ำกว่าเกณฑ์บางอย่าง เช่น "อัปเดตจนถึงชั่วโมงล่าสุด" ความใหม่จะพิจารณาจากข้อมูลดิบชุดย่อยที่รวมอยู่ในข้อมูลรวม

เมื่อจัดการกับข้อมูลสตรีมมิง เป็นเรื่องปกติที่เหตุการณ์จะมาถึงล่าช้าภายในระบบประมวลผลข้อมูล ซึ่งหมายความว่าเวลาที่ระบบของเราประมวลผลเหตุการณ์จะช้ากว่าเวลาที่เหตุการณ์เกิดขึ้นอย่างมาก

เมื่อเราประมวลผลข้อเท็จจริงที่ส่งมาทีหลัง ค่าสถิติรวมของเราจะเปลี่ยนแปลง ซึ่งหมายความว่าค่าที่นักวิเคราะห์จะเห็นจะเปลี่ยนไปตามในแต่ละวัน[1] ค่าถูกต้องหมายความว่าสถิติรวมใกล้เคียงกับค่าที่ปรับยอดแล้วมากที่สุด

แน่นอนว่ามีมิติที่ 3 ในการเพิ่มประสิทธิภาพ ซึ่งก็คือต้นทุน ในแง่ของเงินและประสิทธิภาพ ตัวอย่างคือเราสามารถใช้มุมมองเชิงตรรกะสำหรับออบเจ็กต์ข้อมูลในการจัดระยะและการรายงานได้ ข้อเสียของการใช้มุมมองเชิงตรรกะคือทุกครั้งที่มีการค้นหาตารางรวม ระบบจะสแกนชุดข้อมูลดิบทั้งหมด ซึ่งจะทำงานช้าและใช้ต้นทุนสูง

คำอธิบายสถานการณ์

มาเตรียมความพร้อมให้กับกรณีการใช้งานนี้กัน เรากำลังจะนำเข้าข้อมูลสตรีมกิจกรรม Wikipedia ที่เผยแพร่โดย Wikimedia เป้าหมายของเราคือการสร้างลีดเดอร์บอร์ดที่จะแสดงให้ผู้เขียนเห็นสิ่งที่มีการเปลี่ยนแปลงมากที่สุด และจะอัปเดตให้เป็นปัจจุบันเมื่อมีการเผยแพร่บทความใหม่ ลีดเดอร์บอร์ดของเราซึ่งจะใช้เป็นแดชบอร์ด BI Engine จะรวบรวมเหตุการณ์ดิบโดยใช้ชื่อผู้ใช้เพื่อคำนวณคะแนน[2]

2. การออกแบบ

การแบ่งระดับข้อมูล

ในไปป์ไลน์ข้อมูล เราจะกำหนดระดับข้อมูลหลายๆ ระดับ เราจะเก็บข้อมูลเหตุการณ์ที่เป็นข้อมูลดิบ และสร้างไปป์ไลน์ของการเปลี่ยนรูปแบบ การสร้างคุณค่า และการรวบรวมในลำดับต่อๆ มา เราไม่เชื่อมโยงตารางการรายงานกับข้อมูลที่อยู่ในตารางข้อมูล RAW โดยตรง เนื่องจากเราต้องการรวมและรวมศูนย์การเปลี่ยนรูปแบบที่ทีมต่างๆ ให้ความสำคัญสำหรับข้อมูลแบบทีละขั้น

หลักการที่สำคัญในสถาปัตยกรรมนี้คือ ระบบสามารถคำนวณระดับที่สูงขึ้น (ขั้นบันไดและการรายงาน) ใหม่ได้ทุกเมื่อโดยใช้ข้อมูลดิบเท่านั้น

การแบ่งพาร์ติชัน

BigQuery รองรับการแบ่งพาร์ติชัน 2 รูปแบบ ได้แก่ การแบ่งพาร์ติชันช่วงจำนวนเต็มและการแบ่งพาร์ติชันวันที่ เราจะพิจารณาการแบ่งพาร์ติชันวันที่ที่อยู่ในขอบเขตของโพสต์นี้เท่านั้น

สำหรับการแบ่งพาร์ติชันวันที่ เราจะเลือกระหว่างพาร์ติชันเวลาการส่งผ่านข้อมูลหรือพาร์ติชันตามช่อง การแบ่งพาร์ติชันเวลาในการนำเข้าจะนำข้อมูลไปไว้ในพาร์ติชันตามเวลาที่ได้ข้อมูลมา นอกจากนี้ ผู้ใช้ยังเลือกพาร์ติชันเมื่อโหลดได้โดยระบุตัวตกแต่งพาร์ติชัน

การแบ่งพาร์ติชันช่องข้อมูลตามค่าวันที่หรือการประทับเวลาในคอลัมน์

สำหรับการส่งผ่านข้อมูลเหตุการณ์ เราจะนำข้อมูลไปไว้ในตารางที่แบ่งพาร์ติชันเวลาการนำเข้า เนื่องจากเวลาการส่งผ่านข้อมูลเกี่ยวข้องกับการประมวลผลหรือประมวลผลข้อมูลที่ได้รับในอดีตอีกครั้ง การทดแทนข้อมูลประวัติจะจัดเก็บไว้ในพาร์ติชันเวลาการนำเข้าได้ด้วย โดยอิงตามเวลาที่ข้อมูลดังกล่าวจะมาถึง

ใน Codelab นี้ เราจะถือว่าเราจะไม่ได้รับข้อเท็จจริงที่ส่งล่าช้า [3] จากสตรีมเหตุการณ์ของ Wikimedia ซึ่งจะลดความซับซ้อนของการโหลดที่เพิ่มขึ้นของตารางชั่วคราวตามที่อธิบายไว้ด้านล่าง

สำหรับตารางชั่วคราว เราจะแบ่งพาร์ติชันตามเวลาของเหตุการณ์ ทั้งนี้เนื่องจากนักวิเคราะห์ของเราสนใจที่จะค้นหาข้อมูลตามเวลาของเหตุการณ์ (เวลาที่บทความได้รับการเผยแพร่บน Wikipedia ไม่ใช่เวลาที่เหตุการณ์ได้รับการประมวลผลภายในไปป์ไลน์

3. สถาปัตยกรรม

สิ่งที่คุณจะสร้าง

เราจะใช้โปรโตคอล SSE ในการอ่านสตรีมเหตุการณ์จาก Wikimedia เราจะเขียนบริการมิดเดิลแวร์ขนาดเล็กที่จะอ่านจากสตรีมเหตุการณ์เป็นไคลเอ็นต์ SSE และจะเผยแพร่ไปยังหัวข้อ Pub/Sub ภายในสภาพแวดล้อม GCP ของเรา

เมื่อเหตุการณ์พร้อมใช้งานใน Pub/Sub แล้ว เราจะสร้างงาน Cloud Dataflow โดยใช้เทมเพลตที่จะสตรีมระเบียนไปยังระดับข้อมูลดิบของคลังข้อมูล BigQuery ขั้นตอนถัดไปคือการคำนวณสถิติรวมเพื่อสนับสนุนลีดเดอร์บอร์ดสดของเรา

631efe46d234f131.png

การกำหนดเวลาและการจัดการเป็นกลุ่ม

สำหรับการจัด ELT ที่ป้อนข้อมูลระดับขั้นแสดงและการรายงานของคลังสินค้า เราจะใช้ Dataform Dataform "มอบเครื่องมือ แนวทางปฏิบัติแนะนำ และเวิร์กโฟลว์ที่ได้รับแรงบันดาลใจมาจากวิศวกรรมซอฟต์แวร์" แก่ทีมวิศวกรข้อมูล นอกจากการจัดการเป็นกลุ่มและการกำหนดเวลาแล้ว Dataform ยังมีฟังก์ชันการทำงาน เช่น Assertions และ Tests เพื่อรับรองคุณภาพ การกำหนดการดำเนินการของคลังสินค้าที่กำหนดเองสำหรับการจัดการฐานข้อมูล และฟีเจอร์เอกสารประกอบเพื่อรองรับการค้นพบข้อมูล

ผู้เขียนขอขอบคุณทีม Dataform สำหรับการแสดงความคิดเห็นอันมีค่าในการรีวิวห้องทดลองและบล็อกนี้

ภายใน Dataform ระบบจะประกาศข้อมูลดิบที่สตรีมจาก Dataflow เป็นชุดข้อมูลภายนอก ระบบจะกำหนดตารางการทดลองใช้และการรายงานแบบไดนามิก โดยใช้ไวยากรณ์ SQLX ของ Dataform

เราจะใช้ฟีเจอร์การโหลดที่เพิ่มขึ้นของ Dataform เพื่อป้อนข้อมูลในตารางการทดลองใช้ โดยจะกำหนดเวลาให้โปรเจ็กต์ Dataform ทำงานทุกชั่วโมง ตามข้างต้น เราจะสันนิษฐานว่าเราจะไม่ได้รับข้อเท็จจริงที่มาถึงล่าช้า ดังนั้นตรรกะของเราจะเป็นการนำเข้าบันทึกที่มีเวลาของกิจกรรมซึ่งช้ากว่าเวลาของเหตุการณ์ล่าสุดในบันทึกที่จัดเตรียมไว้ต่างๆ ที่มีอยู่

ในห้องทดลองช่วงท้ายของซีรีส์นี้ เราจะพูดถึงการจัดการข้อเท็จจริงที่มาถึงล่าช้า

เมื่อเราเรียกใช้ทั้งโปรเจ็กต์ ระดับข้อมูลอัปสตรีมจะมีการเพิ่มระเบียนใหม่ทั้งหมด และจะคำนวณการรวมข้อมูลอีกครั้ง โดยเฉพาะอย่างยิ่ง การเรียกใช้แต่ละครั้งจะส่งผลให้ตารางรวมมีการรีเฟรชแบบเต็ม การออกแบบที่จับต้องได้ของเราจะประกอบด้วยการจัดกลุ่มตารางชั่วคราวตามชื่อผู้ใช้ ซึ่งจะช่วยเพิ่มประสิทธิภาพของคำค้นหาแบบรวมที่จะรีเฟรชลีดเดอร์บอร์ดนี้ทั้งหมด

สิ่งที่คุณต้องมี

  • Chrome เวอร์ชันล่าสุด
  • ความรู้พื้นฐานเกี่ยวกับ SQL และความคุ้นเคยกับ BigQuery เบื้องต้น

4. การตั้งค่า

สร้างชุดข้อมูลและตาราง BigQuery สำหรับระดับข้อมูลดิบ

สร้างชุดข้อมูลใหม่ที่จะมีสคีมาคลังสินค้า นอกจากนี้เราจะใช้ตัวแปรเหล่านี้ในภายหลัง ดังนั้นโปรดใช้เซสชัน Shell เดียวกันสำหรับขั้นตอนต่อไปนี้ หรือตั้งค่าตัวแปรตามที่จำเป็น อย่าลืมแทนที่ <PROJECT_ID> ด้วยรหัสโปรเจ็กต์

export PROJECT=<PROJECT_ID>
export DATASET=fresh_streams

bq --project_id $PROJECT mk $DATASET

ต่อไปเราจะสร้างตารางที่จะเก็บเหตุการณ์ดิบโดยใช้คอนโซล GCP สคีมาจะจับคู่ฟิลด์ที่เราฉายภาพจากสตรีมเหตุการณ์ของการเปลี่ยนแปลงที่เผยแพร่ซึ่งเรากำลังบริโภคจาก Wikimedia

CREATE TABLE fresh_streams.wiki_changes
(
  id INT64,
  user STRING,
  title STRING,
  timestamp TIMESTAMP
)
PARTITION BY DATE(_PARTITIONTIME)
CLUSTER BY user

สร้างหัวข้อ Pub/Sub และการสมัครใช้บริการ

export TOPIC=<TOPIC_ID>

gcloud pubsub topics create $TOPIC

สร้างบัญชีและโปรเจ็กต์ Dataform

ไปที่ https://app.dataform.co แล้วสร้างบัญชีใหม่ เมื่อเข้าสู่ระบบแล้ว คุณจะต้องสร้างโปรเจ็กต์ใหม่

ภายในโปรเจ็กต์ คุณจะต้องกำหนดค่าการผสานรวมกับ BigQuery เนื่องจาก Dataform จะต้องเชื่อมต่อกับคลังสินค้า เราจึงต้องจัดสรรข้อมูลเข้าสู่ระบบบัญชีบริการ

โปรดทำตามขั้นตอนที่ลิงก์ด้านบนภายในเอกสาร Dataform คุณจะต้องกำหนดค่าการเชื่อมต่อกับ BigQuery ในหน้าฐานข้อมูล โปรดเลือกรหัสโปรเจ็กต์เดียวกับที่คุณสร้างไว้ข้างต้น จากนั้นอัปโหลดข้อมูลเข้าสู่ระบบและทดสอบการเชื่อมต่อ

3f4aacdee4000234.png

เมื่อกำหนดค่าการผสานรวม BigQuery แล้ว คุณจะเห็นชุดข้อมูลภายในแท็บการประมาณ โดยเฉพาะอย่างยิ่ง ตาราง Raw ที่เราใช้เพื่อบันทึกเหตุการณ์จาก Dataflow จะแสดงที่นี่ เรามากลับมาดูเรื่องนี้กันเร็วๆ นี้

5. การใช้งาน

สร้างบริการ Python สำหรับการอ่านและเผยแพร่เหตุการณ์ไปยัง Pub/Sub

โปรดดูโค้ด Python ด้านล่างที่อยู่ในส่วนนี้ด้วย โปรดดูเอกสาร Pub/Sub API ในตัวอย่างนี้

ลองจดรายการคีย์ในโค้ด ซึ่งเป็นช่องที่เรากำลังจะฉายจากเหตุการณ์ JSON แบบเต็ม ยังคงอยู่ในข้อความที่เผยแพร่ และสุดท้ายอยู่ในตาราง wiki_changes ภายในระดับ Raw ของชุดข้อมูล BigQuery

ข้อมูลเหล่านี้ตรงกับสคีมาตาราง wiki_changes ที่เรากำหนดไว้ในชุดข้อมูล BigQuery สำหรับ wiki_changes

#!/usr/bin/env python3

import json, time, sys, os
from sseclient import SSEClient as EventSource

from google.cloud import pubsub_v1

project_id = os.environ['PROJECT']
topic_name = os.environ['TOPIC']

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

futures = dict()

url = 'https://stream.wikimedia.org/v2/stream/recentchange'

keys = ['id', 'timestamp', 'user', 'title']

for event in EventSource(url):
    if event.event == 'message':
        try:
            change = json.loads(event.data)
            changePub = {k: change.get(k, 0) for k in keys}
        except ValueError:
            pass
        else:
            payloadJson = json.dumps(changePub).encode('utf-8')
            future = publisher.publish(
                   topic_path, data=payloadJson)
            futures[payloadJson] = future

while futures:
    time.sleep(5)

6. การใช้งาน ต่อ

สร้างงาน Dataflow จากเทมเพลตเพื่ออ่านจาก Pub/Sub และเขียนไปยัง BigQuery

เมื่อเผยแพร่เหตุการณ์การเปลี่ยนแปลงล่าสุดไปยังหัวข้อ Pub/Sub แล้ว เราจะใช้ประโยชน์จากงาน Cloud Dataflow เพื่ออ่านและเขียนเหตุการณ์ดังกล่าวลงใน BigQuery ได้

หากเรามีความต้องการที่ซับซ้อนขณะประมวลผลสตรีม ให้ลองคิดเข้าร่วมสตรีมที่แยกกัน สร้างการรวมหน้าต่างโดยใช้การค้นหาเพื่อเพิ่มคุณค่าให้กับข้อมูล เราก็จะใช้สตรีมดังกล่าวในโค้ด Apacheบีมได้

เนื่องจากความต้องการสำหรับกรณีการใช้งานนี้ไม่ซับซ้อน เราจึงใช้เทมเพลต Dataflow แบบพร้อมใช้งานได้ทันทีโดยไม่ต้องปรับแต่งใดๆ เราดำเนินการดังกล่าวได้โดยตรงจากคอนโซล GCP ใน Cloud Dataflow

92cc945b5a22632f.png

เราจะใช้หัวข้อ Pub/Sub กับเทมเพลต BigQuery จากนั้นเพียงกำหนดค่าบางอย่างในเทมเพลต Dataflow รวมถึงหัวข้ออินพุต Pub/Sub และตารางเอาต์พุต BigQuery

b63c3a61733b4d9.png

7. การใช้งาน, ขั้นตอน Dataform

ตารางโมเดลใน Dataform

โมเดล Dataform ของเราเชื่อมโยงกับที่เก็บ GitHub ต่อไปนี้ โดยโฟลเดอร์คำจำกัดความจะมีไฟล์ SQLX ที่กำหนดโมเดลข้อมูล

ตามที่กล่าวไว้ในส่วนการจัดกำหนดการและการจัดการกลุ่ม เราจะกำหนดตารางชั่วคราวใน Dataform ที่รวบรวมระเบียนดิบจาก wiki_changes ลองดู DDL สำหรับตารางการทดลองใช้ (ซึ่งลิงก์อยู่ในที่เก็บ GitHub ที่เชื่อมโยงกับโปรเจ็กต์ Dataform ของเราด้วย)

โปรดทราบคุณลักษณะที่สำคัญบางประการของตารางนี้:

  • โดยได้รับการกําหนดค่าเป็นประเภทที่เพิ่มขึ้น ดังนั้นเมื่องาน ELT ที่กำหนดเวลาไว้ทํางานอยู่ ระบบจะเพิ่มเฉพาะระเบียนใหม่เท่านั้น
  • ตามที่แสดงโดยโค้ด when() ที่ด้านล่าง ตรรกะสำหรับกรณีนี้จะขึ้นอยู่กับช่องการประทับเวลา ซึ่งแสดงถึงการประทับเวลาในสตรีมเหตุการณ์ กล่าวคือ event_time ของการเปลี่ยนแปลง
  • โดยจะมีการคลัสเตอร์โดยใช้ช่อง user ซึ่งหมายความว่าผู้ใช้จะเรียงลำดับระเบียนภายในแต่ละพาร์ติชัน ซึ่งช่วยลดการสับเปลี่ยนที่จำเป็นในการค้นหาที่สร้างลีดเดอร์บอร์ด
config {
  type: "incremental",
  schema: "wiki_push",
  bigquery: {
    partitionBy: "date(event_time)",
    clusterBy: ["user"]
  }
}

select
  user,
  title,
  timestamp as event_time,
  current_timestamp() as processed_time
from
  wiki_push.wiki_changes

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

ตารางอีกตารางที่เราต้องกำหนดในโปรเจ็กต์คือตารางระดับการรายงาน ซึ่งจะรองรับคำค้นหาลีดเดอร์บอร์ด ตารางในระดับการรายงานจะรวบรวมไว้ เนื่องจากผู้ใช้ของเรากังวลเกี่ยวกับจำนวนการเปลี่ยนแปลง Wikipedia ที่เผยแพร่ใหม่และถูกต้อง

คำจำกัดความของตารางไม่ซับซ้อน และใช้ประโยชน์จากการอ้างอิงของ Dataform ข้อดีหลักๆ ของฟีเจอร์อ้างอิงเหล่านี้คือช่วยให้การอ้างอิงที่ชัดเจนว่าทรัพยากร Dependency ระหว่างออบเจ็กต์อย่างชัดเจน ซึ่งช่วยสนับสนุนความถูกต้องของไปป์ไลน์ โดยดูแลให้ทรัพยากร Dependency ดำเนินการก่อนการค้นหาที่อ้างอิงเสมอ

config {
  type: "table",
  schema: "wiki_push"
}

select
  user,
  count(*) as changesCount
from
${ref("wiki_staged")}
group by user

ตั้งเวลาโปรเจ็กต์ Dataform

ขั้นตอนสุดท้ายคือการสร้างกำหนดการที่จะดำเนินการทุกชั่วโมง เมื่อมีการเรียกใช้โปรเจ็กต์ Dataform จะดำเนินการตามคำสั่ง SQL ที่จำเป็นเพื่อรีเฟรชตารางชั่วคราวที่เพิ่มขึ้นและโหลดตารางรวมอีกครั้ง

กำหนดการนี้สามารถเรียกใช้ได้ทุกชั่วโมงหรือบ่อยกว่านั้นคือทุก 5-10 นาทีโดยประมาณเพื่อให้ลีดเดอร์บอร์ดอัปเดตตามเหตุการณ์ล่าสุดที่สตรีมเข้ามาในระบบอยู่เสมอ

9467013210f617ac.png

8. ขอแสดงความยินดี

ยินดีด้วย คุณสร้างสถาปัตยกรรมข้อมูลแบบเป็นขั้นให้กับข้อมูลที่สตรีมสำเร็จแล้ว

เราเริ่มต้นจากสตรีมเหตุการณ์ Wikimedia และได้เปลี่ยนข้อมูลนี้เป็นตารางการรายงานใน BigQuery ที่เป็นปัจจุบันอยู่เสมอ

b6a06b79bdaf8316.png

สิ่งที่ต้องทำต่อไป

อ่านเพิ่มเติม

[1] เป็นเรื่องปกติที่วิศวกรข้อมูลจะทําการเปลี่ยนรูปแบบเป็นกลุ่มแบบรายวันเพื่อเขียนทับข้อมูลรวมระหว่างวัน (เช่น รายชั่วโมง) หรือที่เรียกว่าการปรับยอด

[2] สำหรับรายละเอียดการใช้งาน โปรดดูที่ส่วนสถาปัตยกรรม

[3] ข้อเท็จจริงที่มาถึงล่าช้าคือเหตุการณ์ที่มี event_time ซึ่งช้ากว่าบันทึกที่ประมวลผลแล้วโดยระบบภายในสตรีมเหตุการณ์เดียวกันนี้