Real-Time IoT Data Processing using Dataproc and Google Managed Service for Apache Kafka

1. Introduction

Last Updated: 2024-06-10

Background

Internet of Things (IoT) devices, ranging from smart home solutions to industrial sensors, generate vast amounts of data at the edge of the network. This data is invaluable for a variety of use-cases, like device monitoring, tracking, diagnostics, surveillance, personalization, fleet optimization and much more. Google Managed Service for Apache Kafka offers a scalable and durable way to ingest and store this continuous stream of data in an OSS compatible, easy to use and secure way, while Google Cloud Dataproc allows processing of these large datasets for data analytics using Apache Spark and Hadoop clusters.

What you'll build

In this codelab, you are going to build an IoT Data Processing pipeline using Google Managed Service for Apache Kafka, Dataproc, Python and Apache Spark that does real-time analytics. Your pipeline will:

  • Publish data from IoT devices to a Managed Kafka cluster using GCE VMs
  • Stream data from the Managed Kafka cluster to a Dataproc cluster
  • Process data using a Dataproc Spark Streaming job

What you'll learn

  • How to create Google Managed Kafka and Dataproc clusters
  • How to run streaming jobs using Dataproc

What you'll need

2. Overview

For this codelab, let's follow the story of a fictional company, DemoIOT Solutions. DemoIOT Solutions provides sensor devices that measure and transmit data for temperature, humidity, pressure, light level and location. They would like to set up pipelines that process this data to show real-time statistics to their customers. Using such pipelines, they can provide a wide variety of services to their customers, like monitoring, automated suggestions, alerts and insights about places where the customers have installed their sensors.

To do this, we will use a GCE VM to simulate the IoT devices. The VM will publish data to a Kafka topic in the Google Managed Kafka cluster, which will be read and processed by a Dataproc streaming job. The Prerequisite setup below and the following sections walk you through all these steps.

Prerequisite setup

  1. Find the project ID and project number for your project. See Find the project name, number, and ID for reference.
  2. A VPC subnetwork. This allows connectivity between the GCE VM, the Kafka cluster and the Dataproc cluster. You can list existing subnets using the gcloud CLI. If required, follow Create an auto mode VPC network, which creates a VPC network with a subnetwork in each Google Cloud region. For the purpose of this codelab, however, we will use a subnetwork from a single region only.
  • In this subnetwork, ensure that there is a firewall rule allowing all ingress on tcp:22, which is required for SSH. This rule is available to select under the Firewall rules section when creating a network, so ensure that you select it.
  3. A GCS bucket. You will need access to a Google Cloud Storage bucket to store Dataproc job resources and persist processed data. If you don't have one, you can create one in your GCP project.

Populate environment variables

In your terminal where you run the gcloud CLI, populate these environment variables so that they can be used later.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Replace the following:

  • <project-id> with the ID of the GCP project you set up.
  • <project-number> with the project number from Prerequisite step 1.
  • <region> with the name of a region from Available regions and zones that you want to use. For instance, we can use us-central1.
  • <zone> with the name of the zone from Available regions and zones under the region you previously selected. For instance, if you selected us-central1 as the region, you can use us-central1-f as the zone. This zone will be used to create the GCE VM that simulates IoT devices. Ensure that your zone is in the region that you have chosen.
  • <subnet-path> with the full path of the subnet from Prerequisite step 2. The value of this must be in the format: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> with the name of GCS bucket from Prerequisite step 3.
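
Mistakes in these values tend to surface only later, as failed cluster or VM creation. The following is a minimal sketch, not part of the original codelab, of how the constraints described above could be checked up front. The function name check_settings and its messages are hypothetical, and the check that the subnet's project equals PROJECT_ID is an assumption that holds for this codelab but not for every setup (for example, Shared VPC).

```python
import re

# Format required for SUBNET_PATH, as stated in the list above.
SUBNET_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/regions/(?P<region>[^/]+)/subnetworks/(?P<subnet>[^/]+)$"
)


def check_settings(project_id, region, zone, subnet_path, bucket):
    """Return a list of problems found in the codelab's environment values."""
    problems = []
    match = SUBNET_RE.match(subnet_path)
    if match is None:
        problems.append("SUBNET_PATH is not in the expected format")
    else:
        # Assumption for this codelab: the subnet lives in the same project.
        if match.group("project") != project_id:
            problems.append("SUBNET_PATH project differs from PROJECT_ID")
        if match.group("region") != region:
            problems.append("SUBNET_PATH region differs from REGION")
    # A zone name is its region plus a suffix, for example us-central1-f.
    if not zone.startswith(region + "-"):
        problems.append("ZONE is not inside REGION")
    if not bucket.startswith("gs://"):
        problems.append("BUCKET must start with gs://")
    return problems
```

An empty list means the values are mutually consistent; it does not confirm that the resources actually exist.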

3. Set up Google Managed Kafka

This section sets up a Google Managed Kafka cluster, which deploys the Kafka server, and creates a topic on this server where the IoT data can be published and read from after subscribing to it. DemoIOT Solutions can set up this cluster so that all their devices publish data to it.

Create a Managed Kafka cluster

  • Create the Managed Kafka cluster. Here, the name of the cluster is kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

You should get a response similar to the following:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

Cluster creation takes 20-30 minutes. Wait for the completion of this operation.

Create a Topic

  • Create a Managed Kafka topic on the cluster. Here, the name of the topic is kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

You should get an output similar to the following:

Created topic [kafka-iot-topic].

4. Set up a Publisher

To publish to the Managed Kafka cluster, we set up a Google Compute Engine VM instance that can access the VPC containing the subnet used by the Managed Kafka cluster. This VM simulates the sensor devices provided by DemoIOT Solutions.

Steps

  1. Create the Google Compute Engine VM instance. Here, the name of the GCE VM is publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  2. Give the Google Compute Engine default service account the permissions to use Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  3. Use SSH to connect to the VM. Alternatively, use the Google Cloud Console to SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  4. Install Java to run the Kafka command line tools, and download the Kafka binary using these commands.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  5. Download the Managed Kafka authentication library and its dependencies, and configure the Kafka client properties.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

For more details about the publisher machine setup, refer to Set up a client machine.
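
The client.properties file is a plain key=value file with no section headers. As an illustration, here is a small Python sketch of reading such a file into a dictionary, using the same trick that the process_iot.py job later in this codelab relies on: prepending a synthetic [DEFAULT] header so that configparser accepts it. The function name parse_client_properties is hypothetical, and disabling interpolation is an addition of this sketch, not something the codelab's job does.

```python
import configparser


def parse_client_properties(text):
    """Parse Java-style key=value properties text into a dict."""
    # configparser needs a section header, so prepend a synthetic one.
    # Interpolation is disabled so a literal % in a value is kept as-is.
    config = configparser.ConfigParser(interpolation=None)
    config.read_string("[DEFAULT]\n" + text)
    return dict(config.defaults())


# Same four settings as the client.properties file created above.
SAMPLE = """\
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
"""

props = parse_client_properties(SAMPLE)
```

Note that configparser lowercases keys by default, which is harmless here because all four keys are already lowercase.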

5. Publish to Managed Kafka

Now that the publisher is set up, we can use the Kafka command line on it to publish some dummy data from the GCE VM (simulating IoT devices by DemoIOT Solutions) to the Managed Kafka cluster.

  1. Since we are now in an SSH session on the GCE VM instance, we need to re-populate the PROJECT_ID and REGION variables:
export PROJECT_ID=<project-id>
export REGION=<region>

Replace the following:

  • <project-id> with the ID of the GCP project you set up.
  • <region> with the region in which the Kafka cluster was created.
  2. Use the managed-kafka clusters describe command to get the address of the Kafka bootstrap server. This address is used to connect to the Kafka cluster.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  3. List the topics in the cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

You should be able to see the following output, containing the topic kafka-iot-topic we created earlier.

__remote_log_metadata
kafka-iot-topic
  4. Copy and paste this script into a new file named publish_iot_data.sh. To create a new file on the GCE VM, you can use a tool like vim or nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo "$json_data" | jq -rc .
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Explanation

  • This script creates JSON messages with simulated sensor readings that have device ID, timestamp, sensor data (temperature, humidity, pressure, light), location information (latitude, longitude), device status (battery, signal, connection type) and some metadata.
  • It generates a fixed batch of messages from a set number of unique devices, mimicking real-world IoT devices that report at a regular interval. Here, we publish data from 10 devices that produce 20 readings each, with message timestamps spaced 10 seconds apart. The script does not wait between messages; only the timestamp field advances.
  • It also publishes all generated data to the Kafka topic using the Kafka producer command line tool.
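
To make the message ordering and timestamp arithmetic concrete, here is a minimal Python sketch that mirrors only the two loops and the timestamp calculation of the script. It does not reproduce the random sensor values or publish anything, and the names message_schedule and MESSAGE_INTERVAL_SECONDS are chosen for this illustration.

```python
from datetime import datetime, timedelta, timezone

# Constants copied from publish_iot_data.sh.
NUM_MESSAGES_PER_DEVICE = 20
NUM_IDS = 10
MESSAGE_INTERVAL_SECONDS = 10
START_TIME = datetime(2025, 3, 1, tzinfo=timezone.utc)


def message_schedule():
    """Yield (device_id, timestamp) pairs in the order the script emits them."""
    for message_index in range(NUM_MESSAGES_PER_DEVICE):
        # Every device shares the same timestamp within one round.
        stamp = START_TIME + timedelta(seconds=message_index * MESSAGE_INTERVAL_SECONDS)
        for device_index in range(NUM_IDS):
            device_id = f"sensor-{device_index % NUM_IDS}"
            yield device_id, stamp.strftime("%Y-%m-%dT%H:%M:%SZ")


schedule = list(message_schedule())
```

The schedule holds 200 pairs, starting with sensor-0 at 2025-03-01T00:00:00Z and ending with sensor-9 at 2025-03-01T00:03:10Z, since the last round is offset by 19 × 10 = 190 seconds.
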
  5. Install the dependencies used by the script: the bc package for mathematical calculations and the jq package for JSON processing.
sudo apt-get install bc jq
  6. Make the script executable and run it. It should take about 2 minutes to run.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

You can check that the events got published successfully by running this command which will print all events. Press <control-c> to exit.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties
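
Each line printed by the console consumer should be one compact JSON document. If you save a few of those lines, a sketch like the following could confirm that they have the top-level shape the script produces. The helper name missing_keys is hypothetical, and the sample values are copied from the first row of the sample job output shown later in this codelab.

```python
import json

# Top-level fields written by publish_iot_data.sh.
EXPECTED_KEYS = {
    "device_id", "timestamp", "location",
    "sensor_data", "device_status", "metadata",
}


def missing_keys(line):
    """Return the sorted top-level keys missing from one consumed JSON line."""
    record = json.loads(line)
    return sorted(EXPECTED_KEYS - set(record))


# Illustrative message shaped like the script's output.
sample = json.dumps({
    "device_id": "sensor-6",
    "timestamp": "2025-03-01T00:03:00Z",
    "location": {"latitude": 33.42, "longitude": -119.66},
    "sensor_data": {"temperature": 20.9, "humidity": 50.7,
                    "pressure": 1003.2, "light_level": 525},
    "device_status": {"battery_level": 70, "signal_strength": -41,
                      "connection_type": "Wi-Fi"},
    "metadata": {"sensor_type": "environmental", "unit_temperature": "Celsius",
                 "unit_humidity": "%", "unit_pressure": "hPa",
                 "unit_light_level": "lux", "firmware_version": "3.6.0"},
})
```

Calling missing_keys(sample) returns an empty list for a well-formed message. A line that is not valid JSON raises json.JSONDecodeError instead.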

6. Set up the Dataproc Cluster

This section creates a Dataproc cluster in the VPC subnetwork where the Managed Kafka cluster is present. This cluster will be used to run jobs that generate the real time statistics and insights needed by DemoIOT Solutions.

  1. Create a Dataproc cluster. Here the name of the cluster is dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.executor.memory=4G

You should get a response similar to the following:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>].
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Cluster creation may take 10-15 minutes. Wait for the successful completion of this operation and check that the cluster is in RUNNING state by describing the cluster.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION
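
If you prefer to check the state programmatically, one option is to add --format=json to the describe command and inspect the result. The sketch below assumes that the JSON output follows the Dataproc cluster resource, where the state is reported under status.state. The function name cluster_is_running and the heavily trimmed sample document are hypothetical.

```python
import json


def cluster_is_running(describe_json):
    """Return True when the described cluster reports the RUNNING state."""
    cluster = json.loads(describe_json)
    # Assumption: the state is exposed as status.state in the JSON output.
    return cluster.get("status", {}).get("state") == "RUNNING"


# Hypothetical, trimmed example of what the describe output might contain.
sample = '{"clusterName": "dataproc-iot", "status": {"state": "RUNNING"}}'
running = cluster_is_running(sample)
```

A missing status field is treated as not running, so the check fails closed when the output has an unexpected shape.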

7. Process Kafka messages using Dataproc

In this last section, you will submit a Dataproc job that processes the published messages using Spark Streaming. This job generates real-time statistics and insights that can be used by DemoIOT Solutions.

  1. Run this command to create the streaming PySpark job file called process_iot.py locally.
cat > process_iot.py <<'EOF'

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Explanation

  • This code sets up a PySpark Structured Streaming job to read data from a specified Kafka topic. It uses the provided Kafka server bootstrap address and Kafka configurations loaded from a GCS config file to connect and authenticate with the Kafka broker.
  • It first reads the raw data from Kafka as a stream of byte arrays, casts those byte arrays to strings, and applies JSON_SCHEMA, defined with Spark's StructType, to specify the structure of the data (device ID, timestamp, location, sensor data, etc.).
  • It prints the first 20 rows to the console for preview, computes the average temperature per sensor and writes all data to the GCS bucket in Avro format. Avro is a row-based data serialization system that stores structured data in a compact, schema-defined binary format, offering schema evolution, language neutrality, and high compression for large-scale data processing.
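
The per-device average is an ordinary group-and-average. As a rough illustration of what the groupBy("device_id") and avg("temperature") calls compute, here is a plain Python sketch that needs no Spark installation. The function names and the sample readings are hypothetical. The to_float32 helper imitates how a FloatType column holds 32-bit values, which probably explains averages with long decimal tails in the job output: 32-bit readings are averaged as 64-bit doubles.

```python
import struct
from collections import defaultdict


def to_float32(value):
    """Round a Python float to 32-bit precision, like a FloatType column."""
    return struct.unpack("f", struct.pack("f", value))[0]


def average_temperature(records):
    """Group records by device_id and average sensor_data.temperature."""
    totals = defaultdict(lambda: [0.0, 0])  # device_id -> [sum, count]
    for record in records:
        entry = totals[record["device_id"]]
        entry[0] += to_float32(record["sensor_data"]["temperature"])
        entry[1] += 1
    # Sorted by device_id, mirroring the orderBy in the job.
    return {device: total / count
            for device, (total, count) in sorted(totals.items())}


# Hypothetical readings, not taken from a real run.
records = [
    {"device_id": "sensor-0", "sensor_data": {"temperature": 20.5}},
    {"device_id": "sensor-0", "sensor_data": {"temperature": 21.5}},
    {"device_id": "sensor-1", "sensor_data": {"temperature": 20.1}},
]
averages = average_temperature(records)
```

Here sensor-0 averages to exactly 21.0, because 20.5 and 21.5 are exactly representable in 32 bits, while sensor-1 comes out very close to, but not exactly, 20.1.
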
  2. Create the client.properties file and populate the environment variable for the bootstrap address of the Kafka server.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  3. Upload the process_iot.py and client.properties files to your Google Cloud Storage bucket, so that they can be used by the Dataproc job.
gsutil cp process_iot.py client.properties $BUCKET
  4. Copy some dependency jars for the Dataproc job to your GCS bucket. This directory contains jars that are required to run Spark Streaming jobs with Kafka, and the Managed Kafka authentication library and its dependencies, taken from Set up a client machine.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  5. Submit the Spark job to the Dataproc cluster.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

The Spark driver logs will be printed. You should also be able to see these tables logged to the console and data stored in your GCS bucket.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
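
Besides the console tables, the job writes the parsed rows and a streaming checkpoint under the GCS path passed as its third argument. The following sketch shows how those two locations are derived, using the same suffixes and the same gs:// check as process_iot.py. The function name output_locations and the bucket name my-bucket are hypothetical.

```python
from urllib.parse import urlparse


def output_locations(store_data_gcs_path):
    """Return the Avro data and checkpoint paths derived from the job argument."""
    if urlparse(store_data_gcs_path).scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
    return {
        "data": store_data_gcs_path + "/tables/iot_avro/",
        "checkpoint": store_data_gcs_path + "/chkpt/avro/",
    }


# Hypothetical bucket name, used only for illustration.
paths = output_locations("gs://my-bucket")
```

With $BUCKET as the argument, the Avro files should therefore appear under $BUCKET/tables/iot_avro/. Because the writer uses a checkpoint, resubmitting the job against the same bucket is expected to resume from the recorded offsets instead of rewriting the same messages.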

8. Clean up

Follow the steps to clean up resources after completing the codelab.

  1. Delete the Managed Kafka cluster, Publisher GCE VM and the Dataproc cluster.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  2. Delete your VPC subnetwork and network.
gcloud compute networks subnets delete <subnet-name> --region=$REGION
gcloud compute networks delete <network-name>
  3. Delete your GCS bucket if you no longer want to use the data.
gcloud storage rm --recursive $BUCKET

9. Congratulations

Congratulations, you have successfully built an IoT data processing pipeline with Managed Kafka and Dataproc that helps DemoIOT Solutions gain real-time insights on the data published by their devices!

You created a Managed Kafka cluster, published IoT events to it and ran a Dataproc job that used Spark Streaming to process these events in real time. You now know the key steps required to create data pipelines using Managed Kafka and Dataproc.

Reference docs