Echtzeitverarbeitung von IoT-Daten mit Dataproc und Google Managed Service for Apache Kafka
Informationen zu diesem Codelab
1. Einführung
Zuletzt aktualisiert:10.06.2024
Hintergrund
IoT-Geräte (Internet of Things), von Smart-Home-Lösungen bis hin zu Industriesensoren, generieren am Netzwerkrand riesige Datenmengen. Diese Daten sind für eine Vielzahl von Anwendungsfällen unerlässlich, z. B. für Geräteüberwachung, Tracking, Diagnose, Überwachung, Personalisierung und Flottenoptimierung. Der verwaltete Google-Dienst für Apache Kafka bietet eine skalierbare und langlebige Möglichkeit, diesen kontinuierlichen Datenstream auf OSS-kompatible, nutzerfreundliche und sichere Weise aufzunehmen und zu speichern. Google Cloud Dataproc ermöglicht die Verarbeitung dieser großen Datensätze für die Datenanalyse mit Apache Spark- und Hadoop-Clustern.
Umfang
In diesem Codelab erstellen Sie eine IoT-Datenverarbeitungspipeline mit Google Managed Service for Apache Kafka, Dataproc, Python und Apache Spark, die Echtzeitanalysen durchführt. Ihre Pipeline bietet folgende Vorteile:
- Daten von IoT-Geräten mit GCE-VMs in einem Managed Kafka-Cluster veröffentlichen
- Daten aus dem Managed Kafka-Cluster in einen Dataproc-Cluster streamen
- Daten mit einem Dataproc-Spark-Streaming-Job verarbeiten
Lerninhalte
- Von Google verwaltete Kafka- und Dataproc-Cluster erstellen
- Streamingjobs mit Dataproc ausführen
Voraussetzungen
- Ein aktives GCP-Konto mit einem eingerichteten Projekt. Wenn du noch kein Konto hast, kannst du dich für einen kostenlosen Testzeitraum registrieren.
- gcloud CLI muss installiert und konfiguriert sein. Folgen Sie der Anleitung unter gcloud CLI unter Ihrem Betriebssystem installieren.
- APIs für von Google verwaltetes Kafka und Dataproc in Ihrem GCP-Projekt aktiviert.
2. Übersicht
In diesem Codelab folgen wir der Geschichte eines fiktiven Unternehmens, DemoIOT Solutions. DemoIOT Solutions bietet Sensorgeräte, die Daten zu Temperatur, Luftfeuchtigkeit, Druck, Lichtstärke und Standort messen und übertragen. Er möchte Pipelines einrichten, die diese Daten verarbeiten, um seinen Kunden Echtzeitstatistiken zu präsentieren. Mithilfe solcher Pipelines können sie ihren Kunden eine Vielzahl von Diensten anbieten, z. B. Monitoring, automatisierte Vorschläge, Benachrichtigungen und Statistiken zu Orten, an denen die Kunden ihre Sensoren installiert haben.
Dazu verwenden wir eine GCE-VM, um das IoT-Gerät zu simulieren. Das Gerät veröffentlicht Daten in einem Kafka-Thema im von Google verwalteten Kafka-Cluster, das von einem Dataproc-Streamingjob gelesen und verarbeitet wird. In der Einrichtung der erforderlichen Voraussetzungen und auf den folgenden Seiten werden Sie durch alle diese Schritte geführt.
Voraussetzungen
- Suchen Sie den Projektnamen und die Projektnummer für Ihr Projekt. Weitere Informationen finden Sie unter Projektname, -nummer und -ID ermitteln.
- VPC-Subnetzwerk. Dadurch wird eine Verbindung zwischen der GCE-VM, dem Kafka-Cluster und dem Dataproc-Cluster hergestellt. Hier finden Sie eine Anleitung, wie Sie vorhandene Subnetze mit der gcloud CLI auflisten. Folgen Sie bei Bedarf der Anleitung unter VPC-Netzwerk im automatischen Modus erstellen, um ein VPC-Netzwerk mit Subnetz in jeder Google Cloud-Region zu erstellen. Für dieses Codelab verwenden wir jedoch nur ein Subnetz aus einer einzigen Region.
- Achten Sie darauf, dass in diesem Unternetzwerk eine Firewallregel vorhanden ist, die den gesamten Eingang von tcp:22 zulässt, was für SSH erforderlich ist. Diese Regel kann beim Erstellen eines Netzwerks im Bereich „Firewallregeln“ ausgewählt werden.
- GCS-Bucket Sie benötigen Zugriff auf einen Google Cloud Storage-Bucket, um Dataproc-Jobressourcen zu speichern und verarbeitete Daten zu speichern. Wenn Sie noch keine haben, können Sie eine in Ihrem GCP-Projekt erstellen.
Umgebungsvariablen füllen
Geben Sie diese Umgebungsvariablen in Ihrem Terminal ein, in dem Sie die gcloud CLI ausführen, damit sie später verwendet werden können.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Ersetzen Sie Folgendes:
<project-id>
durch den Namen des von Ihnen eingerichteten GCP-Projekts.<project-number>
durch den Namen der Projektnummer aus Schritt 1 der Voraussetzungen.<region>
durch den Namen einer Region aus der Liste der verfügbaren Regionen und Zonen, die Sie verwenden möchten. Wir können beispielsweiseus-central1
verwenden.<zone>
durch den Namen der Zone aus Verfügbare Regionen und Zonen unter der zuvor ausgewählten Region. Wenn Sie beispielsweiseus-central1
als Region ausgewählt haben, können Sieus-central1-f
als Zone verwenden. In dieser Zone wird die GCE-VM erstellt, die IoT-Geräte simuliert. Achten Sie darauf, dass sich Ihre Zone in der von Ihnen ausgewählten Region befindet.<subnet-path>
durch den vollständigen Pfad des Subnetzes aus Schritt 2 der Voraussetzungen. Der Wert muss das Formatprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>
haben.<bucket-name>
durch den Namen des GCS-Buckets aus Schritt 3 der Voraussetzungen.
3. Von Google verwaltetes Kafka einrichten
In diesem Abschnitt wird ein von Google verwalteter Kafka-Cluster eingerichtet, auf dem der Kafka-Server bereitgestellt wird. Außerdem wird auf diesem Server ein Thema erstellt, in dem die IoT-Daten veröffentlicht und gelesen werden können, nachdem sie abonniert wurden. DemoIOT Solutions kann diesen Cluster so einrichten, dass alle seine Geräte Daten darin veröffentlichen.
Managed Kafka-Cluster erstellen
- Erstellen Sie den verwalteten Kafka-Cluster. Hier lautet der Name des Clusters
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Sie sollten eine Antwort ähnlich der folgenden erhalten:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Die Clustererstellung dauert 20 bis 30 Minuten. Warten Sie, bis dieser Vorgang abgeschlossen ist.
Thema erstellen
- Erstellen Sie ein verwaltetes Kafka-Thema auf dem Cluster. Hier lautet der Name des Themas
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Die Ausgabe sollte in etwa so aussehen:
Created topic [kafka-iot-topic].
4. Publisher einrichten
Um Daten im verwalteten Kafka-Cluster zu veröffentlichen, richten wir eine Google Compute Engine-VM-Instanz ein, die auf das VPC mit dem vom verwalteten Kafka-Cluster verwendeten Subnetz zugreifen kann. Diese VM simuliert die Sensorgeräte von DemoIOT Solutions.
Schritte
- Erstellen Sie die Google Compute Engine-VM-Instanz. Hier lautet der Name der GCE-VM
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Weisen Sie dem Standarddienstkonto der Google Compute Engine die Berechtigungen zur Verwendung des verwalteten Dienstes für Apache Kafka zu.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Stellen Sie über SSH eine Verbindung zur VM her. Alternativ können Sie SSH über die Google Cloud Console verwenden.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Installieren Sie Java, um Kafka-Befehlszeilentools auszuführen, und laden Sie die Kafka-Binärdatei mit den folgenden Befehlen herunter.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Laden Sie die Managed Kafka-Authentifizierungsbibliothek und die zugehörigen Abhängigkeiten herunter und konfigurieren Sie die Kafka-Clienteigenschaften.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Weitere Informationen zur Einrichtung des Publishers-Rechners findest du unter Clientcomputer einrichten.
5. In Managed Kafka veröffentlichen
Nachdem der Publisher eingerichtet ist, können wir mit der Kafka-Befehlszeile einige Dummy-Daten von der GCE-VM (IoT-Geräte von DemoIOT Solutions simulieren) im verwalteten Kafka-Cluster veröffentlichen.
- Da wir eine SSH-Verbindung zur GCE-VM-Instanz hergestellt haben, müssen wir die Variable
PROJECT_ID
neu füllen:
export PROJECT_ID=<project-id>
export REGION=<region>
Ersetzen Sie Folgendes:
<project-id>
durch den Namen des von Ihnen eingerichteten GCP-Projekts.<region>
mit der Region, in der der Kafka-Cluster erstellt wurde
- Verwenden Sie den Befehl
managed-kafka clusters describe
, um die IP-Adresse des Kafka-Bootstrap-Servers abzurufen. Über diese Adresse kann eine Verbindung zum Kafka-Cluster hergestellt werden.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Listen Sie die Themen im Cluster auf:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Sie sollten die folgende Ausgabe sehen, die das zuvor erstellte Thema kafka-iot-topic
enthält.
__remote_log_metadata
kafka-iot-topic
- Kopieren Sie dieses Script und fügen Sie es in eine neue Datei
publish_iot_data.sh
ein. Sie können ein Tool wievim
odernano
verwenden, um eine neue Datei auf der GCE-VM zu erstellen.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Erklärung
- Dieses Script erstellt JSON-Nachrichten mit simulierten Sensorwerten, die Geräte-ID, Zeitstempel, Sensordaten (Temperatur, Luftfeuchtigkeit, Druck, Licht), Standortinformationen (Breiten- und Längengrad), Gerätestatus (Akku, Signal, Verbindungstyp) und einige Metadaten enthalten.
- Es generiert einen kontinuierlichen Nachrichtenfluss von einer festgelegten Anzahl von Geräten, die jeweils in einem bestimmten Zeitintervall Daten senden, um reale IoT-Geräte nachzuahmen. Hier veröffentlichen wir Daten von 10 Geräten, die jeweils 20 Messwerte mit einem Zeitintervall von 10 Sekunden generieren.
- Außerdem werden alle generierten Daten über das Kafka-Befehlszeilentool für Producer im Kafka-Thema veröffentlicht.
- Installieren Sie einige vom Script verwendete Abhängigkeiten: das
bc
-Paket für mathematische Berechnungen und dasjq
-Paket für die JSON-Verarbeitung.
sudo apt-get install bc jq
- Ändern Sie das Script in ein ausführbares Script und führen Sie es aus. Die Ausführung dauert etwa zwei Minuten.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Sie können mit diesem Befehl prüfen, ob die Ereignisse veröffentlicht wurden. Dabei werden alle Ereignisse ausgegeben. Drücke zum Beenden auf <control-c>
.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc-Cluster einrichten
In diesem Abschnitt wird ein Dataproc-Cluster im VPC-Subnetz erstellt, in dem sich der verwaltete Kafka-Cluster befindet. Auf diesem Cluster werden Jobs ausgeführt, die die Echtzeitstatistiken und Statistiken generieren, die DemoIOT Solutions benötigt.
- Erstellen Sie einen Dataproc-Cluster. Hier lautet der Name des Clusters
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Sie sollten eine Antwort ähnlich der folgenden erhalten:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Die Erstellung eines Clusters kann 10 bis 15 Minuten dauern. Warten Sie, bis dieser Vorgang abgeschlossen ist, und prüfen Sie, ob der Cluster den Status RUNNING
hat.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Kafka-Nachrichten mit Dataproc verarbeiten
In diesem letzten Abschnitt reichen Sie einen Dataproc-Job ein, der die veröffentlichten Nachrichten mit Spark Streaming verarbeitet. Dieser Job generiert tatsächlich einige Statistiken und Statistiken in Echtzeit, die von DemoIOT Solutions verwendet werden können.
- Führen Sie diesen Befehl aus, um die Streaming-PySpark-Jobdatei
process_iot.py
lokal zu erstellen.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Erklärung
- Mit diesem Code wird ein PySpark Structured Streaming-Job eingerichtet, um Daten aus einem bestimmten Kafka-Thema zu lesen. Es verwendet die angegebene Bootstrap-Adresse des Kafka-Servers und die aus einer GCS-Konfigurationsdatei geladenen Kafka-Konfigurationen, um eine Verbindung zum Kafka-Broker herzustellen und sich zu authentifizieren.
- Zuerst werden die Rohdaten aus Kafka als Stream von Byte-Arrays gelesen, diese Byte-Arrays in Strings umgewandelt und die
json_schema
mithilfe der StructType-Klasse von Spark angewendet, um die Struktur der Daten anzugeben (Geräte-ID, Zeitstempel, Standort, Sensordaten usw.). - Die ersten 10 Zeilen werden zur Vorschau in der Konsole ausgegeben, die durchschnittliche Temperatur pro Sensor berechnet und alle Daten im
avro
-Format in den GCS-Bucket geschrieben. Avro ist ein zeilenbasiertes Datenserialisierungssystem, das strukturierte Daten effizient in einem kompakten, schemadefinierten Binärformat speichert. Es bietet Schemaentwicklung, sprachenunabhängige Nutzung und eine hohe Komprimierung für die groß angelegte Datenverarbeitung.
- Erstellen Sie die Datei
client.properties
und füllen Sie die Umgebungsvariable für die Bootstrap-Adresse des Kafka-Servers aus.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Laden Sie
process_iot.py
- undclient.properties
-Dateien in Ihren Google Cloud Storage-Bucket hoch, damit sie vom Dataproc-Job verwendet werden können.
gsutil cp process_iot.py client.properties $BUCKET
- Kopieren Sie einige Abhängigkeits-JAR-Dateien für den Dataproc-Job in Ihren GCS-Bucket. Dieses Verzeichnis enthält JAR-Dateien, die zum Ausführen von Spark Streaming-Jobs mit Kafka erforderlich sind, sowie die Managed Kafka-Authentifizierungsbibliothek und ihre Abhängigkeiten aus Clientcomputer einrichten.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Senden Sie den Spark-Job an den Dataproc-Cluster.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Die Spark-Treiberlogs werden gedruckt. Außerdem sollten Sie diese Tabellen in der Console sehen und Daten in Ihrem GCS-Bucket finden.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Bereinigen
Folgen Sie der Anleitung, um nach Abschluss des Codelabs die Ressourcen zu bereinigen.
- Löschen Sie den verwalteten Kafka-Cluster, die GCE-VM des Publishers und den Dataproc-Cluster.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Löschen Sie das VPC-Subnetz und das Netzwerk.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Löschen Sie Ihren GCS-Bucket, wenn Sie die Daten nicht mehr verwenden möchten.
gcloud storage rm --recursive $BUCKET
9. Glückwunsch
Herzlichen Glückwunsch! Sie haben mit Manage Kafka und Dataproc eine IoT-Datenverarbeitungspipeline erstellt, mit der DemoIOT Solutions Echtzeitdaten zu den von ihren Geräten veröffentlichten Daten erhalten kann.
Sie haben einen verwalteten Kafka-Cluster erstellt, IoT-Ereignisse darin veröffentlicht und einen Dataproc-Job ausgeführt, bei dem diese Ereignisse mit Spark-Streaming in Echtzeit verarbeitet wurden. Sie kennen jetzt die wichtigsten Schritte zum Erstellen von Datenpipelines mit Managed Kafka und Dataproc.