Elaborazione dei dati IoT in tempo reale utilizzando Dataproc e Google Managed Service per Apache Kafka
Informazioni su questo codelab
1. Introduzione
Ultimo aggiornamento: 10/06/2024
Premessa
I dispositivi IoT (Internet of Things), dalle soluzioni per la smart home ai sensori industriali, generano enormi quantità di dati all'edge della rete. Questi dati sono preziosi per una serie di casi d'uso, come monitoraggio dei dispositivi, tracciamento, diagnostica, sorveglianza, personalizzazione, ottimizzazione del parco veicoli e molto altro. Il servizio gestito Google per Apache Kafka offre un modo scalabile e duraturo per importare e archiviare questo flusso continuo di dati in modo sicuro, facile da usare e compatibile con OSS, mentre Google Cloud Dataproc consente di elaborare questi set di dati di grandi dimensioni per l'analisi dei dati utilizzando i cluster Apache Spark e Hadoop.
Cosa creerai
In questo codelab, creerai una pipeline di elaborazione dei dati IoT utilizzando Google Managed Service per Apache Kafka, Dataproc, Python e Apache Spark per eseguire analisi in tempo reale. La pipeline:
- Pubblicare i dati dei dispositivi IoT in un cluster Kafka gestito utilizzando le VM GCE
- Esegui il flusso di dati dal cluster Kafka gestito a un cluster Dataproc
- Elaborare i dati utilizzando un job Dataproc Spark Streaming
Cosa imparerai a fare
- Come creare cluster Kafka e Dataproc gestiti da Google
- Come eseguire job di streaming utilizzando Dataproc
Che cosa ti serve
- Un account Google Cloud attivo con un progetto configurato. Se non ne hai uno, puoi registrarti per una prova senza costi.
- Interfaccia a riga di comando gcloud installata e configurata. Puoi seguire le istruzioni per installare gcloud CLI sul tuo sistema operativo.
- API abilitate per Google Managed Kafka e Dataproc nel tuo progetto Google Cloud.
2. Panoramica
Per questo codelab, seguiamo la storia di una società fittizia, DemoIOT Solutions. DemoIOT Solutions fornisce dispositivi di rilevamento che misurano e trasmettono dati su temperatura, umidità, pressione, intensità luminosa e posizione. Vorrebbe configurare pipeline che elaborino questi dati per mostrare statistiche in tempo reale ai propri clienti. Utilizzando queste pipeline, possono offrire ai propri clienti una vasta gamma di servizi, come monitoraggio, suggerimenti automatici, avvisi e approfondimenti sui luoghi in cui i clienti hanno installato i loro sensori.
A questo scopo, utilizzeremo una VM GCE per simulare il dispositivo IoT. Il dispositivo pubblicherà i dati in un argomento Kafka nel cluster Kafka gestito da Google, che verranno letti ed elaborati da un job di streaming Dataproc. La configurazione dei prerequisiti e le pagine seguenti ti guideranno nell'esecuzione di tutti questi passaggi.
Configurazione dei prerequisiti
- Trova il nome e il numero del progetto. Per riferimento futuro, consulta Trovare il nome, il numero e l'ID del progetto.
- Subnet VPC. In questo modo, sarà possibile la connettività tra la VM GCE, il cluster Kafka e il cluster Dataproc. Segui questa procedura per elencare le sottoreti esistenti utilizzando gcloud CLI. Se necessario, segui la procedura per creare una rete VPC in modalità automatica, che consente di creare una rete VPC con una subnet in ogni regione Google Cloud. Tuttavia, ai fini di questo codelab, utilizzeremo una sottorete di una sola regione.
- In questa sottorete, assicurati che sia presente una regola firewall che consenta tutto il traffico in entrata da tcp:22, che è necessario per SSH. Questa regola sarà disponibile per la selezione nella sezione Regole firewall durante la creazione di una rete, quindi assicurati di selezionarla.
- Bucket GCS. Per archiviare le risorse dei job Dataproc e mantenere i dati elaborati, devi avere accesso a un bucket Google Cloud Storage. Se non ne hai uno, puoi crearlo nel tuo progetto Google Cloud.
Compilare le variabili di ambiente
Nel terminale in cui esegui gcloud CLI, compila queste variabili di ambiente in modo che possano essere utilizzate in un secondo momento.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Sostituisci quanto segue:
<project-id>
con il nome del progetto Google Cloud che hai configurato.<project-number>
con il nome del numero di progetto del passaggio 1 del prerequisito.<region>
con il nome di una regione tra le regioni e le zone disponibili che vuoi utilizzare. Ad esempio, possiamo utilizzareus-central1
.<zone>
con il nome della zona in Regioni e zone disponibili nella regione selezionata in precedenza. Ad esempio, se hai selezionatous-central1
come regione, puoi utilizzareus-central1-f
come zona. Questa zona verrà utilizzata per creare la VM GCE che simula i dispositivi IoT. Assicurati che la zona si trovi nella regione che hai scelto.<subnet-path>
con il percorso completo della subnet del passaggio 2 del prerequisito. Il valore deve essere nel formato:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
con il nome del bucket GCS del passaggio 3 del prerequisito.
3. Configurare Kafka gestito da Google
Questa sezione configura un cluster Kafka gestito da Google, che esegue il deployment del server Kafka e crea un argomento su questo server in cui è possibile pubblicare e leggere i dati IoT dopo aver effettuato la sottoscrizione. DemoIOT Solutions può configurare questo cluster in modo che tutti i suoi dispositivi pubblichino dati al suo interno.
Creare un cluster Kafka gestito
- Crea il cluster Kafka gestito. In questo caso, il nome del cluster è
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Dovresti ricevere una risposta simile alla seguente:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
La creazione del cluster richiede 20-30 minuti. Attendi il completamento di questa operazione.
Creare un argomento
- Crea un argomento Kafka gestito sul cluster. Qui il nome dell'argomento è
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Dovresti visualizzare un output simile al seguente:
Created topic [kafka-iot-topic].
4. Configurare un publisher
Per pubblicare nel cluster Kafka gestito, abbiamo configurato un'istanza VM Google Compute Engine che può accedere alla VPC contenente la sottorete utilizzata dal cluster Kafka gestito. Questa VM simula i dispositivi di rilevamento forniti da DemoIOT Solutions.
Passaggi
- Crea l'istanza VM Google Compute Engine. Qui il nome della VM GCE è
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Concedi all'account di servizio predefinito di Google Compute Engine le autorizzazioni per utilizzare Managed Service per Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Utilizza SSH per connetterti alla VM. In alternativa, utilizza la console Google Cloud per SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Installa Java per eseguire gli strumenti a riga di comando Kafka e scarica il file binario di Kafka utilizzando questi comandi.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Scarica la libreria di autenticazione Managed Kafka e le relative dipendenze e configura le proprietà del client Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Per maggiori dettagli sulla configurazione del computer del publisher, consulta Configurare un computer client.
5. Pubblicazione in Kafka gestito
Ora che il publisher è configurato, possiamo utilizzare la riga di comando Kafka per pubblicare alcuni dati simulati dalla VM GCE (che simula i dispositivi IoT di DemoIOT Solutions) al cluster Kafka gestito.
- Poiché abbiamo eseguito l'accesso tramite SSH all'istanza VM GCE, dobbiamo compilare di nuovo la variabile
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Sostituisci quanto segue:
<project-id>
con il nome del progetto Google Cloud che hai configurato.<region>
con la regione in cui è stato creato il cluster Kafka
- Utilizza il comando
managed-kafka clusters describe
per ottenere l'indirizzo IP del server bootstrap Kafka. Questo indirizzo può essere utilizzato per connettersi al cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Elenca gli argomenti nel cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Dovresti vedere il seguente output, contenente l'argomento kafka-iot-topic
che abbiamo creato in precedenza.
__remote_log_metadata
kafka-iot-topic
- Copia e incolla questo script in un nuovo file
publish_iot_data.sh
. Per creare un nuovo file sulla VM GCE, puoi utilizzare uno strumento comevim
onano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Spiegazione
- Questo script crea messaggi JSON con letture dei sensori simulate che contengono ID dispositivo, timestamp, dati del sensore (temperatura, umidità, pressione, luce), informazioni sulla posizione (latitudine, longitudine), stato del dispositivo (batteria, segnale, tipo di connessione) e alcuni metadati.
- Genera un flusso continuo di messaggi da un numero prestabilito di dispositivi unici, ognuno dei quali invia dati a un intervallo di tempo specificato, simulando i dispositivi IoT reali. Qui pubblichiamo i dati di 10 dispositivi che producono 20 letture ciascuno, con un intervallo di tempo di 10 secondi.
- Inoltre, pubblica tutti i dati generati nell'argomento Kafka utilizzando lo strumento a riga di comando del produttore Kafka.
- Installa alcune dipendenze utilizzate dallo script: il pacchetto
bc
per i calcoli matematici e il pacchettojq
per l'elaborazione JSON.
sudo apt-get install bc jq
- Modifica lo script in modo che sia un file eseguibile ed eseguilo. L'esecuzione dovrebbe richiedere circa 2 minuti.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Puoi verificare che gli eventi siano stati pubblicati correttamente eseguendo questo comando, che stampa tutti gli eventi. Premi <control-c>
per uscire.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configura il cluster Dataproc
Questa sezione crea un cluster Dataproc nella sottorete VPC in cui è presente il cluster Kafka gestito. Questo cluster verrà utilizzato per eseguire job che generano le statistiche e gli approfondimenti in tempo reale necessari per DemoIOT Solutions.
- Crea un cluster Dataproc. Qui il nome del cluster è
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Dovresti ricevere una risposta simile alla seguente:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
La creazione del cluster potrebbe richiedere 10-15 minuti. Attendi il completamento dell'operazione e verifica che il cluster sia nello stato RUNNING
descrivendolo.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Elaborare i messaggi Kafka utilizzando Dataproc
In questa ultima sezione, invierai un job Dataproc che elabora i messaggi pubblicati utilizzando Spark Streaming. Questo job genera statistiche e approfondimenti in tempo reale che possono essere utilizzati da DemoIOT Solutions.
- Esegui questo comando per creare localmente il file del job PySpark in streaming denominato
process_iot.py
.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Spiegazione
- Questo codice configura un job di streaming strutturato PySpark per leggere i dati da un argomento Kafka specificato. Utilizza l'indirizzo di bootstrap del server Kafka e le configurazioni Kafka fornite caricate da un file di configurazione GCS per connettersi e autenticarsi con il broker Kafka.
- Innanzitutto, legge i dati non elaborati da Kafka come stream di array di byte, li trasmette in stringhe e applica
json_schema
utilizzando StructType di Spark per specificare la struttura dei dati (ID dispositivo, timestamp, posizione, dati dei sensori e così via). - Stampa le prime 10 righe nella console per l'anteprima, calcola la temperatura media per sensore e scrive tutti i dati nel bucket GCS in formato
avro
. Avro è un sistema di serializzazione dei dati basato su righe che archivia in modo efficiente i dati strutturati in un formato binario compatto definito dallo schema, offrendo l'evoluzione dello schema, la neutralità del linguaggio e un'alta compressione per l'elaborazione di dati su larga scala.
- Crea il file
client.properties
e compila la variabile di ambiente per l'indirizzo di bootstrap del server Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Carica i file
process_iot.py
eclient.properties
nel bucket Google Cloud Storage in modo che possano essere utilizzati dal job Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Copia alcuni file JAR delle dipendenze per il job Dataproc nel bucket GCS. Questa directory contiene i file JAR necessari per eseguire i job Spark Streaming con Kafka, la libreria di autenticazione di Kafka gestita e le relative dipendenze, tratte da Configurare una macchina client.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Invia il job Spark al cluster Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
I log del driver di Spark verranno stampati. Dovresti anche riuscire a vedere queste tabelle registrate nella console e i dati archiviati nel bucket GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Esegui la pulizia
Segui i passaggi per ripulire le risorse dopo aver completato il codelab.
- Elimina il cluster Kafka gestito, la VM GCE del publisher e il cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Elimina la subnet e la rete VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Elimina il bucket GCS se non vuoi più utilizzare i dati.
gcloud storage rm --recursive $BUCKET
9. Complimenti
Congratulazioni, hai creato una pipeline di elaborazione dei dati IoT con Manage Kafka e Dataproc che aiuta DemoIOT Solutions a ottenere approfondimenti in tempo reale sui dati pubblicati dai suoi dispositivi.
Hai creato un cluster Kafka gestito, hai pubblicato eventi IoT al suo interno ed eseguito un job Dataproc che utilizzava Spark Streaming per elaborare questi eventi in tempo reale. Ora conosci i passaggi chiave necessari per creare pipeline di dati utilizzando Managed Kafka e Dataproc.