Przetwarzanie danych IoT w czasie rzeczywistym za pomocą Dataproc i usługi zarządzanej Google dla Apache Kafka
Informacje o tym ćwiczeniu (w Codelabs)
1. Wprowadzenie
Ostatnia aktualizacja: 2024-06-10
Informacje ogólne
Urządzenia internetu rzeczy (IoT), od rozwiązań inteligentnego domu po czujniki przemysłowe, generują ogromne ilości danych na obrzeżach sieci. Te dane są nieocenione w różnych zastosowaniach, takich jak monitorowanie urządzeń, śledzenie, diagnostyka, nadzór, personalizacja, optymalizacja floty itp. Usługa zarządzana Google dla Apache Kafka zapewnia skalowalny i trwały sposób przechwytywania i przechowywania ciągłego strumienia danych w sposób zgodny z OSS, łatwy w użyciu i bezpieczny, a Google Cloud Dataproc umożliwia przetwarzanie tych dużych zbiorów danych na potrzeby analizy danych za pomocą klastrów Apache Spark i Hadoop.
Co utworzysz
W tym laboratorium kodu zbudujesz potok przetwarzania danych IoT za pomocą usługi zarządzanej Google dla Apache Kafka, Dataproc, Python i Apache Spark, która wykonuje analizę w czasie rzeczywistym. Twój potok:
- Publikowanie danych z urządzeń IoT w klastrze Managed Kafka za pomocą maszyn wirtualnych GCE
- Przesyłanie danych z klastra zarządzanego Kafka do klastra Dataproc
- Przetwarzanie danych za pomocą zadania Dataproc Spark Streaming
Czego się nauczysz
- Jak utworzyć klastry zarządzanej usługi Google Kafka i Dataproc
- Jak uruchamiać zadania strumieniowe za pomocą Dataproc
Czego potrzebujesz
- Aktywny projekt GCP z ustawionym projektem. Jeśli go nie masz, możesz zarejestrować się na bezpłatny okres próbny.
- Zainstalowany i skonfigurowany interfejs wiersza poleceń gcloud. Możesz postępować zgodnie z instrukcjami instalowania gcloud CLI w swoim systemie operacyjnym.
- Włącz interfejsy API Kafka zarządzana przez Google i Dataproc w projekcie GCP.
2. Omówienie
W tym ćwiczeniu z programowania poznasz fikcyjną firmę DemoIOT Solutions. DemoIOT Solutions dostarcza urządzenia z czujnikami, które mierzą i przekazują dane o temperaturze, wilgotności, ciśnieniu, natężeniu oświetlenia i lokalizacji. Chcą skonfigurować potoki, które przetwarzają te dane, aby wyświetlać klientom statystyki w czasie rzeczywistym. Dzięki tym systemom mogą oni świadczyć klientom wiele usług, takich jak monitorowanie, automatyczne sugestie, alerty i statystyki dotyczące miejsc, w których klienci zainstalowali czujniki.
W tym celu użyjemy maszyny wirtualnej GCE do symulowania urządzenia IoT. Urządzenie będzie publikować dane w temacie Kafka w klastrze Kafka zarządzanym przez Google, który będzie odczytywany i przetwarzany przez zadanie strumieniowania Dataproc. Na stronie z wymaganiami wstępnymi i na kolejnych stronach znajdziesz instrukcje wykonywania tych czynności.
Konfiguracja wstępna
- Znajdź nazwę i numer projektu. Więcej informacji znajdziesz w artykule Znajdowanie nazwy, numeru i identyfikatora projektu.
- Podsieć VPC. Umożliwi to połączenie maszyny wirtualnej GCE, klastra Kafka i klastra Dataproc. Aby wyświetlić istniejące podsieci za pomocą interfejsu wiersza poleceń gcloud, wykonaj te czynności. W razie potrzeby wykonaj instrukcje tworzenia sieci VPC w trybie automatycznym, aby utworzyć sieć VPC z podsiecią w każdym regionie Google Cloud. W tym przypadku użyjemy jednak podsieci tylko z jednego regionu.
- W tej podsieci sprawdź, czy jest reguła zapory sieciowej zezwalająca na wszystkie połączenia przychodzące z adresu tcp:22, który jest wymagany do SSH. Podczas tworzenia sieci ta reguła będzie dostępna w sekcji Reguły zapory sieciowej. Pamiętaj, aby ją wybrać.
- Zasobnik GCS. Aby przechowywać zasoby zadań Dataproc i przetworzone dane, musisz mieć dostęp do zasobnika Cloud Storage. Jeśli go nie masz, możesz go utworzyć w projekcie GCP.
Wypełnianie zmiennych środowiskowych
W terminalu, w którym uruchamiasz interfejs wiersza poleceń gcloud, wypełnij te zmienne środowiskowe, aby można było ich używać później.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Zastąp następujące elementy:
<project-id>
z nazwą skonfigurowanego przez Ciebie projektu GCP.<project-number>
nazwą numeru projektu z poziomu kroku 1 z warunków wstępnych.<region>
z nazwą regionu z dostępnych regionów i stref, którego chcesz użyć. Możemy na przykład użyćus-central1
.<zone>
nazwą strefy z Dostępnych regionów i stref w wybranym wcześniej regionie. Jeśli np. jako region wybierzeszus-central1
, jako strefę możesz użyćus-central1-f
. Ta strefa zostanie użyta do utworzenia maszyny wirtualnej GCE, która symuluje urządzenia IoT. Upewnij się, że strefa znajduje się w wybranym regionie.<subnet-path>
z pełną ścieżką podsieci z kroku 2. Wartość musi mieć format:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
z nazwą zasobnika GCS z kroku 3 w sekcji Wymagania wstępne.
3. Konfigurowanie usługi zarządzanej Kafka przez Google
W tej sekcji konfigurujesz klaster Kafka zarządzany przez Google, który wdraża serwer Kafka i tworzy na nim temat, z którego po subskrypcji można publikować i odczytywać dane IoT. DemoIOT Solutions może skonfigurować ten klaster tak, aby wszystkie jego urządzenia publikowały w nim dane.
Tworzenie klastra Managed Kafka
- Utwórz klaster zarządzany Kafka. Nazwa klastra to
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Powinna wyświetlić się odpowiedź podobna do tej:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Tworzenie klastra trwa 20–30 minut. Zaczekaj na zakończenie tej operacji.
Tworzenie tematu
- Utwórz temat Managed Kafka na klastrze. W tym przypadku nazwa tematu to
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Powinny pojawić się dane wyjściowe podobne do tych:
Created topic [kafka-iot-topic].
4. Konfigurowanie serwera publikującego
Aby publikować dane w klastrze zarządzanym Kafka, skonfigurowaliśmy instancję maszyny wirtualnej Google Compute Engine, która może uzyskiwać dostęp do VPC zawierającego podsieć używaną przez klaster zarządzany Kafka. Wirtualna maszyna symuluje urządzenia z czujnikiem dostarczane przez DemoIOT Solutions.
Kroki
- Utwórz instancję maszyny wirtualnej Google Compute Engine. Tutaj nazwa maszyny wirtualnej GCE to
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Przydziel domyślnemu kontu usługi Google Compute Engine uprawnienia do korzystania z usługi zarządzanej Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Użyj SSH, aby połączyć się z maszyną wirtualną. Możesz też użyć konsoli Google Cloud do połączenia SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Zainstaluj Java, aby uruchamiać narzędzia wiersza poleceń Kafka, i pobierz plik binarny Kafka za pomocą tych poleceń.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Pobierz bibliotekę uwierzytelniania Managed Kafka i jej zależności oraz skonfiguruj właściwości klienta Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Więcej informacji o konfigurowaniu komputera wydawcy znajdziesz w artykule Konfigurowanie komputera klienta.
5. Publikowanie w usłudze Managed Kafka
Teraz, gdy usługa wydawcy jest skonfigurowana, możemy użyć w niej wiersza poleceń Kafka, aby opublikować fikcyjne dane z VM GCE (symulowanie urządzeń IoT przez DemoIOT Solutions) na klastrze zarządzanym Kafka.
- Ponieważ połączyliśmy się przez SSH z instancją maszyny wirtualnej GCE, musimy ponownie wypełnić zmienną
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Zastąp następujące elementy:
<project-id>
z nazwą skonfigurowanego przez Ciebie projektu GCP.<region>
z regionem, w którym utworzono klaster Kafka
- Użyj polecenia
managed-kafka clusters describe
, aby uzyskać adres IP serwera wczytywania Kafka. Za pomocą tego adresu możesz połączyć się z klasterem Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Wymień tematy w klastrze:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Powinny się wyświetlić dane wyjściowe zawierające utworzony wcześniej temat kafka-iot-topic
.
__remote_log_metadata
kafka-iot-topic
- Skopiuj i wklej ten skrypt do nowego pliku
publish_iot_data.sh
. Aby utworzyć nowy plik na maszynie wirtualnej GCE, możesz użyć narzędzia takiego jakvim
lubnano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Wyjaśnienie
- Ten skrypt tworzy wiadomości JSON z symulowanymi odczytami czujników, które zawierają identyfikator urządzenia, sygnaturę czasową, dane czujnika (temperaturę, wilgotność, ciśnienie, światło), informacje o lokalizacji (szerokość i długość geograficzna), stan urządzenia (bateria, sygnał, typ połączenia) oraz niektóre metadane.
- Generuje on nieprzerwany strumień wiadomości z określonej liczby unikalnych urządzeń, z których każde wysyła dane w określonym odstępie czasu, naśladując w ten sposób działanie rzeczywistych urządzeń IoT. Publikujemy tu dane z 10 urządzeń, z których każde generuje 20 wyników w interwale 10 sekund.
- Publikuje on również wszystkie wygenerowane dane w temacie Kafka za pomocą narzędzia wiersza poleceń producenta Kafka.
- Zainstaluj niektóre zależności używane przez skrypt – pakiet
bc
do obliczeń matematycznych i pakietjq
do przetwarzania danych JSON.
sudo apt-get install bc jq
- Zmodyfikuj skrypt, aby był wykonywalny, i uruchom go. Jego wykonanie zajmie około 2 minut.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Aby sprawdzić, czy zdarzenia zostały opublikowane, uruchom to polecenie, które wydrukuje wszystkie zdarzenia. Aby zamknąć okno, naciśnij <control-c>
.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Konfigurowanie klastra Dataproc
W tej sekcji utworzysz klaster Dataproc w podsieci VPC, w której znajduje się zarządzany klaster Kafka. Ten klaster będzie używany do wykonywania zadań, które generują statystyki i statystyki w czasie rzeczywistym potrzebne do działania DemoIOT Solutions.
- Utwórz klaster Dataproc. Nazwa klastra to
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Powinna wyświetlić się odpowiedź podobna do tej:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Tworzenie klastra może potrwać 10–15 minut. Zaczekaj na zakończenie tej operacji i sprawdź, czy klaster jest w stanie RUNNING
, opisując go.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Przetwarzanie wiadomości Kafka za pomocą Dataproc
W tej ostatniej sekcji prześlesz zadanie Dataproc, które przetwarza opublikowane wiadomości za pomocą Spark Streaming. To zadanie generuje statystyki i trendy w czasie rzeczywistym, których można używać w DemoIOT Solutions.
- Uruchom to polecenie, aby utworzyć lokalnie plik zadania PySpark do strumieniowego przesyłania o nazwie
process_iot.py
.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Wyjaśnienie
- Ten kod konfiguruje zadanie PySpark Structured Streaming do odczytywania danych z określonego tematu usługi Kafka. Do nawiązywania połączenia i uwierzytelniania się na brokerze Kafka używa adresu bootstrap serwera Kafka i konfiguracji Kafka załadowanych z pliku konfiguracyjnego GCS.
- Najpierw odczytuje dane nieprzetworzone z Kafka jako strumień tablic bajtów, a potem rzutuje te tablice bajtów na ciągi znaków. Następnie stosuje funkcję
json_schema
, używając typu strukturalnego Sparka, aby określić strukturę danych (identyfikator urządzenia, sygnaturę czasową, lokalizację, dane czujnika itp.). - Wypisuje pierwsze 10 wierszy w konsoli na potrzeby podglądu, oblicza średnią temperaturę dla każdego czujnika i zapisuje wszystkie dane w zasośniku GCS w formacie
avro
. Avro to system serializacji danych oparty na wierszach, który efektywnie przechowuje uporządkowane dane w skompresowanym formacie binarnym zdefiniowanym przez schemat. Zapewnia on ewolucję schematu, neutralność językową i wysoką kompresję na potrzeby przetwarzania danych na dużą skalę.
- Utwórz plik
client.properties
i wypełnij zmienną środowiskową adresem bootstrap serwera Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Prześlij pliki
process_iot.py
iclient.properties
do zasobnika Google Cloud Storage, aby można było ich używać w zadaniu Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Skopiuj do zasobnika GCS niektóre pliki JAR z zależnościami do zadania Dataproc. Ten katalog zawiera pliki jar wymagane do uruchamiania zadań Spark Streaming z Kafka oraz bibliotekę uwierzytelniania Managed Kafka i jej zależności, które pochodzą z konfiguracji maszyny klienta.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Prześlij zadanie Spark do klastra Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Logi sterownika Spark zostaną wydrukowane. Te tabele powinny być też widoczne w konsoli i dane powinny być przechowywane w zasobniku GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Czyszczenie danych
Po zakończeniu pracy z tym ćwiczeniem usuń zasoby.
- Usuń zarządzany klaster Kafka, maszynę wirtualną GCE wydawcy i klaster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Usuń sieć i podsieć VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Usuń zasobnik GCS, jeśli nie chcesz już używać danych.
gcloud storage rm --recursive $BUCKET
9. Gratulacje
Gratulacje! Udało Ci się utworzyć potok przetwarzania danych IoT za pomocą Manage Kafka i Dataproc, który pomaga firmie DemoIOT Solutions uzyskiwać w czasie rzeczywistym statystyki dotyczące danych publikowanych przez jej urządzenia.
Utworzyłeś klaster zarządzanej usługi Kafka, opublikował w nim zdarzenia IoT i uruchomił zadanie Dataproc, które używało strumieniowego przetwarzania Spark do przetwarzania tych zdarzeń w czasie rzeczywistym. Znasz już najważniejsze kroki wymagane do tworzenia ścieżek danych przy użyciu usługi Managed Kafka i Dataproc.