Xử lý dữ liệu IoT theo thời gian thực bằng Dataproc và Dịch vụ do Google quản lý dành cho Apache Kafka
Thông tin về lớp học lập trình này
1. Giới thiệu
Lần cập nhật gần đây nhất: ngày 10 tháng 6 năm 2024
Nền
Các thiết bị Internet của vạn vật (IoT), từ giải pháp nhà thông minh đến cảm biến công nghiệp, tạo ra một lượng lớn dữ liệu ở rìa mạng. Dữ liệu này rất có giá trị cho nhiều trường hợp sử dụng, chẳng hạn như giám sát thiết bị, theo dõi, chẩn đoán, giám sát, cá nhân hoá, tối ưu hoá thiết bị và nhiều trường hợp khác. Dịch vụ do Google quản lý cho Apache Kafka cung cấp một cách linh hoạt và bền vững để nhập và lưu trữ luồng dữ liệu liên tục này theo cách tương thích với OSS, dễ sử dụng và an toàn, trong khi Google Cloud Dataproc cho phép xử lý các tập dữ liệu lớn này để phân tích dữ liệu bằng cách sử dụng các cụm Apache Spark và Hadoop.
Sản phẩm bạn sẽ tạo ra
Trong lớp học lập trình này, bạn sẽ xây dựng quy trình Xử lý dữ liệu IoT bằng Dịch vụ do Google quản lý dành cho Apache Kafka, Dataproc, Python và Apache Spark để phân tích theo thời gian thực. Quy trình của bạn sẽ:
- Xuất bản dữ liệu từ các thiết bị IoT đến cụm Kafka được quản lý bằng máy ảo GCE
- Truyền trực tuyến dữ liệu từ cụm Manage Kafka đến cụm Dataproc
- Xử lý dữ liệu bằng công việc Dataproc Spark Streaming
Kiến thức bạn sẽ học được
- Cách tạo cụm Kafka và Dataproc do Google quản lý
- Cách chạy công việc phát trực tuyến bằng Dataproc
Bạn cần có
- Tài khoản GCP đang hoạt động và đã thiết lập dự án. Nếu chưa có tài khoản, bạn có thể đăng ký dùng thử miễn phí.
- Đã cài đặt và định cấu hình gcloud CLI. Bạn có thể làm theo hướng dẫn để cài đặt gcloud CLI trên hệ điều hành.
- Đã bật API cho Kafka do Google quản lý và Dataproc trong dự án GCP của bạn.
2. Tổng quan
Trong lớp học lập trình này, hãy cùng theo dõi câu chuyện về một công ty giả lập, DemoIOT Solutions. DemoIOT Solutions cung cấp các thiết bị cảm biến đo lường và truyền dữ liệu về nhiệt độ, độ ẩm, áp suất, mức độ ánh sáng và vị trí. Họ muốn thiết lập quy trình xử lý dữ liệu này để hiển thị số liệu thống kê theo thời gian thực cho khách hàng. Bằng cách sử dụng các quy trình như vậy, họ có thể cung cấp nhiều dịch vụ cho khách hàng, chẳng hạn như giám sát, đề xuất tự động, cảnh báo và thông tin chi tiết về những nơi khách hàng đã lắp đặt cảm biến.
Để thực hiện việc này, chúng ta sẽ sử dụng máy ảo GCE để mô phỏng thiết bị IoT. Thiết bị sẽ phát hành dữ liệu đến một chủ đề Kafka trong cụm Kafka do Google quản lý. Dữ liệu này sẽ được đọc và xử lý bằng một công việc truyền trực tuyến Dataproc. Phần thiết lập Điều kiện tiên quyết và các trang sau đây sẽ hướng dẫn bạn thực hiện tất cả các bước này.
Thiết lập điều kiện tiên quyết
- Tìm tên dự án và số dự án của dự án. Hãy xem bài viết Tìm tên, số và mã dự án để tham khảo.
- Mạng con VPC. Thao tác này sẽ cho phép kết nối giữa máy ảo GCE, cụm Kafka và cụm Dataproc. Làm theo hướng dẫn này để liệt kê các mạng con hiện có bằng gcloud CLI. Nếu cần, hãy làm theo hướng dẫn tạo mạng VPC ở chế độ tự động để tạo mạng VPC có mạng con ở mỗi khu vực của Google Cloud. Tuy nhiên, trong lớp học lập trình này, chúng ta sẽ chỉ sử dụng một mạng con từ một khu vực duy nhất.
- Trong mạng con này, hãy đảm bảo có một quy tắc Tường lửa cho phép tất cả lưu lượng truy cập từ tcp:22, đây là SSH bắt buộc. Bạn có thể chọn quy tắc này trong phần Quy tắc tường lửa khi tạo mạng. Vì vậy, hãy nhớ chọn quy tắc này.
- Bộ chứa GCS. Bạn sẽ cần quyền truy cập vào một bộ chứa trên Google Cloud Storage để lưu trữ tài nguyên công việc Dataproc và lưu trữ dữ liệu đã xử lý. Nếu chưa có, bạn có thể tạo một trong dự án GCP của mình.
Điền sẵn biến môi trường
Trong thiết bị đầu cuối nơi bạn chạy CLI gcloud, hãy điền các biến môi trường này để có thể sử dụng sau.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Thay thế nội dung sau:
<project-id>
có tên của dự án GCP mà bạn thiết lập.<project-number>
có tên là số dự án trong bước 1 của Điều kiện tiên quyết.<region>
có tên của một khu vực trong Các khu vực và vùng có sẵn mà bạn muốn sử dụng. Ví dụ: chúng ta có thể sử dụngus-central1
.<zone>
có tên của khu vực trong phần Khu vực và múi giờ hiện có trong khu vực mà bạn đã chọn trước đó. Ví dụ: nếu đã chọnus-central1
làm khu vực, bạn có thể sử dụngus-central1-f
làm vùng. Vùng này sẽ được dùng để tạo máy ảo GCE mô phỏng các thiết bị IoT. Đảm bảo rằng khu vực của bạn nằm trong khu vực mà bạn đã chọn.<subnet-path>
có đường dẫn đầy đủ của mạng con từ bước 2 trong phần Điều kiện tiên quyết. Giá trị của thuộc tính này phải ở định dạng:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
có tên của bộ chứa GCS từ bước 3 trong phần Điều kiện tiên quyết.
3. Thiết lập Kafka do Google quản lý
Phần này thiết lập một cụm Kafka do Google quản lý, cụm này sẽ triển khai máy chủ Kafka và tạo một chủ đề trên máy chủ này để có thể phát hành và đọc dữ liệu IoT sau khi đăng ký. DemoIOT Solutions có thể thiết lập cụm này để tất cả thiết bị của họ phát hành dữ liệu cho cụm này.
Tạo cụm Kafka được quản lý
- Tạo cụm Kafka được quản lý. Ở đây, tên của cụm là
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Bạn sẽ nhận được phản hồi tương tự như sau:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Quá trình tạo cụm sẽ mất từ 20 đến 30 phút. Chờ thao tác này hoàn tất.
Tạo chủ đề
- Tạo một chủ đề Kafka được quản lý trên cụm. Ở đây, tên của chủ đề là
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Bạn sẽ nhận được kết quả tương tự như sau:
Created topic [kafka-iot-topic].
4. Thiết lập Nhà xuất bản
Để phát hành cho cụm Kafka được quản lý, chúng ta thiết lập một phiên bản máy ảo Google Compute Engine có thể truy cập vào VPC chứa mạng con mà cụm Kafka được quản lý sử dụng. Máy ảo này mô phỏng các thiết bị cảm biến do DemoIOT Solutions cung cấp.
Các bước
- Tạo phiên bản máy ảo Google Compute Engine. Ở đây, tên của máy ảo GCE là
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Cấp cho tài khoản dịch vụ mặc định của Google Compute Engine quyền sử dụng Dịch vụ do đám mây quản lý dành cho Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Sử dụng SSH để kết nối với máy ảo. Ngoài ra, bạn có thể sử dụng Google Cloud Console để SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Cài đặt Java để chạy các công cụ dòng lệnh Kafka và tải tệp nhị phân Kafka xuống bằng các lệnh sau.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Tải thư viện xác thực Kafka được quản lý và các phần phụ thuộc của thư viện đó xuống, đồng thời định cấu hình các thuộc tính ứng dụng Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Để biết thêm thông tin chi tiết về cách thiết lập máy của nhà xuất bản, hãy tham khảo bài viết Thiết lập máy khách.
5. Xuất bản lên Kafka được quản lý
Giờ đây, khi đã thiết lập trình xuất bản, chúng ta có thể sử dụng dòng lệnh Kafka trên trình xuất bản đó để phát hành một số dữ liệu giả lập từ máy ảo GCE (mô phỏng các thiết bị IoT bằng Giải pháp DemoIOT) đến cụm Kafka được quản lý.
- Vì đã kết nối SSH vào thực thể máy ảo GCE, nên chúng ta cần điền lại biến
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Thay thế nội dung sau:
<project-id>
có tên của dự án GCP mà bạn thiết lập.<region>
với khu vực tạo cụm Kafka
- Sử dụng lệnh
managed-kafka clusters describe
để lấy địa chỉ IP của máy chủ khởi động Kafka. Bạn có thể dùng địa chỉ này để kết nối với cụm Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Liệt kê các chủ đề trong cụm:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Bạn sẽ thấy kết quả sau đây, chứa chủ đề kafka-iot-topic
mà chúng ta đã tạo trước đó.
__remote_log_metadata
kafka-iot-topic
- Sao chép và dán tập lệnh này vào tệp mới
publish_iot_data.sh
. Để tạo tệp mới trên máy ảo GCE, bạn có thể sử dụng một công cụ nhưvim
hoặcnano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Giải thích
- Tập lệnh này tạo thông điệp JSON có các giá trị đọc cảm biến được mô phỏng có mã thiết bị, dấu thời gian, dữ liệu cảm biến (nhiệt độ, độ ẩm, áp suất, ánh sáng), thông tin vị trí (vĩ độ, kinh độ), trạng thái thiết bị (pin, tín hiệu, loại kết nối) và một số siêu dữ liệu.
- Công cụ này tạo ra một luồng tin nhắn liên tục từ một số thiết bị riêng biệt, mỗi thiết bị gửi dữ liệu theo một khoảng thời gian cụ thể, mô phỏng các thiết bị IoT trong thực tế. Ở đây, chúng tôi phát hành dữ liệu từ 10 thiết bị, mỗi thiết bị tạo ra 20 lần đọc, trong khoảng thời gian 10 giây.
- Công cụ này cũng phát hành tất cả dữ liệu đã tạo đến chủ đề Kafka bằng công cụ dòng lệnh của nhà sản xuất Kafka.
- Cài đặt một số phần phụ thuộc mà tập lệnh sử dụng – gói
bc
để tính toán toán học và góijq
để xử lý JSON.
sudo apt-get install bc jq
- Sửa đổi tập lệnh để có thể thực thi và chạy tập lệnh. Quá trình này sẽ mất khoảng 2 phút.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Bạn có thể kiểm tra xem các sự kiện đã được xuất bản thành công hay chưa bằng cách chạy lệnh này. Lệnh này sẽ in tất cả các sự kiện. Nhấn <control-c>
để thoát.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Thiết lập cụm Dataproc
Phần này tạo một cụm Dataproc trong mạng con VPC có cụm Kafka được quản lý. Cụm này sẽ được dùng để chạy các công việc tạo ra số liệu thống kê và thông tin chi tiết theo thời gian thực mà Giải pháp DemoIOT cần.
- Tạo cụm Dataproc. Ở đây, tên của cụm là
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Bạn sẽ nhận được phản hồi tương tự như sau:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Quá trình tạo cụm có thể mất 10 đến 15 phút. Chờ thao tác này hoàn tất và kiểm tra để đảm bảo cụm đang ở trạng thái RUNNING
bằng cách mô tả cụm.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Xử lý thông báo Kafka bằng Dataproc
Trong phần cuối cùng này, bạn sẽ gửi một công việc Dataproc để xử lý các thông báo đã xuất bản bằng Spark Streaming. Công việc này thực sự tạo ra một số số liệu thống kê và thông tin chi tiết theo thời gian thực mà Giải pháp DemoIOT có thể sử dụng.
- Chạy lệnh này để tạo tệp công việc PySpark truyền trực tuyến có tên
process_iot.py
trên máy.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Giải thích
- Mã này thiết lập một công việc Truyền phát có cấu trúc PySpark để đọc dữ liệu từ một chủ đề Kafka đã chỉ định. Ứng dụng này sử dụng địa chỉ khởi động máy chủ Kafka và cấu hình Kafka được cung cấp được tải từ tệp cấu hình GCS để kết nối và xác thực với trình dàn xếp Kafka.
- Trước tiên, lớp này đọc dữ liệu thô từ Kafka dưới dạng luồng các mảng byte, sau đó truyền các mảng byte đó thành chuỗi và áp dụng
json_schema
bằng StructType của Spark để chỉ định cấu trúc của dữ liệu (mã thiết bị, dấu thời gian, vị trí, dữ liệu cảm biến, v.v.). - Hàm này in 10 hàng đầu tiên vào bảng điều khiển để xem trước, tính toán nhiệt độ trung bình trên mỗi cảm biến và ghi tất cả dữ liệu vào bộ chứa GCS ở định dạng
avro
. Avro là một hệ thống chuyển đổi tuần tự dữ liệu dựa trên hàng, giúp lưu trữ hiệu quả dữ liệu có cấu trúc ở định dạng tệp nhị phân nhỏ gọn, do giản đồ xác định, cung cấp khả năng phát triển giản đồ, tính trung lập về ngôn ngữ và khả năng nén cao để xử lý dữ liệu trên quy mô lớn.
- Tạo tệp
client.properties
và điền biến môi trường cho địa chỉ khởi động của máy chủ kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Tải tệp
process_iot.py
vàclient.properties
lên bộ chứa Google Cloud Storage để công việc Dataproc có thể sử dụng các tệp đó.
gsutil cp process_iot.py client.properties $BUCKET
- Sao chép một số tệp jar phần phụ thuộc cho công việc Dataproc vào bộ chứa GCS. Thư mục này chứa các tệp jar cần thiết để chạy các công việc Spark Streaming với Kafka, cũng như thư viện xác thực Kafka được quản lý và các phần phụ thuộc của thư viện này, được lấy từ phần Thiết lập máy khách.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Gửi công việc Spark đến cụm Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Nhật ký trình điều khiển Spark sẽ được in. Bạn cũng có thể xem các bảng này được ghi lại vào bảng điều khiển và dữ liệu được lưu trữ trong bộ chứa GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Dọn dẹp
Làm theo các bước để dọn dẹp tài nguyên sau khi hoàn thành lớp học lập trình.
- Xoá cụm Kafka được quản lý, máy ảo GCE của Nhà xuất bản và cụm Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Xoá mạng con và mạng VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Xoá bộ chứa GCS nếu bạn không còn muốn sử dụng dữ liệu.
gcloud storage rm --recursive $BUCKET
9. Xin chúc mừng
Xin chúc mừng! Bạn đã xây dựng thành công quy trình xử lý dữ liệu IoT bằng cách sử dụng Manage Kafka và Dataproc. Điều này giúp DemoIOT Solutions có được thông tin chi tiết theo thời gian thực về dữ liệu do các thiết bị của họ phát hành!
Bạn đã tạo một cụm Kafka được quản lý, phát hành các sự kiện IoT cho cụm đó và chạy một công việc Dataproc sử dụng tính năng truyền trực tuyến Spark để xử lý các sự kiện này theo thời gian thực. Giờ đây, bạn đã biết các bước chính cần thiết để tạo quy trình dữ liệu bằng cách sử dụng Kafka được quản lý và Dataproc.