使用 Dataproc 和 Google Managed Service for Apache Kafka 进行实时 IoT 数据处理

1. 简介

2efacab8643a653b.png

上次更新时间: 2024-06-10

背景

从智能家居解决方案到工业传感器,物联网 (IoT) 设备在网络边缘生成大量数据。这些数据对于各种使用场景(例如设备监控、跟踪、诊断、监视、个性化、车队优化等)来说非常宝贵。Google Managed Service for Apache Kafka 提供了一种可扩容且持久的方式,以 OSS 兼容、易于使用且安全的方式注入和存储这种连续的数据流,而 Google Cloud Dataproc 则允许使用 Apache Spark 和 Hadoop 集群处理这些大型数据集以进行数据分析。

构建内容

在此 Codelab 中,您将使用 Google Managed Service for Apache Kafka、Dataproc、Python 和 Apache Spark 构建一个 IoT 数据处理流水线,该流水线可进行实时分析。您的流水线将:

  • 使用 GCE 虚拟机将 IoT 设备中的数据发布到 Managed Kafka 集群
  • 将数据从 Managed Kafka 集群流式传输到 Dataproc 集群
  • 使用 Dataproc Spark Streaming 作业处理数据

学习内容

  • 如何创建 Google Managed Kafka 和 Dataproc 集群
  • 如何使用 Dataproc 运行流式作业

所需条件

2. 概览

在此 Codelab 中,我们来了解一家虚构的公司 DemoIOT Solutions。DemoIOT Solutions 提供传感器设备,用于测量和传输温度、湿度、压力、光照强度和位置数据。他们希望设置处理这些数据的流水线,以便向客户显示实时统计信息。借助此类流水线,他们可以向客户提供各种服务,例如监控、自动建议、提醒以及客户安装传感器的位置的相关洞见。

为此,我们将使用 GCE 虚拟机来模拟 IoT 设备。该设备会将数据发布到 Google Managed Kafka 集群中的 Kafka 主题,Dataproc 流式作业将读取并处理这些数据。前提条件设置和后续页面将引导您执行所有这些步骤。

前提条件设置

  1. 查找项目的项目名称和项目编号。如需参考,请参阅查找项目名称、编号和 ID
  2. VPC 子网。这将允许 GCE 虚拟机、Kafka 集群和 Dataproc 集群之间建立连接。按照此步骤使用 gcloud CLI 列出现有子网。如果需要,请按照 创建自动模式 VPC 网络 中的步骤操作,该步骤将在每个 Google Cloud 区域中创建一个包含子网的 VPC 网络。不过,就此 Codelab 而言,我们将仅使用单个区域中的子网。
  • 在此子网中,确保存在一条防火墙规则,允许来自 tcp:22 的所有入站流量,这是 SSH 所必需的。创建网络时,您可以在“防火墙规则”部分下选择此规则,因此请确保选择它。
  1. GCS 存储分区。您需要访问 Google Cloud Storage 存储分区,以存储 Dataproc 作业资源并保留处理后的数据。如果您没有存储分区,可以在 GCP 项目中 创建一个

填充环境变量

在运行 gcloud CLI 的终端中,填充这些环境变量,以便稍后使用。

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

替换以下内容:

  • <project-id> 替换为您设置的 GCP 项目的名称。
  • <project-number> 替换为前提条件步骤 1 中的项目编号的名称。
  • <region> 替换为 可用区域和可用区中您要使用的区域的名称。例如,我们可以使用 us-central1
  • <zone> 替换为 可用区域和可用区中您之前选择的区域下的可用区的名称。例如,如果您选择 us-central1 作为区域,则可以使用 us-central1-f 作为可用区。此可用区将用于创建模拟 IoT 设备的 GCE 虚拟机。确保您的 可用区位于您选择的区域中
  • <subnet-path> 替换为前提条件步骤 2 中的子网的完整路径。此值必须采用以下格式:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> 替换为前提条件步骤 3 中的 GCS 存储分区的名称。

3. 设置 Google Managed Kafka

本部分将设置 Google Managed Kafka 集群,该集群会部署 Kafka 服务器,并在此服务器上创建一个主题,IoT 数据可以在订阅后发布到该主题并从中读取。DemoIOT Solutions 可以设置此集群,以便其所有设备都将数据发布到该集群。

创建 Managed Kafka 集群

  • 创建 Managed Kafka 集群。在此示例中,集群的名称为 kafka-iot
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

您应该会看到如下所示的响应:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

创建集群需要 20-30 分钟。请等待此操作完成。

创建主题

  • 在集群上创建 Managed Kafka 主题。在此示例中,主题的名称为 kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

您应该会看到如下所示的输出:

Created topic [kafka-iot-topic].

4. 设置发布者

如需发布到 Managed Kafka 集群,我们设置了一个 Google Compute Engine 虚拟机实例,该实例可以访问包含 Managed Kafka 集群使用的子网的 VPC。此虚拟机模拟 DemoIOT Solutions 提供的传感器设备。

步骤

  1. 创建 Google Compute Engine 虚拟机实例。在此示例中,GCE 虚拟机的名称为 publisher-instance
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. 向 Google Compute Engine 默认服务账号授予使用 Managed Service for Apache Kafka 的权限。
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. 使用 SSH 连接到虚拟机。或者,您也可以使用 Google Cloud 控制台通过 SSH 连接
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. 安装 Java 以运行 Kafka 命令行工具,并使用这些命令下载 Kafka 二进制文件。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. 下载 Managed Kafka 身份验证库及其依赖项,并配置 Kafka 客户端属性。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

如需详细了解发布商机器设置,请参阅 设置客户端机器

5. 发布到 Managed Kafka

现在发布方已设置完毕,我们可以在其上使用 Kafka 命令行,将一些来自 GCE 虚拟机(模拟 DemoIOT Solutions 的 IoT 设备)的虚拟数据发布到 Managed Kafka 集群。

  1. 由于我们已通过 SSH 连接到 GCE 虚拟机实例,因此需要重新填充 PROJECT_ID 变量:
export PROJECT_ID=<project-id>
export REGION=<region>

替换以下内容:

  • <project-id> 替换为您设置的 GCP 项目的名称。
  • <region> 替换为创建 Kafka 集群的区域
  1. 使用 managed-kafka clusters describe 命令获取 Kafka 引导服务器的 IP 地址。此地址可用于连接到 Kafka 集群。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. 列出集群中的主题:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

您应该能够看到以下输出,其中包含我们之前创建的主题 kafka-iot-topic

__remote_log_metadata
kafka-iot-topic
  1. 将此脚本复制并粘贴到新文件 publish_iot_data.sh 中。如需在 GCE 虚拟机上创建新文件,您可以使用 vimnano 等工具。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

解释

  • 此脚本会创建包含模拟传感器读数的 JSON 消息,这些消息具有设备 ID、时间戳、传感器数据(温度、湿度、压力、光照)、位置信息(纬度、经度)、设备状态(电池、信号、连接类型)和一些元数据。
  • 它会从一组数量固定的唯一设备生成连续的消息流,每个设备都以指定的时间间隔发送数据,从而模拟真实的 IoT 设备。在此示例中,我们从 10 个设备发布数据,每个设备生成 20 个读数,时间间隔为 10 秒。
  • 它还会使用 Kafka 生产者命令行工具将所有生成的数据发布到 Kafka 主题。
  1. 安装脚本使用的一些依赖项 - 用于数学计算的 bc 软件包和用于 JSON 处理的 jq 软件包。
sudo apt-get install bc jq
  1. 修改脚本以使其可执行,然后运行该脚本。运行该脚本大约需要 2 分钟。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

您可以运行此命令来检查事件是否已成功发布,该命令将打印所有事件。按 <control-c> 退出。

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. 设置 Dataproc 集群

本部分将在 Managed Kafka 集群所在的 VPC 子网中创建一个 Dataproc 集群。此集群将用于运行生成 DemoIOT Solutions 所需的实时统计信息和洞见的作业。

  1. 创建 Dataproc 集群。在此示例中,集群的名称为 dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

您应该会看到如下所示的响应:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

创建集群可能需要 10-15 分钟。请等待此操作成功完成,并通过描述集群检查集群是否处于 RUNNING 状态。

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. 使用 Dataproc 处理 Kafka 消息

在最后一个部分中,您将提交一个 Dataproc 作业,该作业使用 Spark Streaming 处理已发布的消息。此作业实际上会生成一些 DemoIOT Solutions 可以使用的实时统计信息和洞见。

  1. 运行此命令以在本地创建名为 process_iot.py 的流式 PySpark 作业文件。
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

解释

  • 此代码设置了一个 PySpark 结构化串流作业,用于从指定的 Kafka 主题读取数据。它使用提供的 Kafka 服务器引导地址和从 GCS 配置文件加载的 Kafka 配置,以连接 Kafka 代理并进行身份验证。
  • 它首先从 Kafka 读取原始数据作为字节数组流,并将这些字节数组转换为字符串,然后使用 Spark 的 StructType 应用 json_schema 来指定数据的结构(设备 ID、时间戳、位置、传感器数据等)。
  • 它会将前 10 行打印到控制台以进行预览,计算每个传感器的平均温度,并将所有数据以 avro 格式写入 GCS 存储分区。Avro 是一种基于行的数据序列化系统,可将结构化数据高效地存储在紧凑的架构定义二进制格式中,提供架构演变、语言中立性和高压缩率,适用于大规模数据处理。
  1. 创建 client.properties 文件,并填充 Kafka 服务器的引导地址的环境变量。
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.pyclient.properties 文件上传到 Google Cloud Storage 存储分区,以便 Dataproc 作业可以使用它们。
gsutil cp process_iot.py client.properties $BUCKET
  1. 将一些 Dataproc 作业的依赖项 jar 复制到您的 GCS 存储分区。此目录包含使用 Kafka 运行 Spark Streaming 作业所需的 jar,以及 Managed Kafka 身份验证库及其依赖项,这些 jar 取自 设置客户端机器
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. 将 Spark 作业提交到 Dataproc 集群。
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

系统将打印 Spark 驱动程序日志。您还应该能够在控制台中看到记录的这些表,以及存储在 GCS 存储分区中的数据。

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. 清理

按照相应步骤在完成 Codelab 后清理资源。

  1. 删除 Managed Kafka 集群、发布者 GCE 虚拟机和 Dataproc 集群。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. 删除 VPC 子网和网络。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. 如果您不再想使用数据,请删除 GCS 存储分区。
gcloud storage rm --recursive $BUCKET

9. 恭喜

恭喜!您已成功使用 Managed Kafka 和 Dataproc 构建了一个 IoT 数据处理流水线,该流水线可帮助 DemoIOT Solutions 实时了解其设备发布的数据!

您创建了一个 Managed Kafka 集群,向其发布了 IoT 事件,并运行了一个 Dataproc 作业,该作业使用 Spark Streaming 实时处理这些事件。现在,您已了解使用 Managed Kafka 和 Dataproc 创建数据流水线所需的主要步骤。

参考文档