Pemrosesan Data IoT Real-Time menggunakan Dataproc dan Layanan Terkelola Google untuk Apache Kafka

1. Pengantar

2efacab8643a653b.png

Terakhir Diperbarui: 10-06-2024

Latar belakang

Perangkat Internet of Things (IoT), mulai dari solusi smart home hingga sensor industri, menghasilkan data dalam jumlah besar di edge jaringan. Data ini sangat berharga untuk berbagai kasus penggunaan, seperti pemantauan, pelacakan, diagnostik, pengawasan, personalisasi, pengoptimalan armada, dan banyak lagi. Google Managed Service for Apache Kafka menawarkan cara yang skalabel dan andal untuk menyerap dan menyimpan aliran data berkelanjutan ini dengan cara yang kompatibel dengan OSS, mudah digunakan, dan aman, sementara Google Cloud Dataproc memungkinkan pemrosesan set data besar ini untuk analisis data menggunakan cluster Apache Spark dan Hadoop.

Yang akan Anda bangun

Dalam codelab ini, Anda akan membangun pipeline Pemrosesan Data IoT menggunakan Google Managed Service untuk Apache Kafka, Dataproc, Python, dan Apache Spark yang melakukan analisis real-time. Pipeline Anda akan:

  • Memublikasikan data dari perangkat IoT ke cluster Kafka terkelola menggunakan VM GCE
  • Mengalirkan data dari cluster Kafka Terkelola ke cluster Dataproc
  • Memproses data menggunakan tugas Spark Streaming Dataproc

Yang akan Anda pelajari

  • Cara membuat cluster Dataproc dan Google Managed Kafka
  • Cara menjalankan tugas streaming menggunakan Dataproc

Yang Anda butuhkan

2. Ringkasan

Untuk codelab ini, mari kita ikuti kisah perusahaan dummy, DemoIOT Solutions. DemoIOT Solutions menyediakan perangkat sensor yang mengukur dan mengirimkan data untuk suhu, kelembapan, tekanan, tingkat cahaya, dan lokasi. Mereka ingin menyiapkan pipeline yang memproses data ini untuk menampilkan statistik real-time kepada pelanggan mereka. Dengan menggunakan pipeline tersebut, mereka dapat menyediakan berbagai layanan kepada pelanggan, seperti pemantauan, saran otomatis, pemberitahuan, dan insight tentang tempat pelanggan memasang sensor mereka.

Untuk melakukannya, kita akan menggunakan VM GCE untuk menyimulasikan perangkat IoT. Perangkat akan memublikasikan data ke topik Kafka di cluster Kafka Terkelola Google, yang akan dibaca dan diproses oleh tugas streaming Dataproc. Penyiapan Prasyarat dan halaman berikutnya akan memandu Anda melakukan semua langkah ini.

Penyiapan prasyarat

  1. Temukan nama project dan nomor project untuk project Anda. Lihat Menemukan nama, nomor, dan ID project sebagai referensi.
  2. Subjaringan VPC. Tindakan ini akan memungkinkan konektivitas antara VM GCE, cluster Kafka, dan cluster Dataproc. Ikuti langkah-langkah ini untuk membuat daftar subnet yang ada menggunakan gcloud CLI. Jika diperlukan, ikuti membuat jaringan VPC mode otomatis yang akan membuat jaringan VPC dengan subnetwork di setiap region Google Cloud. Namun, untuk tujuan codelab ini, kita hanya akan menggunakan subnetwork dari satu region.
  • Di subnetwork ini, pastikan ada aturan Firewall yang mengizinkan semua ingress dari tcp:22, yang diperlukan untuk SSH. Aturan ini akan tersedia untuk dipilih di bagian Aturan firewall saat membuat jaringan, jadi pastikan Anda memilihnya.
  1. Bucket GCS. Anda akan memerlukan akses ke bucket penyimpanan Google Cloud untuk menyimpan resource tugas Dataproc dan mempertahankan data yang diproses. Jika Anda belum memilikinya, Anda dapat membuatnya di project GCP Anda.

Isi variabel lingkungan

Di terminal tempat Anda menjalankan gcloud CLI, isi variabel lingkungan ini agar dapat digunakan nanti.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Ganti kode berikut:

  • <project-id> dengan nama project GCP yang Anda siapkan.
  • <project-number> dengan nama nomor project dari langkah Prasyarat 1.
  • <region> dengan nama region dari Region dan zona yang tersedia yang ingin Anda gunakan. Misalnya, kita dapat menggunakan us-central1.
  • <zone> dengan nama zona dari Available regions and zones di bagian region yang sebelumnya Anda pilih. Misalnya, jika Anda memilih us-central1 sebagai region, Anda dapat menggunakan us-central1-f sebagai zona. Zona ini akan digunakan untuk membuat VM GCE yang menyimulasikan perangkat IoT. Pastikan zona Anda berada di region yang telah Anda pilih.
  • <subnet-path> dengan jalur lengkap subnet dari Langkah prasyarat 2. Nilai ini harus dalam format: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> dengan nama bucket GCS dari langkah Prasyarat 3.

3. Menyiapkan Google Managed Kafka

Bagian ini menyiapkan cluster Google Managed Kafka, yang men-deploy server Kafka, dan membuat topik di server ini tempat data IoT dapat dipublikasikan dan dibaca setelah berlangganan. DemoIOT Solutions dapat menyiapkan cluster ini sehingga semua perangkatnya memublikasikan data ke cluster ini.

Membuat cluster Kafka Terkelola

  • Buat cluster Managed Kafka. Di sini, nama cluster adalah kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

Anda akan mendapatkan respons seperti berikut:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

Pembuatan cluster memerlukan waktu 20-30 menit. Tunggu hingga operasi ini selesai.

Membuat Topik

  • Buat topik Managed Kafka di cluster. Di sini, nama topiknya adalah kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

Anda akan melihat output yang mirip dengan berikut ini:

Created topic [kafka-iot-topic].

4. Menyiapkan Penayang

Untuk memublikasikan ke cluster Managed Kafka, kita menyiapkan instance VM Google Compute Engine yang dapat mengakses VPC yang berisi subnet yang digunakan oleh cluster Managed Kafka. VM ini menyimulasikan perangkat sensor yang disediakan oleh DemoIOT Solutions.

Langkah

  1. Buat instance VM Google Compute Engine. Di sini, nama VM GCE adalah publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Beri akun layanan default Google Compute Engine izin untuk menggunakan Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Gunakan SSH untuk terhubung ke VM. Atau, gunakan Konsol Google Cloud untuk SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Instal Java untuk menjalankan alat command line Kafka, dan download biner Kafka menggunakan perintah ini.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Download library autentikasi Managed Kafka dan dependensinya, lalu konfigurasi properti klien Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Untuk mengetahui detail selengkapnya tentang penyiapan mesin penayang, lihat Menyiapkan mesin klien.

5. Memublikasikan ke Managed Kafka

Setelah publisher disiapkan, kita dapat menggunakan command line Kafka di publisher untuk memublikasikan beberapa data dummy dari VM GCE (mensimulasikan perangkat IoT oleh DemoIOT Solutions) ke cluster Kafka Terkelola.

  1. Karena telah menggunakan SSH ke instance VM GCE, kita perlu mengisi ulang variabel PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Ganti kode berikut:

  • <project-id> dengan nama project GCP yang Anda siapkan.
  • <region> dengan region tempat cluster Kafka dibuat
  1. Gunakan perintah managed-kafka clusters describe untuk mendapatkan alamat IP server bootstrap Kafka. Alamat ini dapat digunakan untuk terhubung ke cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Mencantumkan topik dalam cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Anda akan dapat melihat output berikut, yang berisi topik kafka-iot-topic yang kita buat sebelumnya.

__remote_log_metadata
kafka-iot-topic
  1. Salin dan tempel skrip ini ke dalam file baru publish_iot_data.sh. Untuk membuat file baru di VM GCE, Anda dapat menggunakan alat seperti vim atau nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Explanation

  • Skrip ini membuat pesan JSON dengan simulasi pembacaan sensor yang memiliki ID perangkat, stempel waktu, data sensor (suhu, kelembapan, tekanan, cahaya), informasi lokasi (lintang, bujur), status perangkat (baterai, sinyal, jenis koneksi), dan beberapa metadata.
  • Alat ini menghasilkan aliran pesan berkelanjutan dari sejumlah perangkat unik, yang masing-masing mengirimkan data pada interval waktu tertentu, meniru perangkat IoT di dunia nyata. Di sini, kita memublikasikan data dari 10 perangkat yang menghasilkan 20 pembacaan masing-masing, pada interval waktu 10 detik.
  • Aplikasi ini juga memublikasikan semua data yang dihasilkan ke topik Kafka menggunakan alat command line produsen Kafka.
  1. Instal beberapa dependensi yang digunakan oleh skrip - paket bc untuk perhitungan matematika dan paket jq untuk pemrosesan JSON.
sudo apt-get install bc jq
  1. Ubah skrip menjadi yang dapat dieksekusi dan jalankan skrip. Proses ini akan memakan waktu sekitar 2 menit.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Anda dapat memeriksa apakah peristiwa berhasil dipublikasikan dengan menjalankan perintah ini yang akan mencetak semua peristiwa. Tekan <control-c> untuk keluar.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Menyiapkan Cluster Dataproc

Bagian ini membuat cluster Dataproc di subnetwork VPC tempat cluster Managed Kafka berada. Cluster ini akan digunakan untuk menjalankan tugas yang menghasilkan statistik dan insight real-time yang diperlukan oleh Solusi DemoIOT.

  1. Membuat cluster Dataproc. Di sini, nama cluster adalah dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Anda akan mendapatkan respons seperti berikut:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Pembuatan cluster dapat memakan waktu 10-15 menit. Tunggu hingga operasi ini berhasil diselesaikan dan periksa apakah cluster berada dalam status RUNNING dengan mendeskripsikan cluster.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Memproses pesan Kafka menggunakan Dataproc

Di bagian terakhir ini, Anda akan mengirimkan tugas Dataproc yang memproses pesan yang dipublikasikan menggunakan Spark Streaming. Job ini sebenarnya menghasilkan beberapa statistik dan insight real time yang dapat digunakan oleh DemoIOT Solutions.

  1. Jalankan perintah ini untuk membuat file tugas PySpark streaming bernama process_iot.py secara lokal.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Explanation

  • Kode ini menyiapkan tugas PySpark Structured Streaming untuk membaca data dari topik Kafka yang ditentukan. Aplikasi ini menggunakan alamat bootstrap server Kafka yang diberikan dan konfigurasi Kafka yang dimuat dari file konfigurasi GCS untuk terhubung dan melakukan autentikasi dengan broker Kafka.
  • Pertama-tama, data mentah dibaca dari Kafka sebagai aliran array byte, lalu array byte tersebut di-casting ke string, dan json_schema diterapkan menggunakan StructType Spark untuk menentukan struktur data (ID perangkat, stempel waktu, lokasi, data sensor, dll.).
  • Skrip ini mencetak 10 baris pertama ke konsol untuk pratinjau, menghitung suhu rata-rata per sensor, dan menulis semua data ke bucket GCS dalam format avro. Avro adalah sistem serialisasi data berbasis baris yang secara efisien menyimpan data terstruktur dalam format biner yang ringkas dan ditentukan skemanya, sehingga menawarkan evolusi skema, netralitas bahasa, dan kompresi tinggi untuk pemrosesan data skala besar.
  1. Buat file client.properties dan isi variabel lingkungan untuk alamat bootstrap server Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Upload file process_iot.py dan client.properties ke bucket Google Cloud Storage Anda, sehingga file tersebut dapat digunakan oleh tugas Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Salin beberapa jar dependensi untuk tugas Dataproc ke bucket GCS Anda. Direktori ini berisi JAR yang diperlukan untuk menjalankan tugas Spark Streaming dengan Kafka, serta library autentikasi Managed Kafka dan dependensinya, yang diambil dari Menyiapkan mesin klien.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Kirimkan tugas Spark ke cluster Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Log driver Spark akan dicetak. Anda juga akan dapat melihat tabel ini dicatat ke konsol dan data yang disimpan di bucket GCS Anda.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Pembersihan

Ikuti langkah-langkah untuk membersihkan resource setelah menyelesaikan codelab.

  1. Hapus cluster Managed Kafka, VM GCE Publisher, dan cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Hapus subnetwork dan jaringan VPC Anda.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Hapus bucket GCS Anda jika Anda tidak ingin lagi menggunakan data tersebut.
gcloud storage rm --recursive $BUCKET

9. Selamat

Selamat, Anda telah berhasil membangun pipeline pemrosesan data IoT dengan Managed Kafka dan Dataproc yang membantu DemoIOT Solutions mendapatkan insight real-time tentang data yang dipublikasikan oleh perangkat mereka.

Anda telah membuat cluster Managed Kafka, memublikasikan peristiwa IoT ke cluster tersebut, dan menjalankan tugas Dataproc yang menggunakan streaming Spark untuk memproses peristiwa ini secara real time. Sekarang Anda telah mengetahui langkah-langkah penting yang diperlukan untuk membuat pipeline data menggunakan Managed Kafka dan Dataproc.

Dokumen referensi