Przetwarzanie danych IoT w czasie rzeczywistym za pomocą Dataproc i usługi zarządzanej Google dla Apache Kafka

Przetwarzanie danych IoT w czasie rzeczywistym za pomocą Dataproc i usługi zarządzanej Google dla Apache Kafka

Informacje o tym ćwiczeniu (w Codelabs)

subjectOstatnia aktualizacja: cze 16, 2025
account_circleAutorzy: Devanshi Khatsuriya

1. Wprowadzenie

2efacab8643a653b.png

Ostatnia aktualizacja: 2024-06-10

Informacje ogólne

Urządzenia internetu rzeczy (IoT), od rozwiązań inteligentnego domu po czujniki przemysłowe, generują ogromne ilości danych na obrzeżach sieci. Te dane są nieocenione w różnych zastosowaniach, takich jak monitorowanie urządzeń, śledzenie, diagnostyka, nadzór, personalizacja, optymalizacja floty itp. Usługa zarządzana Google dla Apache Kafka zapewnia skalowalny i trwały sposób przechwytywania i przechowywania ciągłego strumienia danych w sposób zgodny z OSS, łatwy w użyciu i bezpieczny, a Google Cloud Dataproc umożliwia przetwarzanie tych dużych zbiorów danych na potrzeby analizy danych za pomocą klastrów Apache Spark i Hadoop.

Co utworzysz

W tym laboratorium kodu zbudujesz potok przetwarzania danych IoT za pomocą usługi zarządzanej Google dla Apache Kafka, Dataproc, Python i Apache Spark, która wykonuje analizę w czasie rzeczywistym. Twój potok:

  • Publikowanie danych z urządzeń IoT w klastrze Managed Kafka za pomocą maszyn wirtualnych GCE
  • Przesyłanie danych z klastra zarządzanego Kafka do klastra Dataproc
  • Przetwarzanie danych za pomocą zadania Dataproc Spark Streaming

Czego się nauczysz

  • Jak utworzyć klastry zarządzanej usługi Google Kafka i Dataproc
  • Jak uruchamiać zadania strumieniowe za pomocą Dataproc

Czego potrzebujesz

2. Omówienie

W tym ćwiczeniu z programowania poznasz fikcyjną firmę DemoIOT Solutions. DemoIOT Solutions dostarcza urządzenia z czujnikami, które mierzą i przekazują dane o temperaturze, wilgotności, ciśnieniu, natężeniu oświetlenia i lokalizacji. Chcą skonfigurować potoki, które przetwarzają te dane, aby wyświetlać klientom statystyki w czasie rzeczywistym. Dzięki tym systemom mogą oni świadczyć klientom wiele usług, takich jak monitorowanie, automatyczne sugestie, alerty i statystyki dotyczące miejsc, w których klienci zainstalowali czujniki.

W tym celu użyjemy maszyny wirtualnej GCE do symulowania urządzenia IoT. Urządzenie będzie publikować dane w temacie Kafka w klastrze Kafka zarządzanym przez Google, który będzie odczytywany i przetwarzany przez zadanie strumieniowania Dataproc. Na stronie z wymaganiami wstępnymi i na kolejnych stronach znajdziesz instrukcje wykonywania tych czynności.

Konfiguracja wstępna

  1. Znajdź nazwę i numer projektu. Więcej informacji znajdziesz w artykule Znajdowanie nazwy, numeru i identyfikatora projektu.
  2. Podsieć VPC. Umożliwi to połączenie maszyny wirtualnej GCE, klastra Kafka i klastra Dataproc. Aby wyświetlić istniejące podsieci za pomocą interfejsu wiersza poleceń gcloud, wykonaj te czynności. W razie potrzeby wykonaj instrukcje tworzenia sieci VPC w trybie automatycznym, aby utworzyć sieć VPC z podsiecią w każdym regionie Google Cloud. W tym przypadku użyjemy jednak podsieci tylko z jednego regionu.
  • W tej podsieci sprawdź, czy jest reguła zapory sieciowej zezwalająca na wszystkie połączenia przychodzące z adresu tcp:22, który jest wymagany do SSH. Podczas tworzenia sieci ta reguła będzie dostępna w sekcji Reguły zapory sieciowej. Pamiętaj, aby ją wybrać.
  1. Zasobnik GCS. Aby przechowywać zasoby zadań Dataproc i przetworzone dane, musisz mieć dostęp do zasobnika Cloud Storage. Jeśli go nie masz, możesz go utworzyć w projekcie GCP.

Wypełnianie zmiennych środowiskowych

W terminalu, w którym uruchamiasz interfejs wiersza poleceń gcloud, wypełnij te zmienne środowiskowe, aby można było ich używać później.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Zastąp następujące elementy:

  • <project-id> z nazwą skonfigurowanego przez Ciebie projektu GCP.
  • <project-number> nazwą numeru projektu z poziomu kroku 1 z warunków wstępnych.
  • <region> z nazwą regionu z dostępnych regionów i stref, którego chcesz użyć. Możemy na przykład użyć us-central1.
  • <zone> nazwą strefy z Dostępnych regionów i stref w wybranym wcześniej regionie. Jeśli np. jako region wybierzesz us-central1, jako strefę możesz użyć us-central1-f. Ta strefa zostanie użyta do utworzenia maszyny wirtualnej GCE, która symuluje urządzenia IoT. Upewnij się, że strefa znajduje się w wybranym regionie.
  • <subnet-path> z pełną ścieżką podsieci z kroku 2. Wartość musi mieć format: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> z nazwą zasobnika GCS z kroku 3 w sekcji Wymagania wstępne.

3. Konfigurowanie usługi zarządzanej Kafka przez Google

W tej sekcji konfigurujesz klaster Kafka zarządzany przez Google, który wdraża serwer Kafka i tworzy na nim temat, z którego po subskrypcji można publikować i odczytywać dane IoT. DemoIOT Solutions może skonfigurować ten klaster tak, aby wszystkie jego urządzenia publikowały w nim dane.

Tworzenie klastra Managed Kafka

  • Utwórz klaster zarządzany Kafka. Nazwa klastra to kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

Powinna wyświetlić się odpowiedź podobna do tej:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

Tworzenie klastra trwa 20–30 minut. Zaczekaj na zakończenie tej operacji.

Tworzenie tematu

  • Utwórz temat Managed Kafka na klastrze. W tym przypadku nazwa tematu to kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

Powinny pojawić się dane wyjściowe podobne do tych:

Created topic [kafka-iot-topic].

4. Konfigurowanie serwera publikującego

Aby publikować dane w klastrze zarządzanym Kafka, skonfigurowaliśmy instancję maszyny wirtualnej Google Compute Engine, która może uzyskiwać dostęp do VPC zawierającego podsieć używaną przez klaster zarządzany Kafka. Wirtualna maszyna symuluje urządzenia z czujnikiem dostarczane przez DemoIOT Solutions.

Kroki

  1. Utwórz instancję maszyny wirtualnej Google Compute Engine. Tutaj nazwa maszyny wirtualnej GCE to publisher-instance.
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Przydziel domyślnemu kontu usługi Google Compute Engine uprawnienia do korzystania z usługi zarządzanej Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Użyj SSH, aby połączyć się z maszyną wirtualną. Możesz też użyć konsoli Google Cloud do połączenia SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Zainstaluj Java, aby uruchamiać narzędzia wiersza poleceń Kafka, i pobierz plik binarny Kafka za pomocą tych poleceń.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Pobierz bibliotekę uwierzytelniania Managed Kafka i jej zależności oraz skonfiguruj właściwości klienta Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Więcej informacji o konfigurowaniu komputera wydawcy znajdziesz w artykule Konfigurowanie komputera klienta.

5. Publikowanie w usłudze Managed Kafka

Teraz, gdy usługa wydawcy jest skonfigurowana, możemy użyć w niej wiersza poleceń Kafka, aby opublikować fikcyjne dane z VM GCE (symulowanie urządzeń IoT przez DemoIOT Solutions) na klastrze zarządzanym Kafka.

  1. Ponieważ połączyliśmy się przez SSH z instancją maszyny wirtualnej GCE, musimy ponownie wypełnić zmienną PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Zastąp następujące elementy:

  • <project-id> z nazwą skonfigurowanego przez Ciebie projektu GCP.
  • <region> z regionem, w którym utworzono klaster Kafka
  1. Użyj polecenia managed-kafka clusters describe, aby uzyskać adres IP serwera wczytywania Kafka. Za pomocą tego adresu możesz połączyć się z klasterem Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Wymień tematy w klastrze:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Powinny się wyświetlić dane wyjściowe zawierające utworzony wcześniej temat kafka-iot-topic.

__remote_log_metadata
kafka
-iot-topic
  1. Skopiuj i wklej ten skrypt do nowego pliku publish_iot_data.sh. Aby utworzyć nowy plik na maszynie wirtualnej GCE, możesz użyć narzędzia takiego jak vim lub nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Wyjaśnienie

  • Ten skrypt tworzy wiadomości JSON z symulowanymi odczytami czujników, które zawierają identyfikator urządzenia, sygnaturę czasową, dane czujnika (temperaturę, wilgotność, ciśnienie, światło), informacje o lokalizacji (szerokość i długość geograficzna), stan urządzenia (bateria, sygnał, typ połączenia) oraz niektóre metadane.
  • Generuje on nieprzerwany strumień wiadomości z określonej liczby unikalnych urządzeń, z których każde wysyła dane w określonym odstępie czasu, naśladując w ten sposób działanie rzeczywistych urządzeń IoT. Publikujemy tu dane z 10 urządzeń, z których każde generuje 20 wyników w interwale 10 sekund.
  • Publikuje on również wszystkie wygenerowane dane w temacie Kafka za pomocą narzędzia wiersza poleceń producenta Kafka.
  1. Zainstaluj niektóre zależności używane przez skrypt – pakiet bc do obliczeń matematycznych i pakiet jq do przetwarzania danych JSON.
sudo apt-get install bc jq
  1. Zmodyfikuj skrypt, aby był wykonywalny, i uruchom go. Jego wykonanie zajmie około 2 minut.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Aby sprawdzić, czy zdarzenia zostały opublikowane, uruchom to polecenie, które wydrukuje wszystkie zdarzenia. Aby zamknąć okno, naciśnij <control-c>.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Konfigurowanie klastra Dataproc

W tej sekcji utworzysz klaster Dataproc w podsieci VPC, w której znajduje się zarządzany klaster Kafka. Ten klaster będzie używany do wykonywania zadań, które generują statystyki i statystyki w czasie rzeczywistym potrzebne do działania DemoIOT Solutions.

  1. Utwórz klaster Dataproc. Nazwa klastra to dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Powinna wyświetlić się odpowiedź podobna do tej:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Tworzenie klastra może potrwać 10–15 minut. Zaczekaj na zakończenie tej operacji i sprawdź, czy klaster jest w stanie RUNNING, opisując go.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Przetwarzanie wiadomości Kafka za pomocą Dataproc

W tej ostatniej sekcji prześlesz zadanie Dataproc, które przetwarza opublikowane wiadomości za pomocą Spark Streaming. To zadanie generuje statystyki i trendy w czasie rzeczywistym, których można używać w DemoIOT Solutions.

  1. Uruchom to polecenie, aby utworzyć lokalnie plik zadania PySpark do strumieniowego przesyłania o nazwie process_iot.py.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Wyjaśnienie

  • Ten kod konfiguruje zadanie PySpark Structured Streaming do odczytywania danych z określonego tematu usługi Kafka. Do nawiązywania połączenia i uwierzytelniania się na brokerze Kafka używa adresu bootstrap serwera Kafka i konfiguracji Kafka załadowanych z pliku konfiguracyjnego GCS.
  • Najpierw odczytuje dane nieprzetworzone z Kafka jako strumień tablic bajtów, a potem rzutuje te tablice bajtów na ciągi znaków. Następnie stosuje funkcję json_schema, używając typu strukturalnego Sparka, aby określić strukturę danych (identyfikator urządzenia, sygnaturę czasową, lokalizację, dane czujnika itp.).
  • Wypisuje pierwsze 10 wierszy w konsoli na potrzeby podglądu, oblicza średnią temperaturę dla każdego czujnika i zapisuje wszystkie dane w zasośniku GCS w formacie avro. Avro to system serializacji danych oparty na wierszach, który efektywnie przechowuje uporządkowane dane w skompresowanym formacie binarnym zdefiniowanym przez schemat. Zapewnia on ewolucję schematu, neutralność językową i wysoką kompresję na potrzeby przetwarzania danych na dużą skalę.
  1. Utwórz plik client.properties i wypełnij zmienną środowiskową adresem bootstrap serwera Kafka.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Prześlij pliki process_iot.py i client.properties do zasobnika Google Cloud Storage, aby można było ich używać w zadaniu Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Skopiuj do zasobnika GCS niektóre pliki JAR z zależnościami do zadania Dataproc. Ten katalog zawiera pliki jar wymagane do uruchamiania zadań Spark Streaming z Kafka oraz bibliotekę uwierzytelniania Managed Kafka i jej zależności, które pochodzą z konfiguracji maszyny klienta.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Prześlij zadanie Spark do klastra Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Logi sterownika Spark zostaną wydrukowane. Te tabele powinny być też widoczne w konsoli i dane powinny być przechowywane w zasobniku GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Czyszczenie danych

Po zakończeniu pracy z tym ćwiczeniem usuń zasoby.

  1. Usuń zarządzany klaster Kafka, maszynę wirtualną GCE wydawcy i klaster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Usuń sieć i podsieć VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Usuń zasobnik GCS, jeśli nie chcesz już używać danych.
gcloud storage rm --recursive $BUCKET

9. Gratulacje

Gratulacje! Udało Ci się utworzyć potok przetwarzania danych IoT za pomocą Manage Kafka i Dataproc, który pomaga firmie DemoIOT Solutions uzyskiwać w czasie rzeczywistym statystyki dotyczące danych publikowanych przez jej urządzenia.

Utworzyłeś klaster zarządzanej usługi Kafka, opublikował w nim zdarzenia IoT i uruchomił zadanie Dataproc, które używało strumieniowego przetwarzania Spark do przetwarzania tych zdarzeń w czasie rzeczywistym. Znasz już najważniejsze kroki wymagane do tworzenia ścieżek danych przy użyciu usługi Managed Kafka i Dataproc.

Dokumenty referencyjne