Elaborazione dei dati IoT in tempo reale utilizzando Dataproc e Google Managed Service per Apache Kafka

1. Introduzione

2efacab8643a653b.png

Last Updated: 2024-06-10

Premessa

I dispositivi IoT (Internet of Things), dalle soluzioni per la smart home ai sensori industriali, generano grandi quantità di dati all'edge della rete. Questi dati sono preziosi per una serie di casi d'uso, come monitoraggio, tracciamento, diagnostica, sorveglianza, personalizzazione, ottimizzazione della flotta e molto altro. Google Managed Service per Apache Kafka offre un modo scalabile e duraturo per importare e archiviare questo flusso continuo di dati in modo compatibile con OSS, facile da usare e sicuro, mentre Google Cloud Dataproc consente l'elaborazione di questi grandi set di dati per l'analisi dei dati utilizzando i cluster Apache Spark e Hadoop.

Cosa creerai

In questo codelab, creerai una pipeline di elaborazione dei dati IoT utilizzando Google Managed Service per Apache Kafka, Dataproc, Python e Apache Spark che esegue analisi in tempo reale. La pipeline:

  • Pubblica i dati dai dispositivi IoT in un cluster Managed Kafka utilizzando le VM GCE
  • Trasmetti i dati dal cluster Kafka gestito a un cluster Dataproc
  • Elabora i dati utilizzando un job Dataproc Spark Streaming

Cosa imparerai a fare

  • Come creare cluster Google Managed Kafka e Dataproc
  • Come eseguire job di streaming utilizzando Dataproc

Che cosa ti serve

2. Panoramica

Per questo codelab, seguiamo la storia di un'azienda fittizia, DemoIOT Solutions. DemoIOT Solutions fornisce dispositivi di rilevamento che misurano e trasmettono dati relativi a temperatura, umidità, pressione, livello di luce e posizione. Vorrebbero configurare pipeline che elaborino questi dati per mostrare statistiche in tempo reale ai loro clienti. Utilizzando queste pipeline, possono fornire ai propri clienti un'ampia gamma di servizi, come monitoraggio, suggerimenti automatici, avvisi e approfondimenti sui luoghi in cui i clienti hanno installato i sensori.

A questo scopo, utilizzeremo una VM GCE per simulare il dispositivo IoT. Il dispositivo pubblica i dati in un argomento Kafka nel cluster Google Managed Kafka, che verranno letti ed elaborati da un job di streaming Dataproc. La configurazione dei prerequisiti e le pagine seguenti ti guideranno nell'esecuzione di tutti questi passaggi.

Configurazione dei prerequisiti

  1. Trova il nome e il numero del progetto. Per riferimento, consulta Trovare il nome, il numero e l'ID del progetto.
  2. Subnet VPC. Ciò consentirà la connettività tra la VM GCE, il cluster Kafka e il cluster Dataproc. Segui queste istruzioni per elencare le subnet esistenti utilizzando gcloud CLI. Se necessario, segui la procedura per creare una rete VPC in modalità automatica, che creerà una rete VPC con una subnet in ogni regione Google Cloud. Tuttavia, ai fini di questo codelab, utilizzeremo una subnet di una sola regione.
  • In questa subnet, assicurati che esista una regola firewall che consenta tutto il traffico in entrata da tcp:22, che è SSH richiesto. Questa regola sarà disponibile per la selezione nella sezione Regole firewall durante la creazione di una rete, quindi assicurati di selezionarla.
  1. Bucket GCS. Per archiviare le risorse dei job Dataproc e rendere persistenti i dati elaborati, devi avere accesso a un bucket Google Cloud Storage. Se non ne hai uno, puoi crearne uno nel tuo progetto GCP.

Compilare le variabili di ambiente

Nel terminale in cui esegui gcloud CLI, compila queste variabili di ambiente in modo che possano essere utilizzate in un secondo momento.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Sostituisci quanto segue:

  • <project-id> con il nome del progetto GCP che hai configurato.
  • <project-number> con il nome del numero di progetto del passaggio 1 dei prerequisiti.
  • <region> con il nome di una regione da Regioni e zone disponibili che vuoi utilizzare. Ad esempio, possiamo utilizzare us-central1.
  • <zone> con il nome della zona da Regioni e zone disponibili nella regione selezionata in precedenza. Ad esempio, se hai selezionato us-central1 come regione, puoi utilizzare us-central1-f come zona. Questa zona verrà utilizzata per creare la VM GCE che simula i dispositivi IoT. Assicurati che la tua zona si trovi nella regione che hai scelto.
  • <subnet-path> con il percorso completo della subnet del passaggio preliminare 2. Il valore deve essere nel formato projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> con il nome del bucket GCS del passaggio 3 dei prerequisiti.

3. Configurare Google Managed Kafka

Questa sezione configura un cluster Google Managed Kafka, che esegue il deployment del server Kafka e crea un argomento su questo server in cui i dati IoT possono essere pubblicati e letti dopo la sottoscrizione. DemoIOT Solutions può configurare questo cluster in modo che tutti i suoi dispositivi pubblichino dati al suo interno.

Crea un cluster Managed Kafka

  • Crea il cluster Managed Kafka. In questo caso, il nome del cluster è kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

Dovresti ricevere una risposta simile alla seguente:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

La creazione del cluster richiede 20-30 minuti. Attendi il completamento di questa operazione.

Creare un argomento

  • Crea un argomento Managed Kafka sul cluster. In questo caso, il nome dell'argomento è kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

Dovresti ottenere un output simile al seguente:

Created topic [kafka-iot-topic].

4. Configurare un publisher

Per pubblicare nel cluster Managed Kafka, configuriamo un'istanza VM di Google Compute Engine in grado di accedere al VPC contenente la subnet utilizzata dal cluster Managed Kafka. Questa VM simula i dispositivi sensore forniti da DemoIOT Solutions.

Passaggi

  1. Crea l'istanza VM di Google Compute Engine. In questo caso, il nome della VM GCE è publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Concedi all'account di servizio predefinito di Google Compute Engine le autorizzazioni per utilizzare Managed Service per Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Utilizza SSH per connetterti alla VM. In alternativa, utilizza la console Google Cloud per SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Installa Java per eseguire gli strumenti a riga di comando Kafka e scarica il binario Kafka utilizzando questi comandi.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Scarica la libreria di autenticazione Managed Kafka e le relative dipendenze e configura le proprietà del client Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Per maggiori dettagli sulla configurazione della macchina editore, consulta Configurare una macchina client.

5. Pubblicare su Kafka gestito

Ora che l'editore è configurato, possiamo utilizzare la riga di comando Kafka per pubblicare alcuni dati fittizi dalla VM GCE (che simula i dispositivi IoT di DemoIOT Solutions) nel cluster Managed Kafka.

  1. Poiché abbiamo eseguito l'accesso tramite SSH all'istanza VM GCE, dobbiamo riempire nuovamente la variabile PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Sostituisci quanto segue:

  • <project-id> con il nome del progetto GCP che hai configurato.
  • <region> con la regione in cui è stato creato il cluster Kafka
  1. Utilizza il comando managed-kafka clusters describe per ottenere l'indirizzo IP del server bootstrap di Kafka. Questo indirizzo può essere utilizzato per connettersi al cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Elenca gli argomenti nel cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Dovresti vedere l'output seguente, contenente l'argomento kafka-iot-topic creato in precedenza.

__remote_log_metadata
kafka-iot-topic
  1. Copia e incolla questo script in un nuovo file publish_iot_data.sh. Per creare un nuovo file sulla VM GCE, puoi utilizzare uno strumento come vim o nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Spiegazione

  • Questo script crea messaggi JSON con letture simulate dei sensori che contengono ID dispositivo, timestamp, dati dei sensori (temperatura, umidità, pressione, luce), dati sulla posizione (latitudine, longitudine), stato del dispositivo (batteria, segnale, tipo di connessione) e alcuni metadati.
  • Genera un flusso continuo di messaggi da un numero prestabilito di dispositivi unici, ognuno dei quali invia dati a un intervallo di tempo specificato, simulando i dispositivi IoT reali. Qui pubblichiamo i dati di 10 dispositivi che producono 20 letture ciascuno, a un intervallo di tempo di 10 secondi.
  • Inoltre, pubblica tutti i dati generati nell'argomento Kafka utilizzando lo strumento a riga di comando del produttore Kafka.
  1. Installa alcune dipendenze utilizzate dallo script: il pacchetto bc per i calcoli matematici e il pacchetto jq per l'elaborazione JSON.
sudo apt-get install bc jq
  1. Modifica lo script in modo che sia eseguibile ed eseguilo. L'esecuzione dovrebbe richiedere circa 2 minuti.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Puoi verificare che gli eventi siano stati pubblicati correttamente eseguendo questo comando, che stamperà tutti gli eventi. Premi <control-c> per uscire.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Configura il cluster Dataproc

Questa sezione crea un cluster Dataproc nella subnet VPC in cui è presente il cluster Managed Kafka. Questo cluster verrà utilizzato per eseguire i job che generano le statistiche e gli approfondimenti in tempo reale necessari a DemoIOT Solutions.

  1. Crea un cluster Dataproc. In questo caso, il nome del cluster è dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Dovresti ricevere una risposta simile alla seguente:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

La creazione del cluster potrebbe richiedere 10-15 minuti. Attendi il completamento dell'operazione e verifica che il cluster sia nello stato RUNNING descrivendolo.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Elaborare i messaggi Kafka utilizzando Dataproc

In quest'ultima sezione, invierai un job Dataproc che elabora i messaggi pubblicati utilizzando Spark Streaming. Questo job genera statistiche e approfondimenti in tempo reale che possono essere utilizzati da DemoIOT Solutions.

  1. Esegui questo comando per creare localmente il file del job PySpark di streaming denominato process_iot.py.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Spiegazione

  • Questo codice configura un job di Structured Streaming PySpark per leggere i dati da un argomento Kafka specificato. Utilizza l'indirizzo di bootstrap del server Kafka fornito e le configurazioni Kafka caricate da un file di configurazione GCS per connettersi e autenticarsi con il broker Kafka.
  • Innanzitutto, legge i dati non elaborati da Kafka come flusso di array di byte, li converte in stringhe e applica json_schema utilizzando StructType di Spark per specificare la struttura dei dati (ID dispositivo, timestamp, posizione, dati dei sensori e così via).
  • Stampa le prime 10 righe nella console per l'anteprima, calcola la temperatura media per sensore e scrive tutti i dati nel bucket GCS in formato avro. Avro è un sistema di serializzazione dei dati basato su righe che archivia in modo efficiente i dati strutturati in un formato binario compatto definito dallo schema, offrendo evoluzione dello schema, neutralità del linguaggio e compressione elevata per l'elaborazione di dati su larga scala.
  1. Crea il file client.properties e compila la variabile di ambiente per l'indirizzo di bootstrap del server Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Carica i file process_iot.py e client.properties nel bucket Cloud Storage, in modo che possano essere utilizzati dal job Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Copia alcuni jar di dipendenza per il job Dataproc nel bucket GCS. Questa directory contiene i file JAR necessari per eseguire i job Spark Streaming con Kafka, nonché la libreria di autenticazione Managed Kafka e le relative dipendenze, estratte da Configurare una macchina client.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Invia il job Spark al cluster Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Verranno stampati i log del driver Spark. Dovresti anche essere in grado di visualizzare queste tabelle registrate nella console e i dati archiviati nel bucket GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Esegui la pulizia

Segui i passaggi per liberare spazio dalle risorse dopo aver completato il codelab.

  1. Elimina il cluster Managed Kafka, la VM GCE Publisher e il cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Elimina la subnet e la rete VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Elimina il bucket GCS se non vuoi più utilizzare i dati.
gcloud storage rm --recursive $BUCKET

9. Complimenti

Congratulazioni, hai creato correttamente una pipeline di elaborazione dei dati IoT con Manage Kafka e Dataproc che aiuta DemoIOT Solutions a ottenere approfondimenti in tempo reale sui dati pubblicati dai suoi dispositivi.

Hai creato un cluster Managed Kafka, pubblicato eventi IoT e eseguito un job Dataproc che utilizzava Spark Streaming per elaborare questi eventi in tempo reale. Ora conosci i passaggi chiave necessari per creare pipeline di dati utilizzando Managed Kafka e Dataproc.

Documentazione di riferimento