Traitement des données IoT en temps réel à l'aide de Dataproc et de Google Managed Service pour Apache Kafka
À propos de cet atelier de programmation
1. Introduction
Dernière mise à jour:10/06/2024
Contexte
Les appareils IoT (Internet des objets), allant des solutions de maison connectée aux capteurs industriels, génèrent d'énormes quantités de données en périphérie du réseau. Ces données sont inestimables pour divers cas d'utilisation, comme la surveillance des appareils, le suivi, les diagnostics, la surveillance, la personnalisation, l'optimisation de la flotte et bien plus encore. Google Managed Service pour Apache Kafka offre un moyen évolutif et durable d'ingérer et de stocker ce flux continu de données de manière compatible avec les logiciels Open Source, simple à utiliser et sécurisée, tandis que Google Cloud Dataproc permet de traiter ces grands ensembles de données pour l'analyse des données à l'aide de clusters Apache Spark et Hadoop.
Objectifs de l'atelier
Dans cet atelier de programmation, vous allez créer un pipeline de traitement des données IoT à l'aide de Google Managed Service pour Apache Kafka, Dataproc, Python et Apache Spark, qui effectue des analyses en temps réel. Votre pipeline va:
- Publier des données provenant d'appareils IoT dans un cluster Kafka géré à l'aide de VM GCE
- Transférer des données en flux continu du cluster Managed Kafka vers un cluster Dataproc
- Traiter des données à l'aide d'une tâche Dataproc Spark Streaming
Points abordés
- Créer des clusters Google Managed Kafka et Dataproc
- Exécuter des tâches de streaming à l'aide de Dataproc
Prérequis
- Un compte GCP actif avec un projet configuré Si vous n'en avez pas, vous pouvez vous inscrire à un essai sans frais.
- gcloud CLI installée et configurée. Vous pouvez suivre les instructions pour installer la CLI gcloud sur votre OS.
- API activées pour Kafka géré par Google et Dataproc dans votre projet GCP.
2. Présentation
Pour cet atelier de programmation, suivons l'histoire d'une entreprise fictive, DemoIOT Solutions. DemoIOT Solutions fournit des capteurs qui mesurent et transmettent des données sur la température, l'humidité, la pression, le niveau d'éclairage et la position. Il souhaite configurer des pipelines qui traitent ces données afin de présenter des statistiques en temps réel à ses clients. Grâce à ces pipelines, ils peuvent fournir une grande variété de services à leurs clients, comme la surveillance, des suggestions automatiques, des alertes et des insights sur les lieux où les clients ont installé leurs capteurs.
Pour ce faire, nous allons utiliser une VM GCE pour simuler l'appareil IoT. L'appareil publiera des données dans un sujet Kafka du cluster Kafka géré par Google, qui seront lues et traitées par un job de streaming Dataproc. La configuration des prérequis et les pages suivantes vous guideront pour effectuer toutes ces étapes.
Configuration préalable
- Recherchez le nom et le numéro de votre projet. Pour référence, consultez Trouver le nom, le numéro et l'ID du projet.
- Sous-réseau VPC Cela permettra la connectivité entre la VM GCE, le cluster Kafka et le cluster Dataproc. Suivez ces instructions pour lister les sous-réseaux existants à l'aide de la gcloud CLI. Si nécessaire, suivez la procédure de création d'un réseau VPC en mode automatique, qui crée un réseau VPC avec un sous-réseau dans chaque région Google Cloud. Toutefois, pour les besoins de cet atelier de programmation, nous n'utiliserons qu'un sous-réseau d'une seule région.
- Dans ce sous-réseau, assurez-vous qu'une règle de pare-feu autorise tout trafic entrant à partir de tcp:22, qui est requis pour le protocole SSH. Cette règle sera disponible dans la section "Règles de pare-feu" lorsque vous créerez un réseau. Veillez à la sélectionner.
- Bucket GCS Vous aurez besoin d'un accès à un bucket de stockage Google Cloud pour stocker les ressources de tâche Dataproc et conserver les données traitées. Si vous n'en avez pas, vous pouvez en créer un dans votre projet GCP.
Remplir les variables d'environnement
Dans le terminal où vous exécutez la CLI gcloud, renseignez ces variables d'environnement afin qu'elles puissent être utilisées ultérieurement.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Remplacez les éléments suivants :
<project-id>
par le nom du projet GCP que vous avez configuré.<project-number>
par le nom du numéro de projet de l'étape 1 des conditions préalables.<region>
par le nom d'une région de la section Régions et zones disponibles que vous souhaitez utiliser. Par exemple, nous pouvons utiliserus-central1
.<zone>
par le nom de la zone dans Régions et zones disponibles sous la région que vous avez précédemment sélectionnée. Par exemple, si vous avez sélectionnéus-central1
comme région, vous pouvez utiliserus-central1-f
comme zone. Cette zone servira à créer la VM GCE qui simule les appareils IoT. Assurez-vous que votre zone se trouve dans la région que vous avez choisie.<subnet-path>
par le chemin d'accès complet du sous-réseau de l'étape 2 des conditions préalables. La valeur doit être au format suivant:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
par le nom du bucket GCS de l'étape 3 de la section "Conditions préalables".
3. Configurer Kafka géré par Google
Cette section configure un cluster Kafka géré par Google, qui déploie le serveur Kafka, et crée un sujet sur ce serveur où les données IoT peuvent être publiées et lues après s'y être abonnées. DemoIOT Solutions peut configurer ce cluster afin que tous ses appareils y publient des données.
Créer un cluster Kafka géré
- Créez le cluster Kafka géré. Ici, le nom du cluster est
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Vous devriez obtenir un résultat semblable à celui-ci :
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
La création du cluster prend 20 à 30 minutes. Attendez la fin de cette opération.
Créer un sujet
- Créez un sujet Kafka géré sur le cluster. Ici, le nom du sujet est
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Un résultat semblable aux lignes suivantes doit s'afficher:
Created topic [kafka-iot-topic].
4. Configurer un éditeur
Pour publier des données dans le cluster Kafka géré, nous avons configuré une instance de VM Google Compute Engine pouvant accéder au VPC contenant le sous-réseau utilisé par le cluster Kafka géré. Cette VM simule les appareils de capteurs fournis par DemoIOT Solutions.
Étapes
- Créez l'instance de VM Google Compute Engine. Ici, le nom de la VM GCE est
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Accordez au compte de service par défaut Google Compute Engine les autorisations nécessaires pour utiliser Managed Service pour Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Utilisez SSH pour vous connecter à la VM. Vous pouvez également utiliser Google Cloud Console pour SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Installez Java pour exécuter les outils de ligne de commande Kafka, puis téléchargez le binaire Kafka à l'aide de ces commandes.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Téléchargez la bibliothèque d'authentification Managed Kafka et ses dépendances, puis configurez les propriétés du client Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Pour en savoir plus sur la configuration de la machine de l'éditeur, consultez Configurer une machine cliente.
5. Publier dans Managed Kafka
Maintenant que l'éditeur est configuré, nous pouvons utiliser la ligne de commande Kafka pour publier des données fictives à partir de la VM GCE (simulant des appareils IoT par DemoIOT Solutions) vers le cluster Kafka géré.
- Étant donné que nous nous sommes connectés en SSH à l'instance de VM GCE, nous devons réinitialiser la variable
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Remplacez les éléments suivants :
<project-id>
par le nom du projet GCP que vous avez configuré.<region>
avec la région dans laquelle le cluster Kafka a été créé
- Utilisez la commande
managed-kafka clusters describe
pour obtenir l'adresse IP du serveur de démarrage Kafka. Vous pouvez utiliser cette adresse pour vous connecter au cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Répertoriez les sujets du cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Vous devriez voir le résultat suivant, contenant le sujet kafka-iot-topic
que nous avons créé précédemment.
__remote_log_metadata
kafka-iot-topic
- Copiez et collez ce script dans un nouveau fichier
publish_iot_data.sh
. Pour créer un fichier sur la VM GCE, vous pouvez utiliser un outil tel quevim
ounano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explication
- Ce script crée des messages JSON avec des mesures de capteurs simulées qui incluent l'ID de l'appareil, le code temporel, les données des capteurs (température, humidité, pression, luminosité), les informations de localisation (latitude, longitude), l'état de l'appareil (batterie, signal, type de connexion) et certaines métadonnées.
- Il génère un flux continu de messages à partir d'un nombre défini d'appareils uniques, chacun envoyant des données à un intervalle de temps spécifié, imitant les appareils IoT du monde réel. Ici, nous publions les données de 10 appareils qui produisent 20 lectures chacun, à un intervalle de 10 secondes.
- Il publie également toutes les données générées dans le sujet Kafka à l'aide de l'outil de ligne de commande du producteur Kafka.
- Installez certaines dépendances utilisées par le script : le package
bc
pour les calculs mathématiques et le packagejq
pour le traitement JSON.
sudo apt-get install bc jq
- Modifiez le script pour qu'il soit exécutable, puis exécutez-le. L'exécution devrait prendre environ deux minutes.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Vous pouvez vérifier que les événements ont bien été publiés en exécutant cette commande, qui imprime tous les événements. Appuyez sur <control-c>
pour quitter.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configurer le cluster Dataproc
Cette section crée un cluster Dataproc dans le sous-réseau VPC où se trouve le cluster Kafka géré. Ce cluster sera utilisé pour exécuter des tâches qui génèrent les statistiques et les insights en temps réel dont DemoIOT Solutions a besoin.
- Créez un cluster Dataproc. Ici, le nom du cluster est
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Vous devriez obtenir un résultat semblable à celui-ci :
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
La création du cluster peut prendre 10 à 15 minutes. Attendez la fin de cette opération et vérifiez que le cluster est à l'état RUNNING
en le décrivant.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Traiter des messages Kafka à l'aide de Dataproc
Dans cette dernière section, vous allez envoyer une tâche Dataproc qui traite les messages publiés à l'aide de Spark Streaming. Cette tâche génère en fait des statistiques et des insights en temps réel qui peuvent être utilisés par DemoIOT Solutions.
- Exécutez cette commande pour créer localement le fichier de tâche PySpark en streaming appelé
process_iot.py
.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explication
- Ce code configure une tâche de streaming structuré PySpark pour lire les données d'un sujet Kafka spécifié. Il utilise l'adresse d'amorçage du serveur Kafka fournie et les configurations Kafka chargées à partir d'un fichier de configuration GCS pour se connecter et s'authentifier auprès de l'agent Kafka.
- Il lit d'abord les données brutes de Kafka en tant que flux d'arrays d'octets, puis convertit ces arrays d'octets en chaînes, puis applique
json_schema
à l'aide de StructType de Spark pour spécifier la structure des données (ID de l'appareil, code temporel, position, données des capteurs, etc.). - Il affiche les 10 premières lignes dans la console pour un aperçu, calcule la température moyenne par capteur et écrit toutes les données dans le bucket GCS au format
avro
. Avro est un système de sérialisation de données basé sur les lignes qui stocke efficacement les données structurées dans un format binaire compact défini par un schéma. Il offre une évolution du schéma, une neutralité linguistique et une compression élevée pour le traitement de données à grande échelle.
- Créez le fichier
client.properties
et renseignez la variable d'environnement pour l'adresse d'amorçage du serveur Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Importez les fichiers
process_iot.py
etclient.properties
dans votre bucket Google Cloud Storage afin qu'ils puissent être utilisés par la tâche Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Copiez des fichiers JAR de dépendance pour la tâche Dataproc dans votre bucket GCS. Ce répertoire contient les fichiers JAR requis pour exécuter des tâches Spark Streaming avec Kafka, ainsi que la bibliothèque d'authentification Kafka gérée et ses dépendances, extraites de la section Configurer une machine cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envoyez la tâche Spark au cluster Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Les journaux du pilote Spark seront imprimés. Vous devriez également pouvoir voir ces tables enregistrées dans la console et les données stockées dans votre bucket GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Effectuer un nettoyage
Suivez la procédure de nettoyage des ressources une fois l'atelier de programmation terminé.
- Supprimez le cluster Kafka géré, la VM GCE de l'éditeur et le cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Supprimez votre sous-réseau et votre réseau VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Supprimez votre bucket GCS si vous ne souhaitez plus utiliser les données.
gcloud storage rm --recursive $BUCKET
9. Félicitations
Félicitations ! Vous avez créé un pipeline de traitement des données IoT avec Manage Kafka et Dataproc, qui aide DemoIOT Solutions à obtenir des insights en temps réel sur les données publiées par ses appareils.
Vous avez créé un cluster Kafka géré, y avez publié des événements IoT et exécuté une tâche Dataproc qui utilisait le streaming Spark pour traiter ces événements en temps réel. Vous connaissez désormais les principales étapes requises pour créer des pipelines de données à l'aide de Kafka géré et de Dataproc.