Pemrosesan Data IoT Real-Time menggunakan Dataproc dan Layanan Terkelola Google untuk Apache Kafka

Pemrosesan Data IoT Real-Time menggunakan Dataproc dan Layanan Terkelola Google untuk Apache Kafka

Tentang codelab ini

subjectTerakhir diperbarui Jun 16, 2025
account_circleDitulis oleh Devanshi Khatsuriya

1. Pengantar

2efacab8643a653b.png

Terakhir Diperbarui: 10-06-2024

Latar belakang

Perangkat Internet of Things (IoT), mulai dari solusi smart home hingga sensor industri, menghasilkan data dalam jumlah besar di edge jaringan. Data ini sangat berharga untuk berbagai kasus penggunaan, seperti pemantauan perangkat, pelacakan, diagnostik, pengawasan, personalisasi, pengoptimalan armada, dan banyak lagi. Google Managed Service for Apache Kafka menawarkan cara yang skalabel dan andal untuk menyerap dan menyimpan aliran data yang berkelanjutan ini dengan cara yang kompatibel dengan OSS, mudah digunakan, dan aman, sementara Google Cloud Dataproc memungkinkan pemrosesan set data besar ini untuk analisis data menggunakan cluster Apache Spark dan Hadoop.

Yang akan Anda build

Dalam codelab ini, Anda akan membuat pipeline Pemrosesan Data IoT menggunakan Google Managed Service for Apache Kafka, Dataproc, Python, dan Apache Spark yang melakukan analisis real-time. Pipeline Anda akan:

  • Memublikasikan data dari perangkat IoT ke cluster Kafka Terkelola menggunakan VM GCE
  • Menstreaming data dari cluster Manage Kafka ke cluster Dataproc
  • Memproses data menggunakan tugas Dataproc Spark Streaming

Yang akan Anda pelajari

  • Cara membuat cluster Kafka dan Dataproc yang Dikelola Google
  • Cara menjalankan tugas streaming menggunakan Dataproc

Yang Anda butuhkan

2. Ringkasan

Untuk codelab ini, mari kita ikuti kisah perusahaan fiktif, DemoIOT Solutions. DemoIOT Solutions menyediakan perangkat sensor yang mengukur dan mengirimkan data untuk suhu, kelembapan, tekanan, tingkat cahaya, dan lokasi. Mereka ingin menyiapkan pipeline yang memproses data ini untuk menampilkan statistik real-time kepada pelanggan mereka. Dengan menggunakan pipeline tersebut, mereka dapat menyediakan berbagai layanan kepada pelanggan, seperti pemantauan, saran otomatis, pemberitahuan, dan insight tentang tempat pelanggan memasang sensor mereka.

Untuk melakukannya, kita akan menggunakan VM GCE untuk menyimulasikan perangkat IoT. Perangkat akan memublikasikan data ke topik Kafka di cluster Google Managed Kafka, yang akan dibaca dan diproses oleh tugas streaming Dataproc. Penyiapan Prasyarat dan halaman berikut akan memandu Anda melakukan semua langkah ini.

Penyiapan prasyarat

  1. Temukan nama project dan nomor project untuk project Anda. Lihat Menemukan nama, nomor, dan ID project untuk referensi.
  2. Subnet VPC. Tindakan ini akan memungkinkan konektivitas antara VM GCE, cluster Kafka, dan cluster Dataproc. Ikuti langkah-langkah ini untuk mencantumkan subnet yang ada menggunakan gcloud CLI. Jika diperlukan, ikuti membuat jaringan VPC mode otomatis yang akan membuat jaringan VPC dengan subnetwork di setiap region Google Cloud. Namun, untuk tujuan codelab ini, kita hanya akan menggunakan subjaringan dari satu region.
  • Di subjaringan ini, pastikan ada aturan Firewall yang mengizinkan semua traffic masuk dari tcp:22, yang merupakan SSH yang diperlukan. Aturan ini akan tersedia untuk dipilih di bagian Aturan firewall saat membuat jaringan, jadi pastikan Anda memilihnya.
  1. Bucket GCS. Anda memerlukan akses ke bucket penyimpanan Google Cloud untuk menyimpan resource tugas Dataproc dan mempertahankan data yang diproses. Jika belum memilikinya, Anda dapat membuat project di GCP.

Mengisi variabel lingkungan

Di terminal tempat Anda menjalankan gcloud CLI, isi variabel lingkungan ini agar dapat digunakan nanti.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Ganti kode berikut:

  • <project-id> dengan nama project GCP yang Anda siapkan.
  • <project-number> dengan nama nomor project dari langkah 1 Prasyarat.
  • <region> dengan nama region dari Region dan zona yang tersedia yang ingin Anda gunakan. Misalnya, kita dapat menggunakan us-central1.
  • <zone> dengan nama zona dari Region dan zona yang tersedia di bagian region yang sebelumnya Anda pilih. Misalnya, jika memilih us-central1 sebagai region, Anda dapat menggunakan us-central1-f sebagai zona. Zona ini akan digunakan untuk membuat VM GCE yang menyimulasikan perangkat IoT. Pastikan zona Anda berada di region yang telah Anda pilih.
  • <subnet-path> dengan jalur lengkap subnet dari Prasyarat langkah 2. Nilai ini harus dalam format: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> dengan nama bucket GCS dari langkah 3 Prasyarat.

3. Menyiapkan Kafka Terkelola Google

Bagian ini menyiapkan cluster Kafka Terkelola Google, yang men-deploy server Kafka, dan membuat topik di server ini tempat data IoT dapat dipublikasikan dan dibaca setelah berlangganan. DemoIOT Solutions dapat menyiapkan cluster ini sehingga semua perangkatnya memublikasikan data ke cluster tersebut.

Membuat cluster Kafka Terkelola

  • Buat cluster Kafka Terkelola. Di sini, nama cluster adalah kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

Anda akan mendapatkan respons seperti berikut:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

Pembuatan cluster memerlukan waktu 20-30 menit. Tunggu hingga operasi ini selesai.

Membuat Topik

  • Buat topik Kafka Terkelola di cluster. Di sini, nama topik adalah kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

Anda akan mendapatkan output yang mirip dengan berikut ini:

Created topic [kafka-iot-topic].

4. Menyiapkan Penayang

Untuk memublikasikan ke cluster Managed Kafka, kita menyiapkan instance VM Google Compute Engine yang dapat mengakses VPC yang berisi subnet yang digunakan oleh cluster Managed Kafka. VM ini menyimulasikan perangkat sensor yang disediakan oleh DemoIOT Solutions.

Langkah

  1. Buat instance VM Google Compute Engine. Di sini, nama VM GCE adalah publisher-instance.
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Berikan izin kepada akun layanan default Google Compute Engine untuk menggunakan Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Gunakan SSH untuk terhubung ke VM. Atau, gunakan Konsol Google Cloud untuk SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Instal Java untuk menjalankan alat command line Kafka, dan download biner Kafka menggunakan perintah ini.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Download library autentikasi Kafka Terkelola dan dependensinya, lalu konfigurasikan properti klien Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Untuk mengetahui detail selengkapnya tentang penyiapan mesin penayang, lihat Menyiapkan mesin klien.

5. Memublikasikan ke Managed Kafka

Setelah penayang disiapkan, kita dapat menggunakan command line Kafka di dalamnya untuk memublikasikan beberapa data dummy dari VM GCE (menyimulasikan perangkat IoT oleh DemoIOT Solutions) ke cluster Kafka Terkelola.

  1. Karena telah menggunakan SSH ke instance VM GCE, kita perlu mengisi ulang variabel PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Ganti kode berikut:

  • <project-id> dengan nama project GCP yang Anda siapkan.
  • <region> dengan region tempat cluster Kafka dibuat
  1. Gunakan perintah managed-kafka clusters describe untuk mendapatkan alamat IP server bootstrap Kafka. Alamat ini dapat digunakan untuk terhubung ke cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Cantumkan topik di cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Anda akan dapat melihat output berikut, yang berisi topik kafka-iot-topic yang kita buat sebelumnya.

__remote_log_metadata
kafka
-iot-topic
  1. Salin dan tempel skrip ini ke dalam file baru publish_iot_data.sh. Untuk membuat file baru di VM GCE, Anda dapat menggunakan alat seperti vim atau nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Explanation

  • Skrip ini membuat pesan JSON dengan simulasi pembacaan sensor yang memiliki ID perangkat, stempel waktu, data sensor (suhu, kelembapan, tekanan, cahaya), informasi lokasi (lintang, bujur), status perangkat (baterai, sinyal, jenis koneksi), dan beberapa metadata.
  • Alat ini menghasilkan aliran pesan yang berkelanjutan dari sejumlah perangkat unik yang ditetapkan, yang masing-masing mengirim data pada interval waktu yang ditentukan, meniru perangkat IoT di dunia nyata. Di sini, kita memublikasikan data dari 10 perangkat yang masing-masing menghasilkan 20 pembacaan, dengan interval waktu 10 detik.
  • Alat ini juga memublikasikan semua data yang dihasilkan ke topik Kafka menggunakan alat command line produsen Kafka.
  1. Instal beberapa dependensi yang digunakan oleh skrip - paket bc untuk penghitungan matematika dan paket jq untuk pemrosesan JSON.
sudo apt-get install bc jq
  1. Ubah skrip menjadi file yang dapat dieksekusi dan jalankan skrip. Proses ini memerlukan waktu sekitar 2 menit.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Anda dapat memeriksa apakah peristiwa berhasil dipublikasikan dengan menjalankan perintah ini yang akan mencetak semua peristiwa. Tekan <control-c> untuk keluar.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Menyiapkan Cluster Dataproc

Bagian ini membuat cluster Dataproc di subjaringan VPC tempat cluster Managed Kafka berada. Cluster ini akan digunakan untuk menjalankan tugas yang menghasilkan statistik dan insight real-time yang diperlukan oleh Solusi DemoIOT.

  1. Buat cluster Dataproc. Di sini, nama cluster adalah dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Anda akan mendapatkan respons seperti berikut:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Pembuatan cluster mungkin memerlukan waktu 10-15 menit. Tunggu hingga operasi ini berhasil diselesaikan dan pastikan cluster berada dalam status RUNNING dengan mendeskripsikan cluster.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Memproses pesan Kafka menggunakan Dataproc

Di bagian terakhir ini, Anda akan mengirimkan tugas Dataproc yang memproses pesan yang dipublikasikan menggunakan Spark Streaming. Tugas ini sebenarnya menghasilkan beberapa statistik dan insight real time yang dapat digunakan oleh DemoIOT Solutions.

  1. Jalankan perintah ini untuk membuat file tugas PySpark streaming yang disebut process_iot.py secara lokal.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Explanation

  • Kode ini menyiapkan tugas Streaming Terstruktur PySpark untuk membaca data dari topik Kafka yang ditentukan. Fungsi ini menggunakan alamat bootstrap server Kafka yang disediakan dan konfigurasi Kafka yang dimuat dari file konfigurasi GCS untuk terhubung dan mengautentikasi dengan broker Kafka.
  • Pertama, kode ini membaca data mentah dari Kafka sebagai aliran array byte, dan mentransmisikan array byte tersebut ke string, serta menerapkan json_schema menggunakan StructType Spark untuk menentukan struktur data (ID perangkat, stempel waktu, lokasi, data sensor, dll.).
  • Kode ini mencetak 10 baris pertama ke konsol untuk pratinjau, menghitung suhu rata-rata per sensor, dan menulis semua data ke bucket GCS dalam format avro. Avro adalah sistem serialisasi data berbasis baris yang menyimpan data terstruktur secara efisien dalam format biner yang ringkas dan ditentukan skema, yang menawarkan evolusi skema, netralitas bahasa, dan kompresi tinggi untuk pemrosesan data skala besar.
  1. Buat file client.properties dan isi variabel lingkungan untuk alamat bootstrap server kafka.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Upload file process_iot.py dan client.properties ke bucket Google Cloud Storage Anda, sehingga file tersebut dapat digunakan oleh tugas Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Salin beberapa jar dependensi untuk tugas Dataproc ke bucket GCS Anda. Direktori ini berisi jar yang diperlukan untuk menjalankan tugas Spark Streaming dengan Kafka, serta library autentikasi Kafka Terkelola dan dependensinya, yang diambil dari Menyiapkan komputer klien.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Kirim tugas Spark ke cluster Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Log driver Spark akan dicetak. Anda juga akan dapat melihat tabel ini dicatat ke konsol dan data yang disimpan di bucket GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Pembersihan

Ikuti langkah-langkah untuk membersihkan resource setelah menyelesaikan codelab.

  1. Hapus cluster Kafka Terkelola, VM GCE Penayang, dan cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Hapus subnetwork dan jaringan VPC Anda.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Hapus bucket GCS jika Anda tidak ingin lagi menggunakan data tersebut.
gcloud storage rm --recursive $BUCKET

9. Selamat

Selamat, Anda telah berhasil membuat pipeline pemrosesan data IoT dengan Manage Kafka dan Dataproc yang membantu DemoIOT Solutions mendapatkan insight real-time tentang data yang dipublikasikan oleh perangkat mereka.

Anda telah membuat cluster Kafka Terkelola, memublikasikan peristiwa IoT ke cluster tersebut, dan menjalankan tugas Dataproc yang menggunakan streaming Spark untuk memproses peristiwa ini secara real time. Sekarang Anda mengetahui langkah-langkah utama yang diperlukan untuk membuat pipeline data menggunakan Managed Kafka dan Dataproc.

Dokumen referensi