1. Introduction
Last Updated: 2024-06-10
Background
Internet of Things (IoT) devices, ranging from smart home solutions to industrial sensors, generate vast amounts of data at the edge of the network. This data is invaluable for a variety of use-cases, like device monitoring, tracking, diagnostics, surveillance, personalization, fleet optimization and much more. Google Managed Service for Apache Kafka offers a scalable and durable way to ingest and store this continuous stream of data in an OSS compatible, easy to use and secure way, while Google Cloud Dataproc allows processing of these large datasets for data analytics using Apache Spark and Hadoop clusters.
What you'll build
In this codelab, you are going to build an IoT Data Processing pipeline using Google Managed Service for Apache Kafka, Dataproc, Python and Apache Spark that does real-time analytics. Your pipeline will:
- Publish data from IoT devices to a Managed Kafka cluster using GCE VMs
- Stream data from the Manage Kafka cluster to a Dataproc cluster
- Process data using a Dataproc Spark Streaming job
What you'll learn
- How to create Google Managed Kafka and Dataproc clusters
- How to run streaming jobs using Dataproc
What you'll need
- An active GCP account with a project set up. If you don't have one, you can sign up for a free trial.
- gcloud CLI installed and configured. You can follow the instructions for installing gcloud CLI on your OS.
- Enabled APIs for Google Managed Kafka and Dataproc in your GCP project.
2. Overview
For this codelab, let's follow the story of a dummy company, DemoIOT Solutions. DemoIOT Solutions provides sensor devices that measure and transmit data for temperature, humidity, pressure, light level and location. They would like to set up pipelines that process this data to show real time statistics to their customers. Using such pipelines, they can provide a wide variety of services to their customers, like monitoring, automated suggestions, alerts and insights about places where the customers have installed their sensors.
To do this, we will use a GCE VM to simulate the IoT device. The device will publish data to a Kafka topic in the Google Managed Kafka cluster, which will be read and processed by a Dataproc streaming job. The Prerequisite setup and the following pages will lead you to perform all these steps.
Prerequisite setup
- Find the project name and project number for your project. See Find the project name, number, and ID for reference.
- VPC subnetwork. This will allow connectivity between the GCE VM, Kafka cluster and Dataproc cluster. Follow this to list existing subnets using gcloud CLI. If required, follow create an auto mode VPC network which will create a VPC network with subnetwork in each Google Cloud region. Though, for the purpose of this codelab, we will use a subnetwork from a single region only.
- In this subnetwork, ensure that there is a Firewall rule allowing all ingress from tcp:22, which is required SSH. This rule will be available to select under the Firewall rules section when creating a network, so ensure that you select it.
- GCS bucket. You will need access to a Google Cloud storage bucket to store Dataproc job resources and persist processed data. If you don't have one, you can create one in your GCP project.
Populate environment variables
In your terminal where you run the gcloud CLI, populate these environment variables so that they can be used later.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Replace the following:
<project-id>
with the name of the GCP project you set up.<project-number>
with the name of the project number from Prerequisite step 1.<region>
with the name of a region from Available regions and zones that you want to use. For instance, we can useus-central1
.<zone>
with the name of the zone from Available regions and zones under the region you previously selected. For instance, if you selectedus-central1
as the region, you can useus-central1-f
as the zone. This zone will be used to create the GCE VM that simulates IoT devices. Ensure that your zone is in the region that you have chosen.<subnet-path>
with the full path of the subnet from Prerequisite step 2. The value of this must be in the format:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
with the name of GCS bucket from Prerequisite step 3.
3. Set up Google Managed Kafka
This section sets up a Google Managed Kafka cluster, which deploys the Kafka server, and creates a topic on this server where the IoT data can be published and read from after subscribing to it. DemoIOT Solutions can set up this cluster so that all their devices publish data to it.
Create a Managed Kafka cluster
- Create the Managed Kafka cluster. Here, the name of the cluster is
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
You should get a response similar to the following:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Cluster creation takes 20-30 minutes. Wait for the completion of this operation.
Create a Topic
- Create a Managed Kafka topic on the cluster. Here, the name of the topic is
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
You should get an output similar to the following:
Created topic [kafka-iot-topic].
4. Set up a Publisher
To publish to the Managed Kafka cluster, we set up a Google Compute Engine VM instance that can access the VPC containing the subnet used by the Managed Kafka cluster. This VM simulates the sensor devices provided by DemoIOT Solutions.
Steps
- Create the Google Compute Engine VM instance. Here, the name of the GCE VM is
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Give the Google Compute Engine default service account the permissions to use Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Use SSH to connect to the VM. Alternatively, use the Google Cloud Console to SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Install Java to run Kafka command line tools, and download Kafka binary using these commands.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Download Managed Kafka authentication library and its dependencies and configure Kafka client properties.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
For more details about the publisher machine setup, refer to Set up a client machine.
5. Publish to Managed Kafka
Now that the publisher is set up, we can use the Kafka command line on it to publish some dummy data from the GCE VM (simulating IoT devices by DemoIOT Solutions) to the Managed Kafka cluster.
- Since we have SSHed into the GCE VM instance, we need to re-populate the
PROJECT_ID
variable:
export PROJECT_ID=<project-id>
export REGION=<region>
Replace the following:
<project-id>
with the name of the GCP project you set up.<region>
with the region in which the Kafka cluster was created
- Use the
managed-kafka clusters describe
command to get the IP address of the Kafka bootstrap server. This address can be used to connect to the Kafka cluster.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- List the topics in the cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
You should be able to see the following output, containing the topic kafka-iot-topic
we created earlier.
__remote_log_metadata
kafka-iot-topic
- Copy and paste this script into a new file
publish_iot_data.sh
. To create a new file on the GCE VM, you can use a tool likevim
ornano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explanation
- This script creates JSON messages with simulated sensor readings that have device ID, timestamp, sensor data (temperature, humidity, pressure, light), location information (latitude, longitude), device status (battery, signal, connection type) and some metadata.
- It generates a continuous flow of messages from a set number of unique devices, each sending data at a specified time interval, mimicking real-world IoT devices. Here, we publish data from 10 devices that produce 20 readings each, at a 10 second time interval.
- It also publishes all generated data to the Kafka topic using the Kafka producer command line tool.
- Install some dependencies used by the script -
bc
package for mathematical calculations andjq
package for JSON processing.
sudo apt-get install bc jq
- Modify the script to be an executable and run the script. It should take about 2 minutes to run.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
You can check that the events got published successfully by running this command which will print all events. Press <control-c>
to exit.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Set up the Dataproc Cluster
This section creates a Dataproc cluster in the VPC subnetwork where the Managed Kafka cluster is present. This cluster will be used to run jobs that generate the real time statistics and insights needed by DemoIOT Solutions.
- Create a Dataproc cluster. Here the name of the cluster is
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
You should get a response similar to the following:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Cluster creation may take 10-15 minutes. Wait for the successful completion of this operation and check that the cluster is in RUNNING
state by describing the cluster.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Process Kafka messages using Dataproc
In this last section, you will submit a Dataproc job that processes the published messages using Spark Streaming. This job actually generates some real time statistics and insights that can be used by DemoIOT Solutions.
- Run this command to create the streaming PySpark job file called
process_iot.py
locally.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explanation
- This code sets up a PySpark Structured Streaming job to read data from a specified Kafka topic. It uses the provided Kafka server bootstrap address and Kafka configurations loaded from a GCS config file to connect and authenticate with the Kafka broker.
- It first reads the raw data from Kafka as a stream of byte arrays, and casts those byte arrays to strings, and applies the
json_schema
using Spark's StructType to specify the structure of the data (device ID, timestamp, location, sensor data, etc.). - It prints the first 10 rows to the console for preview, computes the average temperature per sensor and writes all data to the GCS bucket in
avro
format. Avro is a row-based data serialization system which efficiently stores structured data in a compact, schema-defined binary format, offering schema evolution, language neutrality, and high compression for large-scale data processing.
- Create the
client.properties
file and populate the environment variable for the bootstrap address of the kafka server.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Upload
process_iot.py
andclient.properties
files to your Google Cloud Storage bucket, so that they can be used by the Dataproc job.
gsutil cp process_iot.py client.properties $BUCKET
- Copy some dependency jars for the Dataproc job to your GCS bucket. This directory contains jars that are required to run Spark Streaming jobs with Kafka, and the Managed Kafka authentication library and its dependencies, taken from Set up a client machine.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Submit the Spark job to the Dataproc cluster.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
The Spark driver logs will be printed. You should also be able to see these tables logged to the console and data stored in your GCS bucket.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Clean up
Follow the steps to clean up resources after completing the codelab.
- Delete the Managed Kafka cluster, Publisher GCE VM and the Dataproc cluster.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Delete your VPC subnetwork and network.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Delete your GCS bucket if you no longer want to use the data.
gcloud storage rm --recursive $BUCKET
9. Congratulations
Congratulations, you have successfully built an IoT data processing pipeline with Manage Kafka and Dataproc that helps DemoIOT Solutions gain real time insights on the data published by their devices!
You created a Managed Kafka cluster, published IoT events to it and ran a Dataproc job that used Spark streaming to process these events in real time. You now know the key steps required to create data pipelines using Managed Kafka and Dataproc.