Pemrosesan Data IoT Real-Time menggunakan Dataproc dan Layanan Terkelola Google untuk Apache Kafka
Tentang codelab ini
1. Pengantar
Terakhir Diperbarui: 10-06-2024
Latar belakang
Perangkat Internet of Things (IoT), mulai dari solusi smart home hingga sensor industri, menghasilkan data dalam jumlah besar di edge jaringan. Data ini sangat berharga untuk berbagai kasus penggunaan, seperti pemantauan perangkat, pelacakan, diagnostik, pengawasan, personalisasi, pengoptimalan armada, dan banyak lagi. Google Managed Service for Apache Kafka menawarkan cara yang skalabel dan andal untuk menyerap dan menyimpan aliran data yang berkelanjutan ini dengan cara yang kompatibel dengan OSS, mudah digunakan, dan aman, sementara Google Cloud Dataproc memungkinkan pemrosesan set data besar ini untuk analisis data menggunakan cluster Apache Spark dan Hadoop.
Yang akan Anda build
Dalam codelab ini, Anda akan membuat pipeline Pemrosesan Data IoT menggunakan Google Managed Service for Apache Kafka, Dataproc, Python, dan Apache Spark yang melakukan analisis real-time. Pipeline Anda akan:
- Memublikasikan data dari perangkat IoT ke cluster Kafka Terkelola menggunakan VM GCE
- Menstreaming data dari cluster Manage Kafka ke cluster Dataproc
- Memproses data menggunakan tugas Dataproc Spark Streaming
Yang akan Anda pelajari
- Cara membuat cluster Kafka dan Dataproc yang Dikelola Google
- Cara menjalankan tugas streaming menggunakan Dataproc
Yang Anda butuhkan
- Akun GCP aktif dengan penyiapan project. Jika belum memilikinya, Anda dapat mendaftar untuk uji coba gratis.
- gcloud CLI diinstal dan dikonfigurasi. Anda dapat mengikuti petunjuk untuk menginstal gcloud CLI di OS.
- Mengaktifkan API untuk Google Managed Kafka dan Dataproc di project GCP Anda.
2. Ringkasan
Untuk codelab ini, mari kita ikuti kisah perusahaan fiktif, DemoIOT Solutions. DemoIOT Solutions menyediakan perangkat sensor yang mengukur dan mengirimkan data untuk suhu, kelembapan, tekanan, tingkat cahaya, dan lokasi. Mereka ingin menyiapkan pipeline yang memproses data ini untuk menampilkan statistik real-time kepada pelanggan mereka. Dengan menggunakan pipeline tersebut, mereka dapat menyediakan berbagai layanan kepada pelanggan, seperti pemantauan, saran otomatis, pemberitahuan, dan insight tentang tempat pelanggan memasang sensor mereka.
Untuk melakukannya, kita akan menggunakan VM GCE untuk menyimulasikan perangkat IoT. Perangkat akan memublikasikan data ke topik Kafka di cluster Google Managed Kafka, yang akan dibaca dan diproses oleh tugas streaming Dataproc. Penyiapan Prasyarat dan halaman berikut akan memandu Anda melakukan semua langkah ini.
Penyiapan prasyarat
- Temukan nama project dan nomor project untuk project Anda. Lihat Menemukan nama, nomor, dan ID project untuk referensi.
- Subnet VPC. Tindakan ini akan memungkinkan konektivitas antara VM GCE, cluster Kafka, dan cluster Dataproc. Ikuti langkah-langkah ini untuk mencantumkan subnet yang ada menggunakan gcloud CLI. Jika diperlukan, ikuti membuat jaringan VPC mode otomatis yang akan membuat jaringan VPC dengan subnetwork di setiap region Google Cloud. Namun, untuk tujuan codelab ini, kita hanya akan menggunakan subjaringan dari satu region.
- Di subjaringan ini, pastikan ada aturan Firewall yang mengizinkan semua traffic masuk dari tcp:22, yang merupakan SSH yang diperlukan. Aturan ini akan tersedia untuk dipilih di bagian Aturan firewall saat membuat jaringan, jadi pastikan Anda memilihnya.
- Bucket GCS. Anda memerlukan akses ke bucket penyimpanan Google Cloud untuk menyimpan resource tugas Dataproc dan mempertahankan data yang diproses. Jika belum memilikinya, Anda dapat membuat project di GCP.
Mengisi variabel lingkungan
Di terminal tempat Anda menjalankan gcloud CLI, isi variabel lingkungan ini agar dapat digunakan nanti.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Ganti kode berikut:
<project-id>
dengan nama project GCP yang Anda siapkan.<project-number>
dengan nama nomor project dari langkah 1 Prasyarat.<region>
dengan nama region dari Region dan zona yang tersedia yang ingin Anda gunakan. Misalnya, kita dapat menggunakanus-central1
.<zone>
dengan nama zona dari Region dan zona yang tersedia di bagian region yang sebelumnya Anda pilih. Misalnya, jika memilihus-central1
sebagai region, Anda dapat menggunakanus-central1-f
sebagai zona. Zona ini akan digunakan untuk membuat VM GCE yang menyimulasikan perangkat IoT. Pastikan zona Anda berada di region yang telah Anda pilih.<subnet-path>
dengan jalur lengkap subnet dari Prasyarat langkah 2. Nilai ini harus dalam format:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
dengan nama bucket GCS dari langkah 3 Prasyarat.
3. Menyiapkan Kafka Terkelola Google
Bagian ini menyiapkan cluster Kafka Terkelola Google, yang men-deploy server Kafka, dan membuat topik di server ini tempat data IoT dapat dipublikasikan dan dibaca setelah berlangganan. DemoIOT Solutions dapat menyiapkan cluster ini sehingga semua perangkatnya memublikasikan data ke cluster tersebut.
Membuat cluster Kafka Terkelola
- Buat cluster Kafka Terkelola. Di sini, nama cluster adalah
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Anda akan mendapatkan respons seperti berikut:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Pembuatan cluster memerlukan waktu 20-30 menit. Tunggu hingga operasi ini selesai.
Membuat Topik
- Buat topik Kafka Terkelola di cluster. Di sini, nama topik adalah
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Anda akan mendapatkan output yang mirip dengan berikut ini:
Created topic [kafka-iot-topic].
4. Menyiapkan Penayang
Untuk memublikasikan ke cluster Managed Kafka, kita menyiapkan instance VM Google Compute Engine yang dapat mengakses VPC yang berisi subnet yang digunakan oleh cluster Managed Kafka. VM ini menyimulasikan perangkat sensor yang disediakan oleh DemoIOT Solutions.
Langkah
- Buat instance VM Google Compute Engine. Di sini, nama VM GCE adalah
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Berikan izin kepada akun layanan default Google Compute Engine untuk menggunakan Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Gunakan SSH untuk terhubung ke VM. Atau, gunakan Konsol Google Cloud untuk SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Instal Java untuk menjalankan alat command line Kafka, dan download biner Kafka menggunakan perintah ini.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Download library autentikasi Kafka Terkelola dan dependensinya, lalu konfigurasikan properti klien Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Untuk mengetahui detail selengkapnya tentang penyiapan mesin penayang, lihat Menyiapkan mesin klien.
5. Memublikasikan ke Managed Kafka
Setelah penayang disiapkan, kita dapat menggunakan command line Kafka di dalamnya untuk memublikasikan beberapa data dummy dari VM GCE (menyimulasikan perangkat IoT oleh DemoIOT Solutions) ke cluster Kafka Terkelola.
- Karena telah menggunakan SSH ke instance VM GCE, kita perlu mengisi ulang variabel
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Ganti kode berikut:
<project-id>
dengan nama project GCP yang Anda siapkan.<region>
dengan region tempat cluster Kafka dibuat
- Gunakan perintah
managed-kafka clusters describe
untuk mendapatkan alamat IP server bootstrap Kafka. Alamat ini dapat digunakan untuk terhubung ke cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Cantumkan topik di cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Anda akan dapat melihat output berikut, yang berisi topik kafka-iot-topic
yang kita buat sebelumnya.
__remote_log_metadata
kafka-iot-topic
- Salin dan tempel skrip ini ke dalam file baru
publish_iot_data.sh
. Untuk membuat file baru di VM GCE, Anda dapat menggunakan alat sepertivim
ataunano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explanation
- Skrip ini membuat pesan JSON dengan simulasi pembacaan sensor yang memiliki ID perangkat, stempel waktu, data sensor (suhu, kelembapan, tekanan, cahaya), informasi lokasi (lintang, bujur), status perangkat (baterai, sinyal, jenis koneksi), dan beberapa metadata.
- Alat ini menghasilkan aliran pesan yang berkelanjutan dari sejumlah perangkat unik yang ditetapkan, yang masing-masing mengirim data pada interval waktu yang ditentukan, meniru perangkat IoT di dunia nyata. Di sini, kita memublikasikan data dari 10 perangkat yang masing-masing menghasilkan 20 pembacaan, dengan interval waktu 10 detik.
- Alat ini juga memublikasikan semua data yang dihasilkan ke topik Kafka menggunakan alat command line produsen Kafka.
- Instal beberapa dependensi yang digunakan oleh skrip - paket
bc
untuk penghitungan matematika dan paketjq
untuk pemrosesan JSON.
sudo apt-get install bc jq
- Ubah skrip menjadi file yang dapat dieksekusi dan jalankan skrip. Proses ini memerlukan waktu sekitar 2 menit.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Anda dapat memeriksa apakah peristiwa berhasil dipublikasikan dengan menjalankan perintah ini yang akan mencetak semua peristiwa. Tekan <control-c>
untuk keluar.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Menyiapkan Cluster Dataproc
Bagian ini membuat cluster Dataproc di subjaringan VPC tempat cluster Managed Kafka berada. Cluster ini akan digunakan untuk menjalankan tugas yang menghasilkan statistik dan insight real-time yang diperlukan oleh Solusi DemoIOT.
- Buat cluster Dataproc. Di sini, nama cluster adalah
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Anda akan mendapatkan respons seperti berikut:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Pembuatan cluster mungkin memerlukan waktu 10-15 menit. Tunggu hingga operasi ini berhasil diselesaikan dan pastikan cluster berada dalam status RUNNING
dengan mendeskripsikan cluster.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Memproses pesan Kafka menggunakan Dataproc
Di bagian terakhir ini, Anda akan mengirimkan tugas Dataproc yang memproses pesan yang dipublikasikan menggunakan Spark Streaming. Tugas ini sebenarnya menghasilkan beberapa statistik dan insight real time yang dapat digunakan oleh DemoIOT Solutions.
- Jalankan perintah ini untuk membuat file tugas PySpark streaming yang disebut
process_iot.py
secara lokal.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explanation
- Kode ini menyiapkan tugas Streaming Terstruktur PySpark untuk membaca data dari topik Kafka yang ditentukan. Fungsi ini menggunakan alamat bootstrap server Kafka yang disediakan dan konfigurasi Kafka yang dimuat dari file konfigurasi GCS untuk terhubung dan mengautentikasi dengan broker Kafka.
- Pertama, kode ini membaca data mentah dari Kafka sebagai aliran array byte, dan mentransmisikan array byte tersebut ke string, serta menerapkan
json_schema
menggunakan StructType Spark untuk menentukan struktur data (ID perangkat, stempel waktu, lokasi, data sensor, dll.). - Kode ini mencetak 10 baris pertama ke konsol untuk pratinjau, menghitung suhu rata-rata per sensor, dan menulis semua data ke bucket GCS dalam format
avro
. Avro adalah sistem serialisasi data berbasis baris yang menyimpan data terstruktur secara efisien dalam format biner yang ringkas dan ditentukan skema, yang menawarkan evolusi skema, netralitas bahasa, dan kompresi tinggi untuk pemrosesan data skala besar.
- Buat file
client.properties
dan isi variabel lingkungan untuk alamat bootstrap server kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Upload file
process_iot.py
danclient.properties
ke bucket Google Cloud Storage Anda, sehingga file tersebut dapat digunakan oleh tugas Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Salin beberapa jar dependensi untuk tugas Dataproc ke bucket GCS Anda. Direktori ini berisi jar yang diperlukan untuk menjalankan tugas Spark Streaming dengan Kafka, serta library autentikasi Kafka Terkelola dan dependensinya, yang diambil dari Menyiapkan komputer klien.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Kirim tugas Spark ke cluster Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Log driver Spark akan dicetak. Anda juga akan dapat melihat tabel ini dicatat ke konsol dan data yang disimpan di bucket GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Pembersihan
Ikuti langkah-langkah untuk membersihkan resource setelah menyelesaikan codelab.
- Hapus cluster Kafka Terkelola, VM GCE Penayang, dan cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Hapus subnetwork dan jaringan VPC Anda.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Hapus bucket GCS jika Anda tidak ingin lagi menggunakan data tersebut.
gcloud storage rm --recursive $BUCKET
9. Selamat
Selamat, Anda telah berhasil membuat pipeline pemrosesan data IoT dengan Manage Kafka dan Dataproc yang membantu DemoIOT Solutions mendapatkan insight real-time tentang data yang dipublikasikan oleh perangkat mereka.
Anda telah membuat cluster Kafka Terkelola, memublikasikan peristiwa IoT ke cluster tersebut, dan menjalankan tugas Dataproc yang menggunakan streaming Spark untuk memproses peristiwa ini secara real time. Sekarang Anda mengetahui langkah-langkah utama yang diperlukan untuk membuat pipeline data menggunakan Managed Kafka dan Dataproc.