Обработка данных IoT в реальном времени с использованием Dataproc и Google Managed Service для Apache Kafka

Обработка данных IoT в реальном времени с использованием Dataproc и Google Managed Service для Apache Kafka

О практической работе

subjectПоследнее обновление: июн. 16, 2025
account_circleАвторы: Devanshi Khatsuriya

1. Введение

2efacab8643a653b.png

Последнее обновление: 2024-06-10

Фон

Устройства Интернета вещей (IoT), от решений для умного дома до промышленных датчиков, генерируют огромные объемы данных на границе сети. Эти данные бесценны для различных вариантов использования, таких как мониторинг устройств, отслеживание, диагностика, наблюдение, персонализация, оптимизация автопарка и многое другое. Google Managed Service для Apache Kafka предлагает масштабируемый и надежный способ приема и хранения этого непрерывного потока данных в OSS-совместимом, простом в использовании и безопасном виде, в то время как Google Cloud Dataproc позволяет обрабатывать эти большие наборы данных для аналитики данных с использованием кластеров Apache Spark и Hadoop.

Что вы построите

В этой кодовой лаборатории вы собираетесь построить конвейер обработки данных IoT с использованием Google Managed Service для Apache Kafka, Dataproc, Python и Apache Spark, который выполняет аналитику в реальном времени. Ваш конвейер будет:

  • Публикация данных с устройств IoT в управляемом кластере Kafka с использованием виртуальных машин GCE
  • Передача данных из кластера Manage Kafka в кластер Dataproc
  • Обработка данных с использованием задания Dataproc Spark Streaming

Чему вы научитесь

  • Как создать кластеры Kafka и Dataproc, управляемые Google
  • Как запускать потоковые задания с помощью Dataproc

Что вам понадобится

2. Обзор

Для этой кодовой лаборатории давайте проследим историю фиктивной компании DemoIOT Solutions. DemoIOT Solutions предоставляет сенсорные устройства, которые измеряют и передают данные о температуре, влажности, давлении, уровне освещенности и местоположении. Они хотели бы настроить конвейеры, которые обрабатывают эти данные, чтобы показывать статистику в реальном времени своим клиентам. Используя такие конвейеры, они могут предоставлять своим клиентам широкий спектр услуг, таких как мониторинг, автоматизированные предложения, оповещения и аналитика о местах, где клиенты установили свои датчики.

Для этого мы будем использовать GCE VM для имитации устройства IoT. Устройство будет публиковать данные в теме Kafka в кластере Google Managed Kafka, которые будут считываться и обрабатываться потоковым заданием Dataproc. Настройка предварительных условий и следующие страницы приведут вас к выполнению всех этих шагов.

Предварительная настройка

  1. Найдите название и номер проекта для вашего проекта. См. раздел Найти название, номер и идентификатор проекта для справки.
  2. Подсеть VPC. Это позволит обеспечить связь между GCE VM, кластером Kafka и кластером Dataproc. Следуйте этому , чтобы получить список существующих подсетей с помощью gcloud CLI. При необходимости следуйте этому, чтобы создать сеть VPC в автоматическом режиме , которая создаст сеть VPC с подсетью в каждом регионе Google Cloud. Хотя для целей этой кодовой лаборатории мы будем использовать подсеть только из одного региона.
  • В этой подсети убедитесь, что есть правило брандмауэра, разрешающее весь входящий трафик с tcp:22, что требуется SSH. Это правило будет доступно для выбора в разделе правил брандмауэра при создании сети, поэтому убедитесь, что вы выбрали его.
  1. GCS bucket. Вам понадобится доступ к Google Cloud storage bucket для хранения ресурсов задания Dataproc и сохранения обработанных данных. Если у вас его нет, вы можете создать его в своем проекте GCP.

Заполнить переменные среды

В терминале, где вы запускаете gcloud CLI, заполните эти переменные среды, чтобы их можно было использовать позже.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Заменить следующее:

  • <project-id> на имя настроенного вами проекта GCP.
  • <project-number> с названием номера проекта из предварительного шага 1.
  • <region> с названием региона из Доступные регионы и зоны , которые вы хотите использовать. Например, мы можем использовать us-central1 .
  • <zone> с именем зоны из Доступные регионы и зоны в ранее выбранном вами регионе. Например, если вы выбрали us-central1 в качестве региона, вы можете использовать us-central1-f в качестве зоны. Эта зона будет использоваться для создания GCE VM, которая имитирует устройства IoT. Убедитесь, что ваша зона находится в выбранном вами регионе .
  • <subnet-path> с полным путем подсети из шага 2 предварительного условия. Значение должно быть в формате: projects/<project-id>/regions/<region>/subnetworks/<subnet-name> .
  • <bucket-name> с именем контейнера GCS из предварительного шага 3.

3. Настройка управляемого Google Kafka

В этом разделе настраивается кластер Google Managed Kafka, который развертывает сервер Kafka и создает тему на этом сервере, где данные IoT могут быть опубликованы и прочитаны после подписки на них. DemoIOT Solutions может настроить этот кластер так, чтобы все их устройства публиковали в нем данные.

Создать управляемый кластер Kafka

  • Создайте управляемый кластер Kafka. Здесь имя кластера — kafka-iot .
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

Вы должны получить ответ, подобный следующему:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

Создание кластера занимает 20-30 минут. Дождитесь завершения этой операции.

Создать тему

  • Создайте тему Managed Kafka на кластере. Здесь имя темы — kafka-iot-topic .
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

Вы должны получить вывод, подобный следующему:

Created topic [kafka-iot-topic].

4. Создать издателя

Для публикации в кластере Managed Kafka мы настраиваем экземпляр виртуальной машины Google Compute Engine, который может получить доступ к VPC, содержащему подсеть, используемую кластером Managed Kafka. Эта виртуальная машина имитирует сенсорные устройства, предоставляемые DemoIOT Solutions.

Шаги

  1. Создайте экземпляр Google Compute Engine VM. Здесь имя GCE VM — publisher-instance .
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Предоставьте учетной записи службы Google Compute Engine по умолчанию разрешения на использование управляемой службы для Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Используйте SSH для подключения к виртуальной машине. В качестве альтернативы используйте Google Cloud Console для SSH .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Установите Java для запуска инструментов командной строки Kafka и загрузите двоичный файл Kafka с помощью этих команд.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Загрузите библиотеку аутентификации Managed Kafka и ее зависимости, а также настройте свойства клиента Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Более подробную информацию о настройке машины-издателя см. в разделе Настройка клиентской машины .

5. Опубликовать в управляемом Kafka

Теперь, когда издатель настроен, мы можем использовать командную строку Kafka для публикации некоторых фиктивных данных из виртуальной машины GCE (имитирующей устройства IoT от DemoIOT Solutions) в управляемом кластере Kafka.

  1. Поскольку мы подключились по SSH к экземпляру виртуальной машины GCE, нам необходимо повторно заполнить переменную PROJECT_ID :
export PROJECT_ID=<project-id>
export REGION=<region>

Заменить следующее:

  • <project-id> на имя настроенного вами проекта GCP.
  • <region> с регионом, в котором был создан кластер Kafka
  1. Используйте команду managed-kafka clusters describe , чтобы получить IP-адрес сервера начальной загрузки Kafka. Этот адрес можно использовать для подключения к кластеру Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Перечислите темы в кластере:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Вы должны увидеть следующий вывод, содержащий тему kafka-iot-topic мы создали ранее.

__remote_log_metadata
kafka
-iot-topic
  1. Скопируйте и вставьте этот скрипт в новый файл publish_iot_data.sh . Чтобы создать новый файл на виртуальной машине GCE, вы можете использовать такой инструмент, как vim или nano .
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Объяснение

  • Этот скрипт создает сообщения JSON с имитированными показаниями датчика, которые содержат идентификатор устройства, временную метку, данные датчика (температура, влажность, давление, освещенность), информацию о местоположении (широта, долгота), состояние устройства (батарея, сигнал, тип подключения) и некоторые метаданные.
  • Он генерирует непрерывный поток сообщений от заданного количества уникальных устройств, каждое из которых отправляет данные с заданным интервалом времени, имитируя реальные устройства IoT. Здесь мы публикуем данные от 10 устройств, каждое из которых производит 20 показаний с интервалом времени 10 секунд.
  • Он также публикует все сгенерированные данные в теме Kafka с помощью инструмента командной строки Kafka Producer.
  1. Установите некоторые зависимости, используемые скриптом — пакет bc для математических вычислений и пакет jq для обработки JSON.
sudo apt-get install bc jq
  1. Измените скрипт так, чтобы он был исполняемым, и запустите его. Выполнение должно занять около 2 минут.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Вы можете проверить, что события были успешно опубликованы, выполнив эту команду, которая выведет все события. Нажмите <control-c> для выхода.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Настройте кластер Dataproc

Этот раздел создает кластер Dataproc в подсети VPC, где присутствует кластер Managed Kafka. Этот кластер будет использоваться для запуска заданий, которые генерируют статистику и информацию в реальном времени, необходимые для DemoIOT Solutions.

  1. Создайте кластер Dataproc. Здесь имя кластера — dataproc-iot .
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Вы должны получить ответ, подобный следующему:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Создание кластера может занять 10-15 минут. Дождитесь успешного завершения этой операции и проверьте, что кластер находится в состоянии RUNNING , описав кластер.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Обработка сообщений Kafka с помощью Dataproc

В этом последнем разделе вы отправите задание Dataproc, которое обрабатывает опубликованные сообщения с помощью Spark Streaming . Это задание фактически генерирует некоторую статистику и информацию в реальном времени, которые могут быть использованы DemoIOT Solutions.

  1. Запустите эту команду, чтобы создать локальный файл потокового задания PySpark с именем process_iot.py .
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Объяснение

  • Этот код настраивает задание PySpark Structured Streaming для чтения данных из указанной темы Kafka. Он использует предоставленный адрес начальной загрузки сервера Kafka и конфигурации Kafka, загруженные из файла конфигурации GCS, для подключения и аутентификации с брокером Kafka.
  • Сначала он считывает необработанные данные из Kafka как поток байтовых массивов, преобразует эти байтовые массивы в строки и применяет json_schema , используя StructType Spark, чтобы указать структуру данных (идентификатор устройства, временная метка, местоположение, данные датчика и т. д.).
  • Он выводит первые 10 строк на консоль для предварительного просмотра, вычисляет среднюю температуру на датчик и записывает все данные в контейнер GCS в формате avro . Avro — это система сериализации данных на основе строк, которая эффективно хранит структурированные данные в компактном двоичном формате, определяемом схемой, предлагая эволюцию схемы, нейтральность к языку и высокую степень сжатия для крупномасштабной обработки данных.
  1. Создайте файл client.properties и заполните переменную среды для адреса начальной загрузки сервера kafka.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Загрузите файлы process_iot.py и client.properties в хранилище Google Cloud Storage, чтобы их можно было использовать в задании Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Скопируйте несколько jar-файлов зависимостей для задания Dataproc в контейнер GCS. Этот каталог содержит jar-файлы, необходимые для запуска заданий Spark Streaming с Kafka, а также библиотеку аутентификации Managed Kafka и ее зависимости, взятые из Set up a client machine .
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Отправьте задание Spark в кластер Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Журналы драйвера Spark будут распечатаны. Вы также сможете увидеть эти таблицы, записанные в консоль, и данные, сохраненные в вашем контейнере GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Очистить

После завершения кодовой лаборатории выполните следующие действия по очистке ресурсов.

  1. Удалите управляемый кластер Kafka, виртуальную машину Publisher GCE и кластер Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Удалите подсеть и сеть VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Удалите свой контейнер GCS, если вы больше не хотите использовать данные.
gcloud storage rm --recursive $BUCKET

9. Поздравления

Поздравляем, вы успешно создали конвейер обработки данных IoT с помощью Manage Kafka и Dataproc, который помогает DemoIOT Solutions получать информацию в режиме реального времени о данных, публикуемых их устройствами!

Вы создали кластер Managed Kafka, опубликовали в нем события IoT и запустили задание Dataproc, которое использовало потоковую передачу Spark для обработки этих событий в реальном времени. Теперь вы знаете основные шаги, необходимые для создания конвейеров данных с использованием Managed Kafka и Dataproc.

Справочные документы