Procesamiento de datos de la IoT en tiempo real con Dataproc y Google Cloud Managed Service para Apache Kafka
Acerca de este codelab
1. Introducción
Última actualización: 10/06/2024
Información general
Los dispositivos de la Internet de las cosas (IoT), que van desde soluciones de casa inteligente hasta sensores industriales, generan grandes cantidades de datos en el perímetro de la red. Estos datos son invaluables para una variedad de casos de uso, como la supervisión de dispositivos, el seguimiento, el diagnóstico, la vigilancia, la personalización, la optimización de flotas y mucho más. El servicio administrado de Google para Apache Kafka ofrece una forma escalable y duradera de transferir y almacenar este flujo continuo de datos de una manera compatible con OSS, fácil de usar y segura, mientras que Google Cloud Dataproc permite el procesamiento de estos grandes conjuntos de datos para el análisis de datos con clústeres de Apache Spark y Hadoop.
Qué compilarás
En este codelab, crearás una canalización de procesamiento de datos de la IoT con Google Managed Service para Apache Kafka, Dataproc, Python y Apache Spark que realice análisis en tiempo real. Tu canalización hará lo siguiente:
- Publica datos de dispositivos de la IoT en un clúster de Kafka administrado con VMs de GCE
- Transmite datos del clúster de Manage Kafka a un clúster de Dataproc
- Procesa datos con un trabajo de transmisión de Spark de Dataproc
Qué aprenderás
- Cómo crear clústeres de Kafka y Dataproc administrados por Google
- Cómo ejecutar trabajos de transmisión con Dataproc
Requisitos
- Una cuenta de GCP activa con un proyecto configurado Si no tienes una, puedes registrarte para obtener una prueba gratuita.
- gcloud CLI instalada y configurada Puedes seguir las instrucciones para instalar gcloud CLI en tu SO.
- Habilitaste las APIs de Kafka administrado por Google y Dataproc en tu proyecto de GCP.
2. Descripción general
En este codelab, seguiremos la historia de una empresa ficticia, DemoIOT Solutions. DemoIOT Solutions proporciona dispositivos de sensores que miden y transmiten datos de temperatura, humedad, presión, nivel de luz y ubicación. Le gustaría configurar canalizaciones que procesen estos datos para mostrar estadísticas en tiempo real a sus clientes. Con estas canalizaciones, pueden proporcionar una amplia variedad de servicios a sus clientes, como supervisión, sugerencias automatizadas, alertas y estadísticas sobre los lugares en los que los clientes instalaron sus sensores.
Para ello, usaremos una VM de GCE para simular el dispositivo de la IoT. El dispositivo publicará datos en un tema de Kafka en el clúster de Kafka administrado por Google, que un trabajo de transmisión de Dataproc leerá y procesará. La configuración de requisitos previos y las siguientes páginas te guiarán para realizar todos estos pasos.
Configuración de los requisitos previos
- Busca el nombre y el número de tu proyecto. Consulta Cómo encontrar el nombre, el número y el ID del proyecto como referencia.
- Subred de VPC Esto permitirá la conectividad entre la VM de GCE, el clúster de Kafka y el clúster de Dataproc. Sigue este vínculo para enumerar las subredes existentes con gcloud CLI. Si es necesario, sigue las instrucciones para crear una red de VPC en modo automático, que creará una red de VPC con subred en cada región de Google Cloud. Sin embargo, para los fines de este codelab, usaremos una subred de una sola región.
- En esta subred, asegúrate de que haya una regla de firewall que permita toda la entrada desde tcp:22, que es el SSH obligatorio. Esta regla estará disponible para seleccionarla en la sección Reglas de firewall cuando crees una red, así que asegúrate de seleccionarla.
- Bucket de GCS Necesitarás acceso a un bucket de almacenamiento de Google Cloud para almacenar los recursos de trabajo de Dataproc y conservar los datos procesados. Si no tienes uno, puedes crear uno en tu proyecto de GCP.
Propaga las variables de entorno
En la terminal en la que ejecutas la CLI de gcloud, propaga estas variables de entorno para que se puedan usar más adelante.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Reemplaza lo siguiente:
<project-id>
por el nombre del proyecto de GCP que configuraste.<project-number>
por el nombre del número de proyecto del paso 1 de los requisitos previos.<region>
por el nombre de una región de las regiones y zonas disponibles que deseas usar. Por ejemplo, podemos usarus-central1
.<zone>
con el nombre de la zona de Regiones y zonas disponibles en la región que seleccionaste anteriormente. Por ejemplo, si seleccionasteus-central1
como región, puedes usarus-central1-f
como zona. Esta zona se usará para crear la VM de GCE que simulará dispositivos de la IoT. Asegúrate de que la zona esté en la región que elegiste.<subnet-path>
con la ruta de acceso completa de la subred del paso 2 de los requisitos previos. El valor de este debe tener el formatoprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
por el nombre del bucket de GCS del paso 3 de los requisitos previos.
3. Configura Kafka administrado por Google
En esta sección, se configura un clúster de Kafka administrado por Google, que implementa el servidor de Kafka y crea un tema en este servidor en el que se pueden publicar y leer los datos de la IoT después de suscribirse a él. DemoIOT Solutions puede configurar este clúster para que todos sus dispositivos publiquen datos en él.
Crea un clúster de Kafka administrado
- Crea el clúster de Kafka administrado. Aquí, el nombre del clúster es
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Deberías recibir una respuesta similar a la que figura a continuación:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
La creación del clúster tarda entre 20 y 30 minutos. Espera a que se complete esta operación.
Crea un tema
- Crea un tema de Kafka administrado en el clúster. Aquí, el nombre del tema es
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Deberías obtener un resultado similar al siguiente:
Created topic [kafka-iot-topic].
4. Cómo configurar un publicador
Para publicar en el clúster de Kafka administrado, configuramos una instancia de VM de Google Compute Engine que pueda acceder a la VPC que contiene la subred que usa el clúster de Kafka administrado. Esta VM simula los dispositivos de sensor que proporciona DemoIOT Solutions.
Pasos
- Crea la instancia de VM de Google Compute Engine. Aquí, el nombre de la VM de GCE es
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Otorga a la cuenta de servicio predeterminada de Google Compute Engine los permisos para usar el servicio administrado de Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Usa SSH para conectarte a la VM. Como alternativa, usa la consola de Google Cloud para SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Instala Java para ejecutar las herramientas de línea de comandos de Kafka y descarga el archivo binario de Kafka con estos comandos.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Descarga la biblioteca de autenticación de Kafka administrado y sus dependencias, y configura las propiedades del cliente de Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Para obtener más detalles sobre la configuración de la máquina del publicador, consulta Cómo configurar una máquina cliente.
5. Publica en Kafka administrado
Ahora que el publicador está configurado, podemos usar la línea de comandos de Kafka en él para publicar algunos datos simulados de la VM de GCE (que simula dispositivos de la IoT con DemoIOT Solutions) en el clúster de Kafka administrado.
- Como establecimos una conexión SSH a la instancia de VM de GCE, debemos volver a propagar la variable
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Reemplaza lo siguiente:
<project-id>
por el nombre del proyecto de GCP que configuraste.<region>
con la región en la que se creó el clúster de Kafka
- Usa el comando
managed-kafka clusters describe
para obtener la dirección IP del servidor de inicio de Kafka. Esta dirección se puede usar para conectarse al clúster de Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Enumera los temas del clúster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Deberías ver el siguiente resultado, que contiene el tema kafka-iot-topic
que creamos antes.
__remote_log_metadata
kafka-iot-topic
- Copia y pega esta secuencia de comandos en un archivo
publish_iot_data.sh
nuevo. Para crear un archivo nuevo en la VM de GCE, puedes usar una herramienta comovim
onano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explicación
- Esta secuencia de comandos crea mensajes JSON con lecturas de sensores simuladas que tienen el ID del dispositivo, la marca de tiempo, los datos del sensor (temperatura, humedad, presión, luz), la información de ubicación (latitud, longitud), el estado del dispositivo (batería, señal, tipo de conexión) y algunos metadatos.
- Genera un flujo continuo de mensajes desde una cantidad determinada de dispositivos únicos, cada uno de los cuales envía datos en un intervalo de tiempo especificado, imitando los dispositivos de la IoT del mundo real. Aquí, publicamos datos de 10 dispositivos que producen 20 lecturas cada uno, en un intervalo de tiempo de 10 segundos.
- También publica todos los datos generados en el tema de Kafka con la herramienta de línea de comandos del productor de Kafka.
- Instala algunas dependencias que usa la secuencia de comandos: el paquete
bc
para cálculos matemáticos y el paquetejq
para el procesamiento de JSON.
sudo apt-get install bc jq
- Modifica la secuencia de comandos para que sea ejecutable y ejecútala. Debería tardar unos 2 minutos en ejecutarse.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Para verificar que los eventos se hayan publicado correctamente, ejecuta este comando, que imprimirá todos los eventos. Presiona <control-c>
para salir.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configura el clúster de Dataproc
En esta sección, se crea un clúster de Dataproc en la subred de VPC en la que se encuentra el clúster de Kafka administrado. Este clúster se usará para ejecutar tareas que generen las estadísticas y estadísticas en tiempo real que necesita DemoIOT Solutions.
- Crea un clúster de Dataproc. Aquí, el nombre del clúster es
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Deberías recibir una respuesta similar a la que figura a continuación:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
La creación del clúster puede tardar entre 10 y 15 minutos. Espera a que se complete correctamente esta operación y describe el clúster para verificar que esté en estado RUNNING
.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Procesa mensajes de Kafka con Dataproc
En esta última sección, enviarás un trabajo de Dataproc que procese los mensajes publicados con Spark Streaming. Esta tarea genera estadísticas y estadísticas en tiempo real que pueden usar DemoIOT Solutions.
- Ejecuta este comando para crear el archivo de trabajo de PySpark de transmisión llamado
process_iot.py
de forma local.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explicación
- Este código configura un trabajo de transmisión de datos estructurados de PySpark para leer datos de un tema de Kafka especificado. Usa la dirección de arranque del servidor de Kafka proporcionada y las configuraciones de Kafka cargadas desde un archivo de configuración de GCS para conectarse y autenticarse con el agente de Kafka.
- Primero, lee los datos sin procesar de Kafka como un flujo de arrays de bytes, los convierte en cadenas y aplica
json_schema
con StructType de Spark para especificar la estructura de los datos (ID del dispositivo, marca de tiempo, ubicación, datos del sensor, etcétera). - Imprime las primeras 10 filas en la consola para obtener una vista previa, calcula la temperatura promedio por sensor y escribe todos los datos en el bucket de GCS en formato
avro
. Avro es un sistema de serialización de datos basado en filas que almacena datos estructurados de forma eficiente en un formato binario compacto definido por el esquema, lo que ofrece evolución del esquema, neutralidad de lenguaje y alta compresión para el procesamiento de datos a gran escala.
- Crea el archivo
client.properties
y propaga la variable de entorno para la dirección de arranque del servidor de Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Sube los archivos
process_iot.py
yclient.properties
a tu bucket de Google Cloud Storage para que el trabajo de Dataproc pueda usarlos.
gsutil cp process_iot.py client.properties $BUCKET
- Copia algunos archivos JAR de dependencia para el trabajo de Dataproc en tu bucket de GCS. Este directorio contiene los archivos JAR necesarios para ejecutar trabajos de Spark Streaming con Kafka, y la biblioteca de autenticación de Kafka administrada y sus dependencias, que se toman de Configura una máquina cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envía el trabajo de Spark al clúster de Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Se imprimirán los registros del controlador de Spark. También deberías poder ver estas tablas registradas en la consola y los datos almacenados en tu bucket de GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Limpia
Sigue los pasos para limpiar los recursos después de completar el codelab.
- Borra el clúster de Kafka administrado, la VM de GCE del publicador y el clúster de Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Borra la subred y la red de tu VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Si ya no quieres usar los datos, borra tu bucket de GCS.
gcloud storage rm --recursive $BUCKET
9. Felicitaciones
¡Felicidades! Compilaste correctamente una canalización de procesamiento de datos de IoT con Manage Kafka y Dataproc que ayuda a DemoIOT Solutions a obtener estadísticas en tiempo real sobre los datos que publican sus dispositivos.
Creaste un clúster de Kafka administrado, publicaste eventos de IoT en él y ejecutaste un trabajo de Dataproc que usaba la transmisión de Spark para procesar estos eventos en tiempo real. Ahora conoces los pasos clave necesarios para crear canalizaciones de datos con Kafka administrado y Dataproc.