使用 Dataproc 和 Google Managed Service for Apache Kafka 进行实时 IoT 数据处理

使用 Dataproc 和 Google Managed Service for Apache Kafka 进行实时 IoT 数据处理

关于此 Codelab

subject上次更新时间:6月 16, 2025
account_circleDevanshi Khatsuriya 编写

1. 简介

2efacab8643a653b.png

上次更新日期:2024 年 6 月 10 日

背景

物联网 (IoT) 设备(从智能家居解决方案到工业传感器)会在网络边缘生成大量数据。这些数据对于各种用例(例如设备监控、跟踪、诊断、监视、个性化、车队优化等)而言非常有用。Google Managed Service for Apache Kafka 提供了一种可扩缩且持久的方式,可以兼容 OSS、易于使用且安全的方式提取和存储这类持续流式数据,而 Google Cloud Dataproc 则允许使用 Apache Spark 和 Hadoop 集群处理这些大型数据集,以便进行数据分析。

构建内容

在本 Codelab 中,您将使用 Google Managed Service for Apache Kafka、Dataproc、Python 和 Apache Spark 构建一个可进行实时分析的 IoT 数据处理流水线。您的流水线将:

  • 使用 GCE 虚拟机将 IoT 设备中的数据发布到托管 Kafka 集群
  • 将数据从 Managed Kafka 集群流式传输到 Dataproc 集群
  • 使用 Dataproc Spark Streaming 作业处理数据

学习内容

  • 如何创建 Google 托管的 Kafka 和 Dataproc 集群
  • 如何使用 Dataproc 运行流式作业

所需条件

2. 概览

在本 Codelab 中,我们将跟随一个虚构公司 DemoIOT Solutions 的故事。DemoIOT Solutions 提供传感器设备,用于测量和传输温度、湿度、压力、光照度和位置数据。他们希望设置用于处理此类数据的流水线,以向客户显示实时统计信息。借助此类流水线,他们可以为客户提供各种服务,例如监控、自动建议、提醒以及有关客户安装传感器的位置的数据分析。

为此,我们将使用 GCE 虚拟机来模拟 IoT 设备。设备将将数据发布到 Google 托管的 Kafka 集群中的 Kafka 主题,Dataproc 流式作业将读取并处理这些数据。您可以按照“前提条件”设置和以下页面中的说明完成所有这些步骤。

前提条件设置

  1. 找到项目的项目名称和项目编号。如需参考信息,请参阅查找项目名称、编号和 ID
  2. VPC 子网。这样便可在 GCE 虚拟机、Kafka 集群和 Dataproc 集群之间建立连接。请按照此处的说明使用 gcloud CLI 列出现有子网。如有必要,请按照创建自动模式 VPC 网络中的说明操作,以便在每个 Google Cloud 区域中创建包含子网的 VPC 网络。不过,在此 Codelab 中,我们将仅使用来自单个区域的子网。
  • 在此子网中,请确保有一条防火墙规则允许来自 tcp:22(这是 SSH 的必需端口)的所有入站流量。创建网络时,您可以在“防火墙规则”部分下选择此规则,因此请务必选择此规则。
  1. GCS 存储分区。您需要访问 Google Cloud 存储分区,以存储 Dataproc 作业资源并保留已处理的数据。如果您还没有数据中心,可以在 GCP 项目中创建一个。

填充环境变量

在运行 gcloud CLI 的终端中,填充这些环境变量,以便稍后使用。

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

替换以下内容:

  • <project-id> 替换为您设置的 GCP 项目的名称。
  • <project-number> 替换为前提条件第 1 步中项目编号的名称。
  • <region> 替换为您要使用的可用区域和可用区中的某个区域的名称。例如,我们可以使用 us-central1
  • <zone> 替换为您之前选择的区域下可用的区域和可用区中相应可用区的名称。例如,如果您选择了 us-central1 作为区域,则可以使用 us-central1-f 作为可用区。此可用区将用于创建模拟 IoT 设备的 GCE 虚拟机。确保您的区域位于您所选的地区
  • <subnet-path> 替换为前提步骤 2 中的子网的完整路径。此值的格式必须为:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> 替换为前提条件第 3 步中的 GCS 存储分区名称。

3. 设置 Google 托管的 Kafka

本部分将设置 Google 托管的 Kafka 集群,该集群会部署 Kafka 服务器,并在此服务器上创建一个主题,以便在订阅 IoT 数据后发布和读取数据。DemoIOT Solutions 可以设置此集群,以便其所有设备向其发布数据。

创建托管式 Kafka 集群

  • 创建托管式 Kafka 集群。在本例中,集群的名称为 kafka-iot
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

您应该会看到如下所示的响应:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

集群创建需要 20-30 分钟。等待此操作完成。

创建主题

  • 在集群上创建一个托管 Kafka 主题。在本例中,主题的名称为 kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

您应该会看到类似于以下内容的输出:

Created topic [kafka-iot-topic].

4. 设置发布商

如需发布到托管式 Kafka 集群,我们设置了一个 Google Compute Engine 虚拟机实例,该实例可以访问包含托管式 Kafka 集群使用的子网的 VPC。此虚拟机模拟 DemoIOT Solutions 提供的传感器设备。

步骤

  1. 创建 Google Compute Engine 虚拟机实例。在本例中,GCE 虚拟机的名称为 publisher-instance
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. 向 Google Compute Engine 默认服务账号授予使用 Managed Service for Apache Kafka 的权限。
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. 使用 SSH 连接到虚拟机。或者,您也可以使用 Google Cloud 控制台进行 SSH 连接
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. 安装 Java 以运行 Kafka 命令行工具,并使用以下命令下载 Kafka 二进制文件。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. 下载 Managed Kafka 身份验证库及其依赖项,并配置 Kafka 客户端属性。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

如需详细了解发布商机器设置,请参阅设置客户端机器

5. 发布到托管 Kafka

现在,发布端已设置完毕,我们可以使用其 Kafka 命令行将一些虚构数据从 GCE 虚拟机(由 DemoIOT Solutions 模拟的 IoT 设备)发布到托管式 Kafka 集群。

  1. 由于我们已通过 SSH 连接到 GCE 虚拟机实例,因此需要重新填充 PROJECT_ID 变量:
export PROJECT_ID=<project-id>
export REGION=<region>

替换以下内容:

  • <project-id> 替换为您设置的 GCP 项目的名称。
  • <region> 替换为创建 Kafka 集群的区域
  1. 使用 managed-kafka clusters describe 命令获取 Kafka 引导服务器的 IP 地址。此地址可用于连接到 Kafka 集群。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. 列出集群中的主题:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

您应该会看到以下输出,其中包含我们之前创建的主题 kafka-iot-topic

__remote_log_metadata
kafka
-iot-topic
  1. 将此脚本复制并粘贴到新文件 publish_iot_data.sh 中。如需在 GCE 虚拟机上创建新文件,您可以使用 vimnano 等工具。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

解释

  • 此脚本会创建包含模拟传感器读数的 JSON 消息,其中包含设备 ID、时间戳、传感器数据(温度、湿度、气压、光照)、位置信息(经度、纬度)、设备状态(电池电量、信号、连接类型)和一些元数据。
  • 它会从一定数量的唯一设备生成连续的消息流,每台设备都会在指定的时间间隔发送数据,模拟真实的 IoT 设备。在这里,我们发布了 10 个设备的数据,每个设备每隔 10 秒生成 20 次读数。
  • 它还会使用 Kafka 生产者命令行工具将所有生成的数据发布到 Kafka 主题。
  1. 安装脚本使用的部分依赖项:用于数学计算的 bc 软件包和用于 JSON 处理的 jq 软件包。
sudo apt-get install bc jq
  1. 将脚本修改为可执行脚本,然后运行该脚本。运行该作业大约需要 2 分钟。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

您可以运行以下命令来检查事件是否已成功发布,该命令会输出所有事件。按 <control-c> 即可退出。

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. 设置 Dataproc 集群

在本部分中,您将在托管式 Kafka 集群所在的 VPC 子网中创建一个 Dataproc 集群。此集群将用于运行生成 DemoIOT Solutions 所需实时统计信息和数据分析的作业。

  1. 创建 Dataproc 集群。在这里,集群的名称为 dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

您应该会看到如下所示的响应:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

集群创建可能需要 10 到 15 分钟。等待此操作成功完成,然后通过描述集群来检查集群是否处于 RUNNING 状态。

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. 使用 Dataproc 处理 Kafka 消息

在本最后一部分中,您将提交一个 Dataproc 作业,该作业使用 Spark Streaming 处理已发布的消息。此作业实际上会生成一些实时统计信息和数据洞见,供 DemoIOT Solutions 使用。

  1. 运行以下命令,在本地创建名为 process_iot.py 的流式 PySpark 作业文件。
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

解释

  • 此代码会设置一个 PySpark 结构化流式传输作业,以从指定的 Kafka 主题读取数据。它使用提供的 Kafka 服务器引导地址和从 GCS 配置文件加载的 Kafka 配置来连接到 Kafka 代理并对其进行身份验证。
  • 它首先将 Kafka 中的原始数据读取为字节数组流,然后将这些字节数组转换为字符串,并使用 Spark 的 StructType 应用 json_schema 来指定数据的结构(设备 ID、时间戳、位置、传感器数据等)。
  • 它会将前 10 行输出到控制台以供预览,计算每个传感器的平均温度,并以 avro 格式将所有数据写入 GCS 存储分区。Avro 是一种基于行的可序列化数据系统,可以架构定义的紧凑二进制格式高效存储结构化数据,并支持架构演变、语言中立性和高压缩率,适用于大规模数据处理。
  1. 创建 client.properties 文件,并为 Kafka 服务器的引导地址填充环境变量。
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.pyclient.properties 文件上传到您的 Google Cloud Storage 存储分区,以便 Dataproc 作业可以使用它们。
gsutil cp process_iot.py client.properties $BUCKET
  1. 将 Dataproc 作业的部分依赖项 jar 复制到您的 GCS 存储分区。此目录包含使用 Kafka 运行 Spark Streaming 作业所需的 jar 文件,以及从设置客户端机器中获取的 Managed Kafka 身份验证库及其依赖项。
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. 将 Spark 作业提交到 Dataproc 集群。
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

系统会输出 Spark 驱动程序日志。您还应该能够看到记录到控制台的这些表以及存储在 GCS 存储分区中的数据。

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. 清理

完成 Codelab 后,请按照相应步骤清理资源。

  1. 删除托管式 Kafka 集群、发布方 GCE 虚拟机和 Dataproc 集群。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. 删除您的 VPC 子网和网络。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. 如果您不想再使用这些数据,请删除您的 GCS 存储分区。
gcloud storage rm --recursive $BUCKET

9. 恭喜

恭喜,您已成功使用 Manage Kafka 和 Dataproc 构建了物联网数据处理流水线,该流水线可帮助 DemoIOT Solutions 针对其设备发布的数据获得实时数据洞见!

您创建了一个托管式 Kafka 集群,向其中发布了 IoT 事件,并运行了一个 Dataproc 作业,该作业使用 Spark 流式传输实时处理这些事件。现在,您已了解使用托管式 Kafka 和 Dataproc 创建数据流水线所需的主要步骤。

参考文档