使用 Dataproc 和 Google Managed Service for Apache Kafka 處理即時物聯網資料

1. 簡介

2efacab8643a653b.png

上次更新時間:2024 年 6 月 10 日

背景

物聯網 (IoT) 裝置 (從智慧型家居解決方案到工業感應器) 會在網路邊緣產生大量資料。這類資料在許多用途上都非常實用,例如裝置監控、追蹤、診斷、監控、個人化、車隊最佳化等等。Google Managed Service for Apache Kafka 提供可擴充且耐用的資料擷取方式,可以相容且易於使用的安全方式,擷取及儲存持續性資料串流;Google Cloud Dataproc 則可使用 Apache Spark 和 Hadoop 叢集,處理這些大型資料集以進行資料分析。

建構項目

在本程式碼研究室中,您將使用 Google Managed Service for Apache Kafka、Dataproc、Python 和 Apache Spark 建構 IoT 資料處理管道,以便進行即時分析。管道會執行以下操作:

  • 使用 GCE VM 將 IoT 裝置的資料發布至 Managed Kafka 叢集
  • 將資料從 Managed Kafka 叢集串流至 Dataproc 叢集
  • 使用 Dataproc Spark Streaming 工作處理資料

課程內容

  • 如何建立 Google 代管 Kafka 和 Dataproc 叢集
  • 如何使用 Dataproc 執行串流工作

軟硬體需求

2. 總覽

在本程式碼研究室中,我們將追蹤虛擬公司 DemoIOT Solutions 的故事。DemoIOT Solutions 提供感應器裝置,可測量及傳送溫度、濕度、壓力、光照程度和位置資料。他們希望設定資料處理管道,向客戶顯示即時統計資料。透過這類管道,他們可以為客戶提供多種服務,例如監控、自動建議、警示,以及關於客戶安裝感應器的地點的洞察資料。

為此,我們將使用 GCE VM 模擬 IoT 裝置。裝置會將資料發布至 Google 代管 Kafka 叢集中的 Kafka 主題,而 Dataproc 串流工作會讀取並處理這些資料。您可以按照「必要條件」設定和後續頁面中的說明,完成所有步驟。

必要條件設定

  1. 找出專案的名稱和專案編號。請參閱「尋找專案名稱、編號和 ID」一節。
  2. 虛擬私有雲子網路。這樣一來,GCE VM、Kafka 叢集和 Dataproc 叢集之間就能建立連線。請按照這篇文章的說明,使用 gcloud CLI 列出現有的子網路。如有需要,請按照建立自動模式 VPC 網路中的說明操作,在每個 Google Cloud 區域中建立含有子網路的 VPC 網路。不過,為了讓本程式碼研究室更容易上手,我們只會使用單一區域的子網路。
  • 請確認這個子網路有防火牆規則,允許所有來自 tcp:22 的傳入流量,因為 SSH 需要這個通訊埠。建立網路時,您可以在「防火牆規則」部分選取這項規則,因此請務必選取這項規則。
  1. GCS 值區。您需要存取 Google Cloud Storage 值區,才能儲存 Dataproc 工作資源和已處理的資料。如果沒有,您可以在 GCP 專案中建立一個。

填入環境變數

在執行 gcloud CLI 的終端機中填入這些環境變數,以便日後使用。

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

更改下列內容:

  • <project-id> 為您設定的 GCP 專案名稱。
  • <project-number> 與步驟 1 中「先決條件」的專案編號名稱。
  • <region> 與您要使用的可用地區和區域中的地區名稱。例如,我們可以使用 us-central1
  • <zone> 與所選區域下方「可用地區與區域」中的區域名稱。舉例來說,如果您選取 us-central1 做為地區,可以將 us-central1-f 設為區域。這個可用區會用來建立模擬 IoT 裝置的 GCE VM。請確認區域位於您選擇的地區。
  • <subnet-path> 包含子網路的完整路徑,請參閱先決條件步驟 2。這個值的格式必須為 projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> 使用先決條件步驟 3 中的 GCS 值區名稱。

3. 設定 Google 代管 Kafka

本節將設定 Google 代管 Kafka 叢集,該叢集會部署 Kafka 伺服器,並在該伺服器上建立主題,讓 IoT 資料在訂閱後即可發布及讀取。DemoIOT Solutions 可以設定這個叢集,讓所有裝置將資料發布到這個叢集。

建立代管 Kafka 叢集

  • 建立 Managed Kafka 叢集。這裡的叢集名稱為 kafka-iot
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

您應該會收到類似以下的回應:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

叢集建立作業需要 20 到 30 分鐘。等待這項作業完成。

建立主題

  • 在叢集中建立 Managed Kafka 主題。此處的主題名稱為 kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

您應該會看到類似以下的輸出內容:

Created topic [kafka-iot-topic].

4. 設定發布商

為了將資料發布至 Managed Kafka 叢集,我們設定了 Google Compute Engine VM 執行個體,讓它能夠存取 Managed Kafka 叢集使用的 VPC 內的子網路。這個 VM 會模擬 DemoIOT Solutions 提供的感應器裝置。

步驟

  1. 建立 Google Compute Engine VM 執行個體。此處的 GCE VM 名稱為 publisher-instance
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. 授予 Google Compute Engine 預設服務帳戶使用 Managed Service for Apache Kafka 的權限。
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. 使用 SSH 連線至 VM。或者,您也可以使用 Google Cloud 控制台進行 SSH 連線
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. 安裝 Java 以執行 Kafka 指令列工具,並使用下列指令下載 Kafka 二進位檔。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. 下載代管 Kafka 驗證程式庫及其依附元件,並設定 Kafka 用戶端屬性。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

如要進一步瞭解發布端電腦設定,請參閱「設定用戶端電腦」。

5. 發布至代管 Kafka

發布端設定完成後,我們可以使用 Kafka 指令列,將一些虛擬資料從 GCE VM (透過 DemoIOT Solutions 模擬 IoT 裝置) 發布至 Managed Kafka 叢集。

  1. 由於我們已透過 SSH 連線至 GCE VM 執行個體,因此需要重新填入 PROJECT_ID 變數:
export PROJECT_ID=<project-id>
export REGION=<region>

更改下列內容:

  • <project-id> 為您設定的 GCP 專案名稱。
  • <region> 改成 Kafka 叢集建立的地區
  1. 使用 managed-kafka clusters describe 指令取得 Kafka 啟動伺服器的 IP 位址。這個位址可用於連線至 Kafka 叢集。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. 列出叢集中的主題:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

您應該會看到下列輸出內容,其中包含我們先前建立的主題 kafka-iot-topic

__remote_log_metadata
kafka-iot-topic
  1. 複製並貼上這個指令碼到新檔案 publish_iot_data.sh 中。如要在 GCE VM 上建立新檔案,您可以使用 vimnano 等工具。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

說明

  • 這個指令碼會建立 JSON 訊息,其中包含模擬的感應器讀數,包括裝置 ID、時間戳記、感應器資料 (溫度、濕度、壓力、光線)、位置資訊 (緯度、經度)、裝置狀態 (電池、訊號、連線類型) 和一些中繼資料。
  • 它會模擬實際的物聯網裝置,從特定數量的獨特裝置產生連續的訊息流,每個裝置都會在指定的時間間隔傳送資料。在此,我們會發布 10 部裝置的資料,每部裝置會產生 20 次讀數,且每 10 秒產生一次。
  • 並使用 Kafka 產生器指令列工具,將所有產生的資料發布至 Kafka 主題。
  1. 安裝指令碼使用的部分依附元件:bc 套件用於數學運算,jq 套件用於 JSON 處理。
sudo apt-get install bc jq
  1. 修改指令碼,使其成為可執行檔,然後執行指令碼。執行這項作業大約需要 2 分鐘。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

您可以執行這項指令,查看所有事件是否已成功發布。按下 <control-c> 即可退出。

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. 設定 Dataproc 叢集

本節會在 Managed Kafka 叢集所在的 VPC 子網路中,建立 Dataproc 叢集。這個叢集將用於執行工作,產生 DemoIOT 解決方案所需的即時統計資料和洞察資料。

  1. 建立 Dataproc 叢集。這裡的叢集名稱為 dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

您應該會收到類似以下的回應:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

叢集建立作業可能需要 10 到 15 分鐘才能完成。等待這項作業順利完成,然後描述叢集,確認叢集處於 RUNNING 狀態。

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. 使用 Dataproc 處理 Kafka 訊息

在最後一節中,您將提交 Dataproc 工作,以便使用 Spark Streaming 處理已發布的訊息。這個工作實際上會產生一些可供 DemoIOT 解決方案使用的即時統計資料和洞察資料。

  1. 執行這項指令,在本機建立名為 process_iot.py 的串流 PySpark 工作檔案。
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

說明

  • 這段程式碼會設定 PySpark 結構化串流工作,以便讀取指定 Kafka 主題中的資料。它會使用提供的 Kafka 伺服器啟動條件位址,以及從 GCS 設定檔載入的 Kafka 設定,與 Kafka 仲介器建立連線並驗證。
  • 它會先將 Kafka 中的原始資料讀取為位元組陣列串流,然後將這些位元組陣列轉換為字串,並使用 Spark 的 StructType 套用 json_schema,指定資料結構 (裝置 ID、時間戳記、位置、感應器資料等)。
  • 它會將前 10 列資料輸出至控制台供預覽,並計算每個感應器的平均溫度,然後以 avro 格式將所有資料寫入 GCS 值區。Avro 是一種以列為基礎的資料序列化系統,可有效率地以結構定義的二進位格式儲存結構化資料,提供結構定義演進、語言中立性和高壓縮率,以便進行大規模資料處理作業。
  1. 建立 client.properties 檔案,並為 Kafka 伺服器的引導程序位址填入環境變數。
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.pyclient.properties 檔案上傳至 Google Cloud Storage 值區,以便 Dataproc 工作使用。
gsutil cp process_iot.py client.properties $BUCKET
  1. 將 Dataproc 工作的部分依附元件 JAR 複製到 GCS 值區。這個目錄包含執行 Spark Streaming 工作時,與 Kafka 搭配使用的 JAR 檔案,以及從設定用戶端機器取得的 Managed Kafka 驗證程式庫及其依附元件。
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. 將 Spark 工作提交至 Dataproc 叢集。
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

系統會列印 Spark 驅動程式記錄。您應該也能看到這些表格記錄到控制台,以及儲存在 GCS 值區中的資料。

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. 清理

完成程式碼研究室後,請按照步驟清理資源。

  1. 刪除代管 Kafka 叢集、發布者 GCE VM 和 Dataproc 叢集。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. 刪除 VPC 子網路和網路。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. 如果不再需要使用資料,請刪除 GCS 值區。
gcloud storage rm --recursive $BUCKET

9. 恭喜

恭喜!您已成功使用 Manage Kafka 和 Dataproc 建構 IoT 資料處理管道,協助 DemoIOT Solutions 取得裝置發布的即時資料洞察!

您建立了 Managed Kafka 叢集,並將 IoT 事件發布至該叢集,然後執行 Dataproc 工作,使用 Spark 串流即時處理這些事件。您現在已瞭解使用 Managed Kafka 和 Dataproc 建立資料管道時,必須採取哪些重要步驟。

參考文件