Echtzeitverarbeitung von IoT-Daten mit Dataproc und Google Managed Service for Apache Kafka

Echtzeitverarbeitung von IoT-Daten mit Dataproc und Google Managed Service for Apache Kafka

Informationen zu diesem Codelab

subjectZuletzt aktualisiert: Juni 16, 2025
account_circleVerfasst von Devanshi Khatsuriya

1. Einführung

2efacab8643a653b.png

Zuletzt aktualisiert:10.06.2024

Hintergrund

IoT-Geräte (Internet of Things), von Smart-Home-Lösungen bis hin zu Industriesensoren, generieren am Netzwerkrand riesige Datenmengen. Diese Daten sind für eine Vielzahl von Anwendungsfällen unerlässlich, z. B. für Geräteüberwachung, Tracking, Diagnose, Überwachung, Personalisierung und Flottenoptimierung. Der verwaltete Google-Dienst für Apache Kafka bietet eine skalierbare und langlebige Möglichkeit, diesen kontinuierlichen Datenstream auf OSS-kompatible, nutzerfreundliche und sichere Weise aufzunehmen und zu speichern. Google Cloud Dataproc ermöglicht die Verarbeitung dieser großen Datensätze für die Datenanalyse mit Apache Spark- und Hadoop-Clustern.

Umfang

In diesem Codelab erstellen Sie eine IoT-Datenverarbeitungspipeline mit Google Managed Service for Apache Kafka, Dataproc, Python und Apache Spark, die Echtzeitanalysen durchführt. Ihre Pipeline bietet folgende Vorteile:

  • Daten von IoT-Geräten mit GCE-VMs in einem Managed Kafka-Cluster veröffentlichen
  • Daten aus dem Managed Kafka-Cluster in einen Dataproc-Cluster streamen
  • Daten mit einem Dataproc-Spark-Streaming-Job verarbeiten

Lerninhalte

  • Von Google verwaltete Kafka- und Dataproc-Cluster erstellen
  • Streamingjobs mit Dataproc ausführen

Voraussetzungen

2. Übersicht

In diesem Codelab folgen wir der Geschichte eines fiktiven Unternehmens, DemoIOT Solutions. DemoIOT Solutions bietet Sensorgeräte, die Daten zu Temperatur, Luftfeuchtigkeit, Druck, Lichtstärke und Standort messen und übertragen. Er möchte Pipelines einrichten, die diese Daten verarbeiten, um seinen Kunden Echtzeitstatistiken zu präsentieren. Mithilfe solcher Pipelines können sie ihren Kunden eine Vielzahl von Diensten anbieten, z. B. Monitoring, automatisierte Vorschläge, Benachrichtigungen und Statistiken zu Orten, an denen die Kunden ihre Sensoren installiert haben.

Dazu verwenden wir eine GCE-VM, um das IoT-Gerät zu simulieren. Das Gerät veröffentlicht Daten in einem Kafka-Thema im von Google verwalteten Kafka-Cluster, das von einem Dataproc-Streamingjob gelesen und verarbeitet wird. In der Einrichtung der erforderlichen Voraussetzungen und auf den folgenden Seiten werden Sie durch alle diese Schritte geführt.

Voraussetzungen

  1. Suchen Sie den Projektnamen und die Projektnummer für Ihr Projekt. Weitere Informationen finden Sie unter Projektname, -nummer und -ID ermitteln.
  2. VPC-Subnetzwerk. Dadurch wird eine Verbindung zwischen der GCE-VM, dem Kafka-Cluster und dem Dataproc-Cluster hergestellt. Hier finden Sie eine Anleitung, wie Sie vorhandene Subnetze mit der gcloud CLI auflisten. Folgen Sie bei Bedarf der Anleitung unter VPC-Netzwerk im automatischen Modus erstellen, um ein VPC-Netzwerk mit Subnetz in jeder Google Cloud-Region zu erstellen. Für dieses Codelab verwenden wir jedoch nur ein Subnetz aus einer einzigen Region.
  • Achten Sie darauf, dass in diesem Unternetzwerk eine Firewallregel vorhanden ist, die den gesamten Eingang von tcp:22 zulässt, was für SSH erforderlich ist. Diese Regel kann beim Erstellen eines Netzwerks im Bereich „Firewallregeln“ ausgewählt werden.
  1. GCS-Bucket Sie benötigen Zugriff auf einen Google Cloud Storage-Bucket, um Dataproc-Jobressourcen zu speichern und verarbeitete Daten zu speichern. Wenn Sie noch keine haben, können Sie eine in Ihrem GCP-Projekt erstellen.

Umgebungsvariablen füllen

Geben Sie diese Umgebungsvariablen in Ihrem Terminal ein, in dem Sie die gcloud CLI ausführen, damit sie später verwendet werden können.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Ersetzen Sie Folgendes:

  • <project-id> durch den Namen des von Ihnen eingerichteten GCP-Projekts.
  • <project-number> durch den Namen der Projektnummer aus Schritt 1 der Voraussetzungen.
  • <region> durch den Namen einer Region aus der Liste der verfügbaren Regionen und Zonen, die Sie verwenden möchten. Wir können beispielsweise us-central1 verwenden.
  • <zone> durch den Namen der Zone aus Verfügbare Regionen und Zonen unter der zuvor ausgewählten Region. Wenn Sie beispielsweise us-central1 als Region ausgewählt haben, können Sie us-central1-f als Zone verwenden. In dieser Zone wird die GCE-VM erstellt, die IoT-Geräte simuliert. Achten Sie darauf, dass sich Ihre Zone in der von Ihnen ausgewählten Region befindet.
  • <subnet-path> durch den vollständigen Pfad des Subnetzes aus Schritt 2 der Voraussetzungen. Der Wert muss das Format projects/<project-id>/regions/<region>/subnetworks/<subnet-name> haben.
  • <bucket-name> durch den Namen des GCS-Buckets aus Schritt 3 der Voraussetzungen.

3. Von Google verwaltetes Kafka einrichten

In diesem Abschnitt wird ein von Google verwalteter Kafka-Cluster eingerichtet, auf dem der Kafka-Server bereitgestellt wird. Außerdem wird auf diesem Server ein Thema erstellt, in dem die IoT-Daten veröffentlicht und gelesen werden können, nachdem sie abonniert wurden. DemoIOT Solutions kann diesen Cluster so einrichten, dass alle seine Geräte Daten darin veröffentlichen.

Managed Kafka-Cluster erstellen

  • Erstellen Sie den verwalteten Kafka-Cluster. Hier lautet der Name des Clusters kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

Sie sollten eine Antwort ähnlich der folgenden erhalten:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

Die Clustererstellung dauert 20 bis 30 Minuten. Warten Sie, bis dieser Vorgang abgeschlossen ist.

Thema erstellen

  • Erstellen Sie ein verwaltetes Kafka-Thema auf dem Cluster. Hier lautet der Name des Themas kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

Die Ausgabe sollte in etwa so aussehen:

Created topic [kafka-iot-topic].

4. Publisher einrichten

Um Daten im verwalteten Kafka-Cluster zu veröffentlichen, richten wir eine Google Compute Engine-VM-Instanz ein, die auf das VPC mit dem vom verwalteten Kafka-Cluster verwendeten Subnetz zugreifen kann. Diese VM simuliert die Sensorgeräte von DemoIOT Solutions.

Schritte

  1. Erstellen Sie die Google Compute Engine-VM-Instanz. Hier lautet der Name der GCE-VM publisher-instance.
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Weisen Sie dem Standarddienstkonto der Google Compute Engine die Berechtigungen zur Verwendung des verwalteten Dienstes für Apache Kafka zu.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Stellen Sie über SSH eine Verbindung zur VM her. Alternativ können Sie SSH über die Google Cloud Console verwenden.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Installieren Sie Java, um Kafka-Befehlszeilentools auszuführen, und laden Sie die Kafka-Binärdatei mit den folgenden Befehlen herunter.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Laden Sie die Managed Kafka-Authentifizierungsbibliothek und die zugehörigen Abhängigkeiten herunter und konfigurieren Sie die Kafka-Clienteigenschaften.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Weitere Informationen zur Einrichtung des Publishers-Rechners findest du unter Clientcomputer einrichten.

5. In Managed Kafka veröffentlichen

Nachdem der Publisher eingerichtet ist, können wir mit der Kafka-Befehlszeile einige Dummy-Daten von der GCE-VM (IoT-Geräte von DemoIOT Solutions simulieren) im verwalteten Kafka-Cluster veröffentlichen.

  1. Da wir eine SSH-Verbindung zur GCE-VM-Instanz hergestellt haben, müssen wir die Variable PROJECT_ID neu füllen:
export PROJECT_ID=<project-id>
export REGION=<region>

Ersetzen Sie Folgendes:

  • <project-id> durch den Namen des von Ihnen eingerichteten GCP-Projekts.
  • <region> mit der Region, in der der Kafka-Cluster erstellt wurde
  1. Verwenden Sie den Befehl managed-kafka clusters describe, um die IP-Adresse des Kafka-Bootstrap-Servers abzurufen. Über diese Adresse kann eine Verbindung zum Kafka-Cluster hergestellt werden.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Listen Sie die Themen im Cluster auf:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Sie sollten die folgende Ausgabe sehen, die das zuvor erstellte Thema kafka-iot-topic enthält.

__remote_log_metadata
kafka
-iot-topic
  1. Kopieren Sie dieses Script und fügen Sie es in eine neue Datei publish_iot_data.sh ein. Sie können ein Tool wie vim oder nano verwenden, um eine neue Datei auf der GCE-VM zu erstellen.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Erklärung

  • Dieses Script erstellt JSON-Nachrichten mit simulierten Sensorwerten, die Geräte-ID, Zeitstempel, Sensordaten (Temperatur, Luftfeuchtigkeit, Druck, Licht), Standortinformationen (Breiten- und Längengrad), Gerätestatus (Akku, Signal, Verbindungstyp) und einige Metadaten enthalten.
  • Es generiert einen kontinuierlichen Nachrichtenfluss von einer festgelegten Anzahl von Geräten, die jeweils in einem bestimmten Zeitintervall Daten senden, um reale IoT-Geräte nachzuahmen. Hier veröffentlichen wir Daten von 10 Geräten, die jeweils 20 Messwerte mit einem Zeitintervall von 10 Sekunden generieren.
  • Außerdem werden alle generierten Daten über das Kafka-Befehlszeilentool für Producer im Kafka-Thema veröffentlicht.
  1. Installieren Sie einige vom Script verwendete Abhängigkeiten: das bc-Paket für mathematische Berechnungen und das jq-Paket für die JSON-Verarbeitung.
sudo apt-get install bc jq
  1. Ändern Sie das Script in ein ausführbares Script und führen Sie es aus. Die Ausführung dauert etwa zwei Minuten.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Sie können mit diesem Befehl prüfen, ob die Ereignisse veröffentlicht wurden. Dabei werden alle Ereignisse ausgegeben. Drücke zum Beenden auf <control-c>.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Dataproc-Cluster einrichten

In diesem Abschnitt wird ein Dataproc-Cluster im VPC-Subnetz erstellt, in dem sich der verwaltete Kafka-Cluster befindet. Auf diesem Cluster werden Jobs ausgeführt, die die Echtzeitstatistiken und Statistiken generieren, die DemoIOT Solutions benötigt.

  1. Erstellen Sie einen Dataproc-Cluster. Hier lautet der Name des Clusters dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Sie sollten eine Antwort ähnlich der folgenden erhalten:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Die Erstellung eines Clusters kann 10 bis 15 Minuten dauern. Warten Sie, bis dieser Vorgang abgeschlossen ist, und prüfen Sie, ob der Cluster den Status RUNNING hat.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Kafka-Nachrichten mit Dataproc verarbeiten

In diesem letzten Abschnitt reichen Sie einen Dataproc-Job ein, der die veröffentlichten Nachrichten mit Spark Streaming verarbeitet. Dieser Job generiert tatsächlich einige Statistiken und Statistiken in Echtzeit, die von DemoIOT Solutions verwendet werden können.

  1. Führen Sie diesen Befehl aus, um die Streaming-PySpark-Jobdatei process_iot.py lokal zu erstellen.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Erklärung

  • Mit diesem Code wird ein PySpark Structured Streaming-Job eingerichtet, um Daten aus einem bestimmten Kafka-Thema zu lesen. Es verwendet die angegebene Bootstrap-Adresse des Kafka-Servers und die aus einer GCS-Konfigurationsdatei geladenen Kafka-Konfigurationen, um eine Verbindung zum Kafka-Broker herzustellen und sich zu authentifizieren.
  • Zuerst werden die Rohdaten aus Kafka als Stream von Byte-Arrays gelesen, diese Byte-Arrays in Strings umgewandelt und die json_schema mithilfe der StructType-Klasse von Spark angewendet, um die Struktur der Daten anzugeben (Geräte-ID, Zeitstempel, Standort, Sensordaten usw.).
  • Die ersten 10 Zeilen werden zur Vorschau in der Konsole ausgegeben, die durchschnittliche Temperatur pro Sensor berechnet und alle Daten im avro-Format in den GCS-Bucket geschrieben. Avro ist ein zeilenbasiertes Datenserialisierungssystem, das strukturierte Daten effizient in einem kompakten, schemadefinierten Binärformat speichert. Es bietet Schemaentwicklung, sprachenunabhängige Nutzung und eine hohe Komprimierung für die groß angelegte Datenverarbeitung.
  1. Erstellen Sie die Datei client.properties und füllen Sie die Umgebungsvariable für die Bootstrap-Adresse des Kafka-Servers aus.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Laden Sie process_iot.py- und client.properties-Dateien in Ihren Google Cloud Storage-Bucket hoch, damit sie vom Dataproc-Job verwendet werden können.
gsutil cp process_iot.py client.properties $BUCKET
  1. Kopieren Sie einige Abhängigkeits-JAR-Dateien für den Dataproc-Job in Ihren GCS-Bucket. Dieses Verzeichnis enthält JAR-Dateien, die zum Ausführen von Spark Streaming-Jobs mit Kafka erforderlich sind, sowie die Managed Kafka-Authentifizierungsbibliothek und ihre Abhängigkeiten aus Clientcomputer einrichten.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Senden Sie den Spark-Job an den Dataproc-Cluster.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Die Spark-Treiberlogs werden gedruckt. Außerdem sollten Sie diese Tabellen in der Console sehen und Daten in Ihrem GCS-Bucket finden.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Bereinigen

Folgen Sie der Anleitung, um nach Abschluss des Codelabs die Ressourcen zu bereinigen.

  1. Löschen Sie den verwalteten Kafka-Cluster, die GCE-VM des Publishers und den Dataproc-Cluster.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Löschen Sie das VPC-Subnetz und das Netzwerk.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Löschen Sie Ihren GCS-Bucket, wenn Sie die Daten nicht mehr verwenden möchten.
gcloud storage rm --recursive $BUCKET

9. Glückwunsch

Herzlichen Glückwunsch! Sie haben mit Manage Kafka und Dataproc eine IoT-Datenverarbeitungspipeline erstellt, mit der DemoIOT Solutions Echtzeitdaten zu den von ihren Geräten veröffentlichten Daten erhalten kann.

Sie haben einen verwalteten Kafka-Cluster erstellt, IoT-Ereignisse darin veröffentlicht und einen Dataproc-Job ausgeführt, bei dem diese Ereignisse mit Spark-Streaming in Echtzeit verarbeitet wurden. Sie kennen jetzt die wichtigsten Schritte zum Erstellen von Datenpipelines mit Managed Kafka und Dataproc.

Referenzdokumente