Xử lý dữ liệu IoT theo thời gian thực bằng Dataproc và Dịch vụ do Google quản lý dành cho Apache Kafka

1. Giới thiệu

2efacab8643a653b.png

Lần cập nhật gần đây nhất: ngày 10 tháng 6 năm 2024

Thông tin cơ bản

Các thiết bị Internet of Things (IoT), từ các giải pháp nhà thông minh đến cảm biến công nghiệp, tạo ra lượng lớn dữ liệu ở rìa mạng. Dữ liệu này rất có giá trị đối với nhiều trường hợp sử dụng, chẳng hạn như giám sát, theo dõi, chẩn đoán, giám sát thiết bị, cá nhân hoá, tối ưu hoá đội xe và nhiều trường hợp khác. Dịch vụ được quản lý của Google dành cho Apache Kafka cung cấp một cách thức có khả năng mở rộng và bền bỉ để nhập và lưu trữ luồng dữ liệu liên tục này theo cách tương thích với OSS, dễ sử dụng và an toàn, trong khi Google Cloud Dataproc cho phép xử lý các tập dữ liệu lớn này để phân tích dữ liệu bằng cách sử dụng các cụm Apache Spark và Hadoop.

Sản phẩm bạn sẽ tạo ra

Trong lớp học lập trình này, bạn sẽ xây dựng một pipeline Xử lý dữ liệu IoT bằng Dịch vụ được quản lý của Google dành cho Apache Kafka, Dataproc, Python và Apache Spark để phân tích theo thời gian thực. Quy trình của bạn sẽ:

  • Xuất bản dữ liệu từ các thiết bị IoT đến một cụm Kafka được quản lý bằng cách sử dụng máy ảo GCE
  • Truyền trực tuyến dữ liệu từ cụm Manage Kafka đến cụm Dataproc
  • Xử lý dữ liệu bằng một công việc Truyền phát trực tiếp Spark của Dataproc

Kiến thức bạn sẽ học được

  • Cách tạo các cụm Dataproc và Kafka do Google quản lý
  • Cách chạy các công việc truyền phát trực tuyến bằng Dataproc

Bạn cần có

2. Tổng quan

Trong lớp học lập trình này, hãy theo dõi câu chuyện về một công ty ảo có tên là DemoIOT Solutions. DemoIOT Solutions cung cấp các thiết bị cảm biến đo lường và truyền dữ liệu về nhiệt độ, độ ẩm, áp suất, mức ánh sáng và vị trí. Họ muốn thiết lập các quy trình xử lý dữ liệu này để cho khách hàng thấy số liệu thống kê theo thời gian thực. Bằng cách sử dụng các quy trình như vậy, họ có thể cung cấp nhiều loại dịch vụ cho khách hàng, chẳng hạn như giám sát, đề xuất tự động, cảnh báo và thông tin chi tiết về những nơi mà khách hàng đã lắp đặt cảm biến.

Để thực hiện việc này, chúng ta sẽ sử dụng một máy ảo GCE để mô phỏng thiết bị IoT. Thiết bị sẽ xuất bản dữ liệu vào một chủ đề Kafka trong cụm Kafka do Google quản lý. Dữ liệu này sẽ được một công việc truyền trực tuyến Dataproc đọc và xử lý. Chế độ thiết lập Điều kiện tiên quyết và các trang sau đây sẽ hướng dẫn bạn thực hiện tất cả các bước này.

Thiết lập điều kiện tiên quyết

  1. Tìm tên dự án và số dự án cho dự án của bạn. Hãy xem phần Tìm tên, số và mã dự án để tham khảo.
  2. Mạng con VPC. Điều này sẽ cho phép kết nối giữa máy ảo GCE, cụm Kafka và cụm Dataproc. Hãy làm theo hướng dẫn này để liệt kê các mạng con hiện có bằng gcloud CLI. Nếu cần, hãy làm theo hướng dẫn tạo mạng VPC ở chế độ tự động. Thao tác này sẽ tạo một mạng VPC có mạng con ở mỗi khu vực của Google Cloud. Tuy nhiên, để phục vụ mục đích của lớp học lập trình này, chúng ta sẽ chỉ sử dụng một mạng con từ một khu vực duy nhất.
  • Trong mạng con này, hãy đảm bảo rằng có một quy tắc Tường lửa cho phép tất cả lưu lượng truy cập từ tcp:22 (SSH bắt buộc). Bạn có thể chọn quy tắc này trong phần Quy tắc tường lửa khi tạo mạng. Vì vậy, hãy nhớ chọn quy tắc này.
  1. Bộ chứa GCS. Bạn sẽ cần có quyền truy cập vào một bộ chứa lưu trữ trên Google Cloud để lưu trữ các tài nguyên của công việc Dataproc và duy trì dữ liệu đã xử lý. Nếu chưa có, bạn có thể tạo một khoá trong dự án GCP của mình.

Điền sẵn các biến môi trường

Trong thiết bị đầu cuối nơi bạn chạy giao diện dòng lệnh gcloud, hãy điền sẵn các biến môi trường này để có thể sử dụng sau này.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Thay thế nội dung sau:

  • <project-id> có tên của dự án GCP mà bạn thiết lập.
  • <project-number> bằng tên của số dự án ở bước 1 trong phần Điều kiện tiên quyết.
  • <region> bằng tên của một khu vực trong Các khu vực và múi giờ có thể sử dụng mà bạn muốn sử dụng. Ví dụ: chúng ta có thể sử dụng us-central1.
  • <zone> có tên của vùng trong Các khu vực và vùng có sẵn trong khu vực mà bạn đã chọn trước đó. Ví dụ: nếu chọn us-central1 làm vùng, bạn có thể dùng us-central1-f làm khu vực. Vùng này sẽ được dùng để tạo máy ảo GCE mô phỏng các thiết bị IoT. Đảm bảo rằng khu vực của bạn nằm trong vùng mà bạn đã chọn.
  • <subnet-path> có đường dẫn đầy đủ của mạng con trong Bước 2 của phần Điều kiện tiên quyết. Giá trị của tham số này phải ở định dạng: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> bằng tên của bộ chứa GCS trong Bước 3 của phần Điều kiện tiên quyết.

3. Thiết lập Kafka do Google quản lý

Phần này thiết lập một cụm Kafka do Google quản lý, triển khai máy chủ Kafka và tạo một chủ đề trên máy chủ này. Sau khi đăng ký, dữ liệu IoT có thể được xuất bản và đọc từ chủ đề này. DemoIOT Solutions có thể thiết lập cụm này để tất cả thiết bị của họ đều xuất bản dữ liệu lên cụm.

Tạo một cụm Kafka được quản lý

  • Tạo cụm Kafka được quản lý. Trong đó, tên của cụm là kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

Bạn sẽ nhận được phản hồi tương tự như sau:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

Quá trình tạo cụm mất từ 20 đến 30 phút. Chờ quá trình này hoàn tất.

Tạo một chủ đề

  • Tạo một chủ đề Kafka được quản lý trên cụm. Ở đây, tên của chủ đề là kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

Bạn sẽ nhận được kết quả tương tự như sau:

Created topic [kafka-iot-topic].

4. Thiết lập nhà xuất bản

Để xuất bản vào cụm Kafka được quản lý, chúng ta thiết lập một thực thể máy ảo Google Compute Engine có thể truy cập vào VPC chứa mạng con mà cụm Kafka được quản lý sử dụng. VM này mô phỏng các thiết bị cảm biến do DemoIOT Solutions cung cấp.

Các bước

  1. Tạo thực thể máy ảo Google Compute Engine. Trong đó, tên của máy ảo GCE là publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Cấp cho tài khoản dịch vụ mặc định của Google Compute Engine quyền sử dụng Dịch vụ được quản lý cho Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Sử dụng SSH để kết nối với VM. Hoặc sử dụng Google Cloud Console để SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Cài đặt Java để chạy các công cụ dòng lệnh Kafka và tải xuống tệp nhị phân Kafka bằng các lệnh này.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Tải thư viện xác thực Kafka được quản lý và các phần phụ thuộc của thư viện này xuống, đồng thời định cấu hình các thuộc tính của ứng dụng Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Để biết thêm thông tin chi tiết về cách thiết lập máy của nhà xuất bản, hãy tham khảo bài viết Thiết lập máy khách.

5. Xuất bản lên Kafka được quản lý

Giờ đây, khi đã thiết lập nhà xuất bản, chúng ta có thể sử dụng dòng lệnh Kafka trên nhà xuất bản đó để xuất bản một số dữ liệu giả từ VM GCE (mô phỏng các thiết bị IoT bằng DemoIOT Solutions) sang Cụm Kafka được quản lý.

  1. Vì đã SSH vào phiên bản máy ảo GCE, nên chúng ta cần điền lại biến PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Thay thế nội dung sau:

  • <project-id> có tên của dự án GCP mà bạn thiết lập.
  • <region> có khu vực mà cụm Kafka được tạo
  1. Sử dụng lệnh managed-kafka clusters describe để lấy địa chỉ IP của máy chủ khởi động Kafka. Bạn có thể dùng địa chỉ này để kết nối với cụm Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Liệt kê các chủ đề trong nhóm:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Bạn sẽ thấy kết quả sau đây, trong đó có chủ đề kafka-iot-topic mà chúng ta đã tạo trước đó.

__remote_log_metadata
kafka-iot-topic
  1. Sao chép và dán tập lệnh này vào một tệp mới publish_iot_data.sh. Để tạo một tệp mới trên máy ảo GCE, bạn có thể sử dụng một công cụ như vim hoặc nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Giải thích

  • Tập lệnh này tạo ra các thông báo JSON có số đọc cảm biến mô phỏng, bao gồm mã thiết bị, dấu thời gian, dữ liệu cảm biến (nhiệt độ, độ ẩm, áp suất, ánh sáng), thông tin vị trí (vĩ độ, kinh độ), trạng thái thiết bị (pin, tín hiệu, loại kết nối) và một số siêu dữ liệu.
  • Ứng dụng này tạo ra một luồng thông báo liên tục từ một số lượng thiết bị riêng biệt nhất định, mỗi thiết bị gửi dữ liệu theo một khoảng thời gian cụ thể, mô phỏng các thiết bị IoT trong thế giới thực. Ở đây, chúng tôi xuất bản dữ liệu từ 10 thiết bị, mỗi thiết bị tạo ra 20 chỉ số, trong khoảng thời gian 10 giây.
  • Nó cũng xuất bản tất cả dữ liệu đã tạo vào chủ đề Kafka bằng công cụ dòng lệnh của nhà sản xuất Kafka.
  1. Cài đặt một số phần phụ thuộc mà tập lệnh sử dụng – gói bc để tính toán toán học và gói jq để xử lý JSON.
sudo apt-get install bc jq
  1. Sửa đổi tập lệnh thành một tệp thực thi và chạy tập lệnh. Quá trình này sẽ mất khoảng 2 phút để chạy.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Bạn có thể kiểm tra xem các sự kiện đã được xuất bản thành công hay chưa bằng cách chạy lệnh này. Lệnh này sẽ in tất cả các sự kiện. Nhấn <control-c> để thoát.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Thiết lập Cụm Dataproc

Phần này tạo một cụm Dataproc trong mạng con VPC nơi có cụm Kafka được quản lý. Cụm này sẽ được dùng để chạy các công việc tạo ra số liệu thống kê và thông tin chi tiết theo thời gian thực mà DemoIOT Solutions cần.

  1. Tạo một cụm Dataproc. Trong đó, tên của cụm là dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Bạn sẽ nhận được phản hồi tương tự như sau:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Quá trình tạo cụm có thể mất từ 10 đến 15 phút. Chờ đến khi thao tác này hoàn tất thành công và kiểm tra để đảm bảo cụm ở trạng thái RUNNING bằng cách mô tả cụm.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Xử lý thông báo Kafka bằng Dataproc

Trong phần cuối cùng này, bạn sẽ gửi một công việc Dataproc để xử lý các thông báo đã xuất bản bằng Spark Streaming. Công việc này thực sự tạo ra một số số liệu thống kê và thông tin chi tiết theo thời gian thực mà DemoIOT Solutions có thể sử dụng.

  1. Chạy lệnh này để tạo tệp công việc PySpark truyền trực tuyến có tên là process_iot.py cục bộ.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Giải thích

  • Đoạn mã này thiết lập một công việc Truyền trực tuyến có cấu trúc PySpark để đọc dữ liệu từ một chủ đề Kafka được chỉ định. Nó sử dụng địa chỉ khởi động máy chủ Kafka được cung cấp và các cấu hình Kafka được tải từ tệp cấu hình GCS để kết nối và xác thực với trình môi giới Kafka.
  • Trước tiên, nó đọc dữ liệu thô từ Kafka dưới dạng một luồng mảng byte, truyền các mảng byte đó thành chuỗi và áp dụng json_schema bằng StructType của Spark để chỉ định cấu trúc của dữ liệu (mã thiết bị, dấu thời gian, vị trí, dữ liệu cảm biến, v.v.).
  • Thao tác này in 10 hàng đầu tiên vào bảng điều khiển để xem trước, tính toán nhiệt độ trung bình trên mỗi cảm biến và ghi tất cả dữ liệu vào vùng lưu trữ GCS ở định dạng avro. Avro là một hệ thống chuyển đổi tuần tự dữ liệu dựa trên hàng, lưu trữ hiệu quả dữ liệu có cấu trúc ở định dạng nhị phân nhỏ gọn, được xác định theo giản đồ, cung cấp khả năng phát triển giản đồ, tính trung lập về ngôn ngữ và khả năng nén cao để xử lý dữ liệu quy mô lớn.
  1. Tạo tệp client.properties và điền biến môi trường cho địa chỉ khởi động của máy chủ kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Tải tệp process_iot.pyclient.properties lên bộ chứa Google Cloud Storage để có thể sử dụng các tệp này trong công việc Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Sao chép một số tệp jar phần phụ thuộc cho công việc Dataproc vào bộ chứa GCS. Thư mục này chứa các tệp jar cần thiết để chạy các tác vụ Spark Streaming bằng Kafka, cũng như thư viện xác thực Kafka được quản lý và các phần phụ thuộc của thư viện này, lấy từ phần Thiết lập máy khách.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Gửi công việc Spark đến cụm Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Nhật ký trình điều khiển Spark sẽ được in. Bạn cũng có thể thấy các bảng này được ghi vào nhật ký trên bảng điều khiển và dữ liệu được lưu trữ trong vùng chứa GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Dọn dẹp

Làm theo các bước để dọn dẹp tài nguyên sau khi hoàn tất lớp học lập trình.

  1. Xoá cụm Kafka được quản lý, VM GCE của Nhà xuất bản và cụm Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Xoá mạng và mạng con VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Xoá bộ chứa GCS nếu bạn không muốn sử dụng dữ liệu nữa.
gcloud storage rm --recursive $BUCKET

9. Xin chúc mừng

Xin chúc mừng! Bạn đã xây dựng thành công một quy trình xử lý dữ liệu IoT bằng Manage Kafka và Dataproc, giúp DemoIOT Solutions thu thập thông tin chi tiết theo thời gian thực về dữ liệu do các thiết bị của họ xuất bản!

Bạn đã tạo một cụm Kafka được quản lý, xuất bản các sự kiện IoT vào cụm đó và chạy một công việc Dataproc sử dụng tính năng phát trực tuyến Spark để xử lý các sự kiện này theo thời gian thực. Giờ đây, bạn đã biết các bước chính cần thiết để tạo quy trình dữ liệu bằng Managed Kafka và Dataproc.

Tài liệu tham khảo