Aktuell und präzise: Streams im ELT zusammenfassen

1. Einführung

Übersicht

Frameworks für Streaminganalysen werden in modernen Data-Warehouse-Prozessen immer wichtiger, da Geschäftsanwender Die Nachfrage nach Echtzeitanalysen ist ungebrochen. Es wurden große Fortschritte gemacht, um die Datenaktualität in Warehouses zu verbessern und Streaminganalysen im Allgemeinen zu unterstützen. Doch Data Engineers stehen bei der Anpassung dieser Streamingquellen an ihre Data-Warehouse-Architektur immer noch vor Herausforderungen.

In diesem Blog sprechen wir über einige der häufigsten Herausforderungen, mit denen Data Engineers bei der Lösung dieser Anwendungsfälle konfrontiert sind. Wir erläutern einige Designideen und Architekturmuster für eine effiziente Aggregation von Streamingdaten mit BigQuery.

Datenaktualität und -genauigkeit

Mit aktuell meinen wir, dass die Datenlatenz der Einheit unter einem bestimmten Grenzwert liegt, z.B. „bis zur letzten Stunde“. Die Aktualität wird anhand der Teilmenge der Rohdaten bestimmt, die in den Zusammenfassungen enthalten sind.

Bei Streamingdaten kommt es häufig zu spät in unserem Datenverarbeitungssystem, d. h. dass der Zeitpunkt, zu dem unser System ein Ereignis verarbeitet, deutlich später als der Zeitpunkt des Ereignisses liegt.

Wenn wir die spät eintreffenden Fakten verarbeiten, ändern sich die Werte unserer aggregierten Statistiken, was bedeutet, dass sich die Werte, die Analysten im Tagesverlauf sehen, ändern werden[1]. Mit genau ist gemeint, dass die aggregierten Statistiken den endgültigen abgeglichenen Werten so nahe wie möglich kommen.

Es gibt noch eine dritte Dimension, die optimiert werden muss: die Kosten – im Sinne von Geld und Leistung. Zur Veranschaulichung könnten wir eine logische Ansicht für die Datenobjekte in Staging und Berichterstellung verwenden. Der Nachteil einer logischen Ansicht wäre, dass bei jeder Abfrage der aggregierten Tabelle das gesamte Roh-Dataset gescannt wird, was langsam und teuer sein wird.

Szenariobeschreibung

Lassen Sie uns die Voraussetzungen für diesen Anwendungsfall schaffen. Wir werden Daten von Wikipedia Event Streams aufnehmen, die von Wikimedia veröffentlicht werden. Unser Ziel ist es, eine Bestenliste zu erstellen, auf der die Autoren mit den meisten Änderungen angezeigt werden und die immer auf dem neuesten Stand ist, sobald neue Artikel veröffentlicht werden. Unsere Bestenliste, die als BI Engine-Dashboard implementiert wird, aggregiert die Rohereignisse nach Nutzernamen, um die Punktzahlen zu berechnen[2].

2. Design

Datenstufen

In der Datenpipeline definieren wir mehrere Datenebenen. Wir speichern die Rohdaten der Ereignisse und erstellen eine Pipeline mit nachfolgenden Transformationen, Anreicherung und Aggregation. Wir verbinden die Berichtstabellen nicht direkt mit den Daten in Rohtabellen, da wir die Transformationen vereinheitlichen und zentralisieren möchten, die den verschiedenen Teams für die bereitgestellten Daten wichtig sind.

Ein wichtiges Prinzip in dieser Architektur besteht darin, dass die höheren Ebenen – Staging und Berichterstellung – jederzeit nur unter Verwendung der Rohdaten neu berechnet werden können.

Partitionierung

BigQuery unterstützt zwei Partitionierungsarten: Partitionierung nach Ganzzahlbereich und Datumspartitionierung. Wir berücksichtigen in diesem Beitrag nur die Datumspartitionierung.

Bei der Datumspartitionierung können Sie zwischen Aufnahmezeitpartitionen oder feldbasierten Partitionen wählen. Bei der Partitionierung nach Aufnahmezeit werden Daten basierend auf dem Zeitpunkt ihrer Erfassung an eine Partition gesendet. Nutzer können beim Laden auch eine Partition auswählen, indem sie einen Partitions-Decorator angeben.

Bei der Feldpartitionierung werden Daten anhand des Datums- oder Zeitstempelwerts in einer Spalte partitioniert.

Zur Aufnahme von Ereignissen werden Daten in eine nach Aufnahmezeit partitionierte Tabelle übertragen. Das liegt daran, dass die Aufnahmezeit für die Verarbeitung oder erneute Verarbeitung von in der Vergangenheit empfangenen Daten relevant ist. Backfills von Verlaufsdaten können auch in Partitionen mit Aufnahmezeit gespeichert werden, je nachdem, wann sie eingetroffen wären.

In diesem Codelab gehen wir davon aus, dass wir keine spät ankommenden Fakten[3] aus dem Wikimedia-Ereignisstream erhalten. Dies vereinfacht das inkrementelle Laden der Staging-Tabelle, wie unten erläutert.

Bei der Staging-Tabelle wird nach der Ereigniszeit partitioniert. Das liegt daran, dass unsere Analysten daran interessiert sind, Daten basierend auf dem Zeitpunkt des Ereignisses – dem Zeitpunkt der Veröffentlichung des Artikels auf Wikipedia – – und nicht dem Zeitpunkt, zu dem das Ereignis innerhalb der Pipeline verarbeitet wurde, abzufragen.

3. Architektur

Inhalt

Zum Lesen des Ereignisstreams aus Wikimedia verwenden wir das SSE. Wir schreiben einen kleinen Middleware-Dienst, der als SSE-Client aus dem Ereignisstream liest und in einem Pub/Sub-Thema in unserer GCP-Umgebung veröffentlicht.

Sobald die Ereignisse in Pub/Sub verfügbar sind, erstellen wir mithilfe einer Vorlage einen Cloud Dataflow-Job, der die Datensätze in unsere Rohdatenstufe in unserem BigQuery Data Warehouse streamt. Der nächste Schritt besteht darin, die zusammengefassten Statistiken für unsere Live-Bestenliste zu berechnen.

631efe46d234f131.png

Planung und Orchestrierung

Zur Orchestrierung des ELT, der die Staging- und Reporting-Ebenen des Warehouse füllt, verwenden wir Dataform. Dataform „bringt Tools, Best Practices und von der Softwareentwicklung inspirierte Workflows“ Data-Engineering-Teams. Neben Orchestrierung und Planung bietet Dataform Funktionen wie Assertions und Tests zur Qualitätssicherung, die Definition benutzerdefinierter Warehouse-Vorgänge für die Datenbankverwaltung sowie Dokumentationsfunktionen zur Unterstützung der Datenerkennung.

Die Autoren danken dem Dataform-Team für das wertvolle Feedback bei der Bewertung dieses Labs und Blogs.

In Dataform werden die aus Dataflow eingehenden Rohdaten als externes Dataset deklariert. Die Staging- und Berichtstabellen werden mithilfe der SQLX-Syntax von Dataform dynamisch definiert.

Wir verwenden die inkrementelle Ladefunktion von Dataform, um die Staging-Tabelle zu füllen, und planen, dass das Dataform-Projekt stündlich ausgeführt wird. Wie oben beschrieben gehen wir davon aus, dass wir keine spät ankommenden Fakten erhalten werden. Unsere Logik besteht also darin, Datensätze mit einer Ereigniszeit aufzunehmen, die nach der letzten Ereigniszeit der vorhandenen bereitgestellten Datensätze liegt.

In späteren Labs dieser Reihe besprechen wir den Umgang mit spät ankommenden Fakten.

Wenn wir das gesamte Projekt ausführen, werden den vorgelagerten Datenebenen alle neuen Datensätze hinzugefügt und unsere Aggregationen werden neu berechnet. Insbesondere führt jede Ausführung zu einer vollständigen Aktualisierung der aggregierten Tabelle. Unser physisches Design sieht vor, dass die Staging-Tabelle nach Nutzername geclustert wird, wodurch die Leistung der Aggregationsabfrage weiter erhöht wird, durch die diese Bestenliste vollständig aktualisiert wird.

Voraussetzungen

  • Eine aktuelle Version von Chrome
  • Grundkenntnisse in SQL und Grundkenntnisse in BigQuery

4. Einrichtung

BigQuery-Dataset und -Tabelle für Rohstufe erstellen

Erstellen Sie ein neues Dataset, das unser Warehouse-Schema enthält. Diese Variablen werden auch später noch verwendet. Verwenden Sie daher für die folgenden Schritte dieselbe Shell-Sitzung oder legen Sie die Variablen nach Bedarf fest. Ersetzen Sie <PROJECT_ID> durch die Projekt-ID.

export PROJECT=<PROJECT_ID>
export DATASET=fresh_streams

bq --project_id $PROJECT mk $DATASET

Als Nächstes erstellen wir mit der GCP Console eine Tabelle, die die Rohereignisse enthält. Das Schema stimmt mit den Feldern überein, die wir aus dem Ereignisstream der veröffentlichten Änderungen, die wir aus Wikimedia abrufen, projizieren.

CREATE TABLE fresh_streams.wiki_changes
(
  id INT64,
  user STRING,
  title STRING,
  timestamp TIMESTAMP
)
PARTITION BY DATE(_PARTITIONTIME)
CLUSTER BY user

Pub/Sub-Thema und -Abo erstellen

export TOPIC=<TOPIC_ID>

gcloud pubsub topics create $TOPIC

Dataform-Konto und -Projekt erstellen

Rufen Sie https://app.dataform.co auf und erstellen Sie ein neues Konto. Sobald Sie angemeldet sind, erstellen Sie ein neues Projekt.

Innerhalb Ihres Projekts müssen Sie die Integration in BigQuery konfigurieren. Da Dataform eine Verbindung zum Warehouse herstellen muss, müssen Sie Dienstkonto-Anmeldedaten bereitstellen.

Führen Sie die oben in den Dataform-Dokumenten verlinkten Schritte aus. Auf der Seite „Datenbank“ konfigurieren Sie die Verbindung zu BigQuery. Achten Sie darauf, dieselbe Projekt-ID auszuwählen, die Sie oben erstellt haben. Laden Sie dann die Anmeldedaten hoch und testen Sie die Verbindung.

3f4aacdee4000234.png

Nachdem Sie die BigQuery-Integration konfiguriert haben, werden auf dem Tab „Modellierung“ Datasets angezeigt. Hier ist insbesondere die Raw-Tabelle vorhanden, die wir zum Erfassen von Ereignissen aus Dataflow verwenden. Kommen wir gleich darauf zurück.

5. Implementierung

Python-Dienst zum Lesen und Veröffentlichen von Ereignissen in Pub/Sub erstellen

Sehen Sie sich den Python-Code unten an, der auch in diesem Gist verfügbar ist. In diesem Beispiel richten wir uns nach den Pub/Sub API-Dokumenten.

Sehen wir uns die Liste keys im Code an. Dies sind die Felder, die wir aus dem vollständigen JSON-Ereignis projizieren, in den veröffentlichten Nachrichten und schließlich in der Tabelle wiki_changes auf der Raw-Ebene unseres BigQuery-Datasets beibehalten werden.

Diese stimmen mit dem Tabellenschema wiki_changes überein, das wir in unserem BigQuery-Dataset für wiki_changes definiert haben.

#!/usr/bin/env python3

import json, time, sys, os
from sseclient import SSEClient as EventSource

from google.cloud import pubsub_v1

project_id = os.environ['PROJECT']
topic_name = os.environ['TOPIC']

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

futures = dict()

url = 'https://stream.wikimedia.org/v2/stream/recentchange'

keys = ['id', 'timestamp', 'user', 'title']

for event in EventSource(url):
    if event.event == 'message':
        try:
            change = json.loads(event.data)
            changePub = {k: change.get(k, 0) for k in keys}
        except ValueError:
            pass
        else:
            payloadJson = json.dumps(changePub).encode('utf-8')
            future = publisher.publish(
                   topic_path, data=payloadJson)
            futures[payloadJson] = future

while futures:
    time.sleep(5)

6. Implementierung (Fortsetzung)

Dataflow-Job aus Vorlage erstellen, um aus Pub/Sub zu lesen und in BigQuery zu schreiben

Sobald die letzten Änderungsereignisse im Pub/Sub-Thema veröffentlicht wurden, können wir diese Ereignisse mit einem Cloud Dataflow-Job lesen und in BigQuery schreiben.

Wenn wir bei der Verarbeitung des Streams besondere Anforderungen hätten, könnten wir diese in unseren Apache Beam-Code implementieren, z. B. wenn wir unterschiedliche Streams zusammenführen, Fensteraggregationen erstellen oder Daten mithilfe von Lookups anreichern.

Da unsere Anforderungen für diesen Anwendungsfall einfacher sind, können wir die vorkonfigurierte Dataflow-Vorlage verwenden und müssen keine Anpassungen daran vornehmen. Dies ist direkt über die GCP Console in Cloud Dataflow möglich.

92cc945b5a22632f.png

Wir verwenden die Vorlage „Pub/Sub-Thema für BigQuery“ und müssen nur noch ein paar Dinge in der Dataflow-Vorlage konfigurieren, einschließlich des Pub/Sub-Eingabethemas und der BigQuery-Ausgabetabelle.

b63c3a61733b4d9.png

7. Implementierung, Dataform-Schritte

Modelltabellen in Dataform

Unser Dataform-Modell ist an das folgende GitHub-Repository gebunden. Der Definitionsordner enthält die SQLX-Dateien, die das Datenmodell definieren.

Wie im Abschnitt Planung und Orchestrierung erläutert, definieren wir eine Staging-Tabelle in Dataform, die die Rohdaten aus wiki_changes aggregiert. Werfen wir einen Blick auf die DDL für die Staging-Tabelle (auch im GitHub-Repository verlinkt, das mit unserem Dataform-Projekt verknüpft ist).

Beachten Sie einige wichtige Funktionen dieser Tabelle:

  • Er ist als inkrementeller Typ konfiguriert. Wenn unsere geplanten ELT-Jobs ausgeführt werden, werden also nur neue Datensätze hinzugefügt.
  • Wie durch den if()-Code am unteren Rand ausgedrückt, basiert die Logik hierfür auf dem Zeitstempelfeld, das den Zeitstempel im Ereignisstream, d. h. die event_time der Änderung, widerspiegelt.
  • Es wird mithilfe des Felds user geclustert. Das bedeutet, dass die Datensätze in jeder Partition nach user sortiert werden, wodurch der Shuffle-Aufwand für die Abfrage zum Erstellen der Bestenliste reduziert wird.
config {
  type: "incremental",
  schema: "wiki_push",
  bigquery: {
    partitionBy: "date(event_time)",
    clusterBy: ["user"]
  }
}

select
  user,
  title,
  timestamp as event_time,
  current_timestamp() as processed_time
from
  wiki_push.wiki_changes

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

Die andere Tabelle, die wir in unserem Projekt definieren müssen, ist die Tabelle der Reporting-Stufe, die die Abfragen der Bestenliste unterstützt. Die Tabellen auf der Berichtsebene werden aggregiert, da unsere Nutzer die aktuelle und genaue Anzahl der veröffentlichten Wikipedia-Änderungen berücksichtigen möchten.

Die Tabellendefinition ist unkompliziert und verwendet Dataform-Referenzen. Ein großer Vorteil dieser Verweise besteht darin, dass sie die Abhängigkeiten zwischen Objekten explizit machen und die Richtigkeit der Pipeline unterstützen, indem sichergestellt wird, dass Abhängigkeiten immer vor abhängigen Abfragen ausgeführt werden.

config {
  type: "table",
  schema: "wiki_push"
}

select
  user,
  count(*) as changesCount
from
${ref("wiki_staged")}
group by user

Dataform-Projekt planen

Im letzten Schritt erstellen Sie einfach einen Zeitplan, der stündlich ausgeführt wird. Wenn das Projekt aufgerufen wird, führt Dataform die erforderlichen SQL-Anweisungen aus, um die inkrementelle Staging-Tabelle und die aggregierte Tabelle neu zu laden.

Dieser Zeitplan kann stündlich – oder sogar häufiger (etwa alle 5 bis 10 Minuten) – aufgerufen werden, damit die Bestenliste mit den neuesten Ereignissen aktualisiert wird, die in das System gestreamt wurden.

9467013210f617ac.png

8. Glückwunsch

Herzlichen Glückwunsch. Sie haben erfolgreich eine mehrstufige Datenarchitektur für Ihre gestreamten Daten erstellt.

Wir haben mit einem Wikimedia-Ereignisstream begonnen und diesen in eine Berichtstabelle in BigQuery umgewandelt, die immer aktuell ist.

b6a06b79bdaf8316.png

Was liegt als Nächstes an?

Weitere Informationen

[1] Data Engineers führen häufig eine tägliche Batchtransformation durch, um die täglichen (z. B. stündlichen) aggregierten Daten zu überschreiben. Dies wird als Abgleich bezeichnet.

[2] Weitere Informationen zur Implementierung finden Sie im Abschnitt „Architektur“.

[3] Eine spät eingetroffene Tatsache ist ein Ereignis mit einer Ereigniszeit (event_time), die nach den Datensätzen liegt, die bereits vom System innerhalb desselben Ereignisstreams verarbeitet wurden.