Przetwarzanie danych IoT w czasie rzeczywistym za pomocą Dataproc i usługi zarządzanej Google dla Apache Kafka

1. Wprowadzenie

2efacab8643a653b.png

Last Updated: 2024-06-10

Informacje ogólne

Urządzenia internetu rzeczy (IoT), od rozwiązań dla inteligentnego domu po czujniki przemysłowe, generują ogromne ilości danych na obrzeżach sieci. Te dane są niezwykle cenne w różnych zastosowaniach, takich jak monitorowanie urządzeń, śledzenie, diagnostyka, nadzór, personalizacja, optymalizacja floty i wiele innych. Usługa zarządzana Google dla Apache Kafka zapewnia skalowalny i trwały sposób na pozyskiwanie i przechowywanie tego ciągłego strumienia danych w sposób zgodny z OSS, łatwy w użyciu i bezpieczny, a Google Cloud Dataproc umożliwia przetwarzanie tych dużych zbiorów danych na potrzeby analityki danych za pomocą klastrów Apache Spark i Hadoop.

Co utworzysz

W tym ćwiczeniu utworzysz potok przetwarzania danych IoT za pomocą usługi zarządzanej Google dla Apache Kafka, Dataproc, Pythona i Apache Spark, który przeprowadza analizę w czasie rzeczywistym. Potok:

  • Publikowanie danych z urządzeń IoT w klastrze zarządzanym Kafka za pomocą maszyn wirtualnych GCE
  • Przesyłanie strumieniowe danych z klastra usługi zarządzanej Kafka do klastra Dataproc
  • Przetwarzanie danych za pomocą zadania Spark Streaming w Dataproc

Czego się nauczysz

  • Jak tworzyć klastry Google Managed Kafka i Dataproc
  • Jak uruchamiać zadania strumieniowe za pomocą Dataproc

Czego potrzebujesz

2. Przegląd

W tym ćwiczeniu z programowania będziemy śledzić historię fikcyjnej firmy DemoIOT Solutions. DemoIOT Solutions dostarcza urządzenia z czujnikami, które mierzą i przesyłają dane dotyczące temperatury, wilgotności, ciśnienia, natężenia oświetlenia i lokalizacji. Chcą skonfigurować potoki przetwarzania tych danych, aby wyświetlać klientom statystyki w czasie rzeczywistym. Dzięki takim potokom mogą oferować klientom szeroki zakres usług, takich jak monitorowanie, automatyczne sugestie, alerty i statystyki dotyczące miejsc, w których klienci zainstalowali czujniki.

W tym celu użyjemy maszyny wirtualnej GCE do symulacji urządzenia IoT. Urządzenie będzie publikować dane w temacie Kafka w klastrze Kafka zarządzanym przez Google, które będą odczytywane i przetwarzane przez zadanie strumieniowe Dataproc. Wymagania wstępne i kolejne strony pomogą Ci wykonać wszystkie te czynności.

Konfiguracja wstępna

  1. Znajdź nazwę i numer projektu. Więcej informacji znajdziesz w artykule Znajdowanie nazwy, numeru i identyfikatora projektu.
  2. Podsieć VPC. Umożliwi to połączenie między maszyną wirtualną GCE, klastrem Kafka i klastrem Dataproc. Aby wyświetlić listę istniejących podsieci za pomocą gcloud CLI, wykonaj te czynności. W razie potrzeby wykonaj czynności opisane w artykule tworzenie sieci VPC w trybie automatycznym, aby utworzyć sieć VPC z podsiecią w każdym regionie Google Cloud. Na potrzeby tego ćwiczenia w Codelabs użyjemy jednak podsieci z tylko jednego regionu.
  • W tej podsieci sprawdź, czy istnieje reguła zapory sieciowej zezwalająca na cały ruch przychodzący z tcp:22, który jest wymagany w przypadku SSH. Ta reguła będzie dostępna do wyboru w sekcji Reguły zapory sieciowej podczas tworzenia sieci, więc pamiętaj, aby ją wybrać.
  1. zasobnik GCS. Aby przechowywać zasoby zadań Dataproc i utrwalać przetworzone dane, musisz mieć dostęp do zasobnika Google Cloud Storage. Jeśli go nie masz, możesz go utworzyć w projekcie GCP.

Wypełnianie zmiennych środowiskowych

W terminalu, w którym uruchamiasz gcloud CLI, wypełnij te zmienne środowiskowe, aby można było ich później użyć.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Zastąp następujące elementy:

  • <project-id> z nazwą skonfigurowanego projektu GCP.
  • <project-number> nazwą numeru projektu z kroku 1 w sekcji Wymagania wstępne.
  • <region>, gdzie nazwa regionu pochodzi z listy Dostępne regiony i strefy. Możemy na przykład użyć us-central1.
  • <zone> nazwą strefy z sekcji Dostępne regiony i strefy w wybranym wcześniej regionie. Jeśli na przykład jako region wybierzesz us-central1, możesz użyć us-central1-f jako strefy. Ta strefa będzie używana do tworzenia maszyny wirtualnej GCE, która symuluje urządzenia IoT. Upewnij się, że Twoja strefa znajduje się w wybranym regionie.
  • <subnet-path> z pełną ścieżką podsieci z kroku 2 w sekcji Wymagania wstępne. Wartość musi mieć format: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> z nazwą zasobnika GCS z kroku 3 w sekcji Wymagania wstępne.

3. Konfigurowanie usługi Google Managed Kafka

W tej sekcji skonfigurujemy klaster Kafka zarządzany przez Google, który wdraża serwer Kafka i tworzy na nim temat, do którego można publikować dane IoT i z którego można je odczytywać po zasubskrybowaniu. Firma DemoIOT Solutions może skonfigurować ten klaster tak, aby wszystkie jej urządzenia publikowały w nim dane.

Tworzenie zarządzanego klastra Kafka

  • Utwórz zarządzany klaster Kafka. W tym przypadku nazwa klastra to kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

Powinna pojawić się odpowiedź podobna do tej:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

Tworzenie klastra trwa 20–30 minut. Poczekaj na zakończenie tej operacji.

Tworzenie tematu

  • Utwórz w klastrze temat Kafka zarządzany. W tym przypadku nazwa tematu to kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

Wynik powinien wyglądać podobnie do tego:

Created topic [kafka-iot-topic].

4. Konfigurowanie wydawcy

Aby publikować w klastrze Managed Kafka, skonfigurujemy instancję maszyny wirtualnej Google Compute Engine, która ma dostęp do sieci VPC zawierającej podsieć używaną przez klaster Managed Kafka. Ta maszyna wirtualna symuluje urządzenia z czujnikami dostarczane przez DemoIOT Solutions.

Kroki

  1. Utwórz instancję maszyny wirtualnej Google Compute Engine. W tym przypadku nazwa maszyny wirtualnej GCE to publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Przyznaj domyślnemu kontu usługi Google Compute Engine uprawnienia do korzystania z usługi zarządzanej dla Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Połącz się z maszyną wirtualną za pomocą SSH. Możesz też użyć konsoli Google Cloud do połączenia SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Zainstaluj Javę, aby uruchamiać narzędzia wiersza poleceń Kafka, i pobierz plik binarny Kafka za pomocą tych poleceń.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Pobierz bibliotekę uwierzytelniania usługi zarządzanej Kafka i jej zależności oraz skonfiguruj właściwości klienta Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Więcej informacji o konfigurowaniu komputera wydawcy znajdziesz w artykule Konfigurowanie komputera klienta.

5. Publikowanie w usłudze Managed Kafka

Po skonfigurowaniu wydawcy możemy użyć wiersza poleceń Kafka, aby opublikować w zarządzanym klastrze Kafka przykładowe dane z maszyny wirtualnej GCE (symulującej urządzenia IoT firmy DemoIOT Solutions).

  1. Po nawiązaniu połączenia SSH z instancją maszyny wirtualnej GCE musimy ponownie wypełnić zmienną PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

Zastąp następujące elementy:

  • <project-id> z nazwą skonfigurowanego projektu GCP.
  • <region> z regionem, w którym utworzono klaster Kafka
  1. Użyj polecenia managed-kafka clusters describe, aby uzyskać adres IP serwera wczytywania Kafka. Ten adres może służyć do łączenia się z klastrem Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Wyświetl listę tematów w klastrze:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Powinny się wyświetlić te dane wyjściowe, zawierające utworzony wcześniej temat kafka-iot-topic.

__remote_log_metadata
kafka-iot-topic
  1. Skopiuj ten skrypt i wklej go do nowego pliku publish_iot_data.sh. Aby utworzyć nowy plik na maszynie wirtualnej GCE, możesz użyć narzędzia takiego jak vim lub nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Wyjaśnienie

  • Ten skrypt tworzy wiadomości JSON z symulowanymi odczytami czujników, które zawierają identyfikator urządzenia, sygnaturę czasową, dane z czujników (temperatura, wilgotność, ciśnienie, światło), informacje o lokalizacji (szerokość i długość geograficzna), stan urządzenia (bateria, sygnał, typ połączenia) i niektóre metadane.
  • Generuje ciągły strumień wiadomości z określonej liczby unikalnych urządzeń, z których każde wysyła dane w określonych odstępach czasu, imitując rzeczywiste urządzenia IoT. Publikujemy tu dane z 10 urządzeń, z których każde generuje 20 odczytów w 10-sekundowych odstępach czasu.
  • Publikuje też wszystkie wygenerowane dane w temacie Kafka za pomocą narzędzia wiersza poleceń producenta Kafka.
  1. Zainstaluj niektóre zależności używane przez skrypt: pakiet bc do obliczeń matematycznych i pakiet jq do przetwarzania JSON.
sudo apt-get install bc jq
  1. Zmodyfikuj skrypt, aby był wykonywalny, i uruchom go. Potrwa to około 2 minut.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Aby sprawdzić, czy zdarzenia zostały opublikowane, uruchom to polecenie, które wyświetli wszystkie zdarzenia. Aby wyjść, naciśnij <control-c>.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Konfigurowanie klastra Dataproc

W tej sekcji utworzysz klaster Dataproc w podsieci VPC, w której znajduje się zarządzany klaster Kafka. Ten klaster będzie używany do uruchamiania zadań, które generują statystyki i informacje w czasie rzeczywistym potrzebne firmie DemoIOT Solutions.

  1. Utwórz klaster Dataproc. Nazwa klastra to dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Powinna pojawić się odpowiedź podobna do tej:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Tworzenie klastra może potrwać od 10 do 15 minut. Poczekaj na zakończenie tej operacji i sprawdź, czy klaster jest w stanie RUNNING, opisując go.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Przetwarzanie wiadomości Kafka za pomocą Dataproc

W tej ostatniej sekcji prześlesz zadanie Dataproc, które przetwarza opublikowane wiadomości za pomocą Spark Streaming. To zadanie generuje statystyki i informacje w czasie rzeczywistym, które mogą być wykorzystywane przez DemoIOT Solutions.

  1. Uruchom to polecenie, aby lokalnie utworzyć plik zadania PySpark do przesyłania strumieniowego o nazwie process_iot.py.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Wyjaśnienie

  • Ten kod konfiguruje zadanie strumieniowania strukturalnego PySpark do odczytywania danych z określonego tematu Kafka. Używa podanego adresu początkowego serwera Kafka i konfiguracji Kafka wczytanych z pliku konfiguracyjnego GCS, aby połączyć się z brokerem Kafka i uwierzytelnić się w nim.
  • Najpierw odczytuje surowe dane z Kafki jako strumień tablic bajtów, a następnie przekształca te tablice w ciągi znaków i stosuje json_schema za pomocą StructType Sparka, aby określić strukturę danych (identyfikator urządzenia, sygnatura czasowa, lokalizacja, dane z czujników itp.).
  • Wyświetla w konsoli pierwszych 10 wierszy, aby umożliwić podgląd, oblicza średnią temperaturę dla każdego czujnika i zapisuje wszystkie dane w zasobniku GCS w formacie avro. Avro to system serializacji danych wierszowych, który skutecznie przechowuje dane strukturalne w kompaktowym, zdefiniowanym przez schemat formacie binarnym. Umożliwia ewolucję schematu, jest niezależny od języka i zapewnia wysoką kompresję na potrzeby przetwarzania danych na dużą skalę.
  1. Utwórz plik client.properties i wypełnij zmienną środowiskową adresem początkowym serwera Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Prześlij pliki process_iot.pyclient.properties do zasobnika Cloud Storage w Google Cloud, aby można było ich używać w zadaniu Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. Skopiuj do zasobnika GCS niektóre pliki JAR zależności na potrzeby zadania Dataproc. Ten katalog zawiera pliki JAR wymagane do uruchamiania zadań Spark Streaming z Kafka oraz bibliotekę uwierzytelniania usługi zarządzanej Kafka i jej zależności pobrane z artykułu Konfigurowanie maszyny klienta.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Prześlij zadanie Spark do klastra Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Logi sterownika Spark zostaną wydrukowane. Powinny być też widoczne tabele zarejestrowane w konsoli oraz dane przechowywane w zasobniku GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Czyszczenie danych

Po ukończeniu ćwiczenia postępuj zgodnie z instrukcjami, aby usunąć zasoby.

  1. Usuń zarządzany klaster Kafka, maszynę wirtualną GCE wydawcy i klaster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. Usuń podsieć i sieć VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Jeśli nie chcesz już używać danych, usuń zasobnik GCS.
gcloud storage rm --recursive $BUCKET

9. Gratulacje

Gratulacje! Udało Ci się utworzyć potok przetwarzania danych IoT z użyciem usługi zarządzanej Kafka i Dataproc, który pomaga firmie DemoIOT Solutions uzyskiwać wgląd w czasie rzeczywistym w dane publikowane przez jej urządzenia.

Utworzono klaster Managed Kafka, opublikowano w nim zdarzenia IoT i uruchomiono zadanie Dataproc, które wykorzystywało strumieniowanie Spark do przetwarzania tych zdarzeń w czasie rzeczywistym. Znasz już najważniejsze kroki wymagane do tworzenia potoków danych za pomocą zarządzanej usługi Kafka i Dataproc.

Dokumentacja