Dataproc と Google Managed Service for Apache Kafka を使用したリアルタイム IoT データ処理

Dataproc と Google Managed Service for Apache Kafka を使用したリアルタイム IoT データ処理

この Codelab について

subject最終更新: 6月 16, 2025
account_circle作成者: Devanshi Khatsuriya

1. はじめに

2efacab8643a653b.png

最終更新日: 2024 年 6 月 10 日

背景

スマートホーム ソリューションから産業用センサーまで、モノのインターネット(IoT)デバイスはネットワークのエッジで大量のデータを生成します。このデータは、デバイスのモニタリング、追跡、診断、監視、パーソナライズ、フリート最適化など、さまざまなユースケースで非常に有用です。Google Managed Service for Apache Kafka は、この継続的なデータ ストリームを OSS と互換性があり、使いやすく、安全な方法で取り込んで保存するためのスケーラブルで耐久性のある方法を提供します。一方、Google Cloud Dataproc では、Apache Spark クラスタと Hadoop クラスタを使用して、これらの大規模なデータセットを処理してデータ分析を行うことができます。

作成するアプリの概要

この Codelab では、Google Managed Service for Apache Kafka、Dataproc、Python、Apache Spark を使用して、リアルタイム分析を行う IoT データ処理パイプラインを構築します。パイプラインは次の処理を行います。

  • GCE VM を使用して IoT デバイスから Managed Kafka クラスタにデータをパブリッシュする
  • Manage Kafka クラスタから Dataproc クラスタにデータをストリーミングする
  • Dataproc Spark Streaming ジョブを使用してデータを処理する

学習内容

  • Google Managed Kafka クラスタと Dataproc クラスタを作成する方法
  • Dataproc を使用してストリーミング ジョブを実行する方法

必要なもの

2. 概要

この Codelab では、架空の会社である DemoIOT Solutions の例を紹介します。DemoIOT Solutions は、温度、湿度、圧力、照度、位置に関するデータを測定して送信するセンサー デバイスを提供しています。このデータを処理してリアルタイムの統計情報を顧客に表示するパイプラインを設定したいと考えています。このようなパイプラインを使用すると、ユーザーがセンサーを設置した場所に関するモニタリング、自動化された候補、アラート、分析情報など、さまざまなサービスをユーザーに提供できます。

これを行うには、GCE VM を使用して IoT デバイスをシミュレートします。デバイスは、Google Managed Kafka クラスタ内の Kafka トピックにデータをパブリッシュします。このデータは、Dataproc ストリーミング ジョブによって読み取られ、処理されます。前提条件の設定と次のページでは、これらの手順をすべて説明します。

前提条件の設定

  1. プロジェクトの名前とプロジェクト番号を確認します。詳しくは、プロジェクト名、番号、ID を確認するをご覧ください。
  2. VPC サブネットワーク。これにより、GCE VM、Kafka クラスタ、Dataproc クラスタ間の接続が可能になります。こちらの手順に沿って、gcloud CLI を使用して既存のサブネットを一覧表示します。必要に応じて、自動モードの VPC ネットワークを作成するの手順に沿って、Google Cloud リージョンごとにサブネットを持つ VPC ネットワークを作成します。ただし、この Codelab では、単一リージョンのサブネットワークのみを使用します。
  • このサブネットワークで、SSH に必要な tcp:22 からのすべての上り(内向き)を許可するファイアウォール ルールがあることを確認します。このルールは、ネットワークを作成するときに [ファイアウォール ルール] セクションで選択できるため、必ず選択してください。
  1. GCS バケット。Dataproc ジョブリソースを保存し、処理されたデータを保持するには、Google Cloud Storage バケットにアクセスする必要があります。ない場合は、GCP プロジェクトで作成できます。

環境変数に値を設定する

gcloud CLI を実行するターミナルで、これらの環境変数に値を入力して、後で使用できるようにします。

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

次のように置き換えます。

  • <project-id> は、設定した GCP プロジェクトの名前に置き換えます。
  • <project-number> は、前提条件のステップ 1 のプロジェクト番号の名前に置き換えます。
  • <region> は、利用可能なリージョンとゾーンの中から使用するリージョンの名前に置き換えます。たとえば、us-central1 を使用できます。
  • <zone> は、前に選択したリージョンの使用可能なリージョンとゾーンにあるゾーンの名前に置き換えます。たとえば、リージョンとして us-central1 を選択した場合は、ゾーンとして us-central1-f を使用できます。このゾーンは、IoT デバイスをシミュレートする GCE VM の作成に使用されます。選択したゾーンがリージョン内にあることを確認します。
  • <subnet-path> は、前提条件のステップ 2 のサブネットのフルパスに置き換えます。この値は projects/<project-id>/regions/<region>/subnetworks/<subnet-name> の形式にする必要があります。
  • <bucket-name> は、前提条件のステップ 3 の GCS バケットの名前に置き換えます。

3. Google Managed Kafka を設定する

このセクションでは、Kafka サーバーをデプロイする Google マネージド Kafka クラスタを設定し、このサーバーにトピックを作成します。このトピックでは、IoT データをサブスクライブしてパブリッシュし、読み取ることができます。DemoIOT Solutions では、すべてのデバイスがこのクラスタにデータをパブリッシュするようにこのクラスタを設定できます。

マネージド Kafka クラスタを作成する

  • マネージド Kafka クラスタを作成します。ここで、クラスタの名前は kafka-iot です。
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

次のようなレスポンスが返されます。

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

クラスタの作成には 20 ~ 30 分かかります。このオペレーションが完了するまで待ちます。

トピックを作成する

  • クラスタにマネージド Kafka トピックを作成します。ここで、トピックの名前は kafka-iot-topic です。
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

出力は次のようになります。

Created topic [kafka-iot-topic].

4. パブリッシャーを設定する

マネージド Kafka クラスタにパブリッシュするには、マネージド Kafka クラスタで使用されるサブネットを含む VPC にアクセスできる Google Compute Engine VM インスタンスを設定します。この VM は、DemoIOT Solutions が提供するセンサーデバイスをシミュレートします。

手順

  1. Google Compute Engine VM インスタンスを作成します。ここで、GCE VM の名前は publisher-instance です。
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Google Compute Engine のデフォルトのサービス アカウントに、Managed Service for Apache Kafka を使用する権限を付与します。
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. SSH を使用して VM に接続します。または、Google Cloud コンソールを使用して SSH 接続します。
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Kafka コマンドライン ツールを実行するように Java をインストールし、次のコマンドを使用して Kafka バイナリをダウンロードします。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. マネージド Kafka 認証ライブラリとその依存関係をダウンロードし、Kafka クライアント プロパティを構成します。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

パブリッシャー マシンのセットアップの詳細については、クライアント マシンを設定するをご覧ください。

5. Managed Kafka にパブリッシュする

パブリッシャーが設定されたので、Kafka コマンドラインを使用して、GCE VM(DemoIOT Solutions による IoT デバイスをシミュレート)からマネージド Kafka クラスタにダミーデータをパブリッシュできます。

  1. GCE VM インスタンスに SSH 接続したため、PROJECT_ID 変数を再入力する必要があります。
export PROJECT_ID=<project-id>
export REGION=<region>

次のように置き換えます。

  • <project-id> は、設定した GCP プロジェクトの名前に置き換えます。
  • <region> は、Kafka クラスタが作成されたリージョンに置き換えます。
  1. managed-kafka clusters describe コマンドを使用して、Kafka ブートストラップ サーバーの IP アドレスを取得します。このアドレスを使用して Kafka クラスタに接続できます。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. クラスタ内のトピックを一覧表示します。
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

先ほど作成したトピック kafka-iot-topic を含む次の出力が表示されます。

__remote_log_metadata
kafka
-iot-topic
  1. このスクリプトを新しいファイル publish_iot_data.sh にコピーして貼り付けます。GCE VM に新しいファイルを作成するには、vimnano などのツールを使用します。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

説明

  • このスクリプトは、デバイス ID、タイムスタンプ、センサーデータ(温度、湿度、気圧、照度)、位置情報(緯度、経度)、デバイスのステータス(バッテリー、電波、接続タイプ)、メタデータを含むシミュレートされたセンサー測定値を含む JSON メッセージを作成します。
  • 一意のデバイスを一定数設定し、各デバイスが指定された時間間隔でデータを送信することで、実際の IoT デバイスを模倣した連続的なメッセージ フローを生成します。ここでは、10 台のデバイスからデータを公開します。各デバイスは 10 秒間隔で 20 回の測定値を生成します。
  • また、Kafka プロデューサー コマンドライン ツールを使用して、生成されたすべてのデータを Kafka トピックにパブリッシュします。
  1. スクリプトで使用される依存関係をインストールします。数学計算用の bc パッケージと JSON 処理用の jq パッケージです。
sudo apt-get install bc jq
  1. スクリプトを実行可能に変更して、スクリプトを実行します。実行には約 2 分かかります。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

イベントが正常に公開されたことを確認するには、次のコマンドを実行してすべてのイベントを出力します。終了するには <control-c> キーを押します。

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Dataproc クラスタを設定する

このセクションでは、マネージド Kafka クラスタが存在する VPC サブネットワークに Dataproc クラスタを作成します。このクラスタは、DemoIOT ソリューションに必要なリアルタイム統計情報と分析情報を生成するジョブの実行に使用されます。

  1. Dataproc クラスタを作成します。ここで、クラスタの名前は dataproc-iot です。
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

次のようなレスポンスが返されます。

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

クラスタの作成には 10 ~ 15 分かかることがあります。このオペレーションが正常に完了するまで待機し、クラスタの説明を取得して、クラスタが RUNNING 状態であることを確認します。

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Dataproc を使用して Kafka メッセージを処理する

この最後のセクションでは、Spark Streaming を使用して公開されたメッセージを処理する Dataproc ジョブを送信します。このジョブは、DemoIOT ソリューションで使用できるリアルタイムの統計情報と分析情報を生成します。

  1. このコマンドを実行して、ローカルに process_iot.py という名前のストリーミング PySpark ジョブファイルを作成します。
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

説明

  • このコードは、指定された Kafka トピックからデータを読み取る PySpark Structured Streaming ジョブを設定します。提供された Kafka サーバー ブートストラップ アドレスと、GCS 構成ファイルから読み込まれた Kafka 構成を使用して、Kafka ブローカーに接続して認証を行います。
  • まず、未加工データをバイト配列のストリーミングとして Kafka から読み取り、それらのバイト配列を文字列にキャストします。次に、Spark の StructType を使用して json_schema を適用し、データの構造(デバイス ID、タイムスタンプ、位置情報、センサーデータなど)を指定します。
  • 最初の 10 行をコンソールに出力してプレビューし、センサーごとの平均温度を計算して、すべてのデータを avro 形式で GCS バケットに書き込みます。Avro は行ベースのデータ シリアル化システムで、構造化データをスキーマで定義されたコンパクトなバイナリ形式で効率的に保存します。大規模なデータ処理では、スキーマの進化、言語の非依存性、高度な圧縮を提供します。
  1. client.properties ファイルを作成し、kafka サーバーのブートストラップ アドレスの環境変数を入力します。
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.py ファイルと client.properties ファイルを Google Cloud Storage バケットにアップロードして、Dataproc ジョブで使用できるようにします。
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc ジョブの依存関係 JAR を GCS バケットにコピーします。このディレクトリには、Kafka で Spark Streaming ジョブを実行するために必要な JAR と、クライアントマシンを設定するで取得したマネージド Kafka 認証ライブラリとその依存関係が含まれています。
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Spark ジョブを Dataproc クラスタに送信します。
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Spark ドライバログが出力されます。また、これらのテーブルがコンソールにログに記録され、GCS バケットに保存されていることも確認できます。

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. クリーンアップ

Codelab の完了後にリソースをクリーンアップする手順を行います。

  1. マネージド Kafka クラスタ、パブリッシャー GCE VM、Dataproc クラスタを削除します。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. VPC サブネットワークとネットワークを削除します。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. データを使用しない場合は、GCS バケットを削除します。
gcloud storage rm --recursive $BUCKET

9. 完了

これで、Manage Kafka と Dataproc を使用して IoT データ処理パイプラインを正常に構築できました。このパイプラインは、デバイスによって公開されたデータに関するリアルタイム分析情報を DemoIOT Solutions が取得するのに役立ちます。

マネージド Kafka クラスタを作成し、IoT イベントを公開しました。また、Spark ストリーミングを使用してこれらのイベントをリアルタイムで処理する Dataproc ジョブを実行しました。これで、マネージド Kafka と Dataproc を使用してデータ パイプラインを作成するために必要な主な手順について理解できました。

リファレンス ドキュメント