Apache Kafka için Dataproc ve Google Managed Service'i kullanarak gerçek zamanlı IoT veri işleme

1. Giriş

2efacab8643a653b.png

Last Updated: 2024-06-10

Arka plan

Akıllı ev çözümlerinden endüstriyel sensörlere kadar çeşitli Nesnelerin İnterneti (IoT) cihazları, ağın uç noktasında büyük miktarda veri üretir. Bu veriler; cihaz izleme, takip, teşhis, gözetim, kişiselleştirme, filo optimizasyonu gibi çeşitli kullanım alanları için paha biçilmezdir. Google Managed Service for Apache Kafka, bu sürekli veri akışını OSS uyumlu, kullanımı kolay ve güvenli bir şekilde alma ve depolama için ölçeklenebilir ve dayanıklı bir yöntem sunarken Google Cloud Dataproc, Apache Spark ve Hadoop kümelerini kullanarak veri analizi için bu büyük veri kümelerinin işlenmesine olanak tanır.

Ne oluşturacaksınız?

Bu codelab'de, Google Managed Service for Apache Kafka, Dataproc, Python ve Apache Spark kullanarak gerçek zamanlı analiz yapan bir IoT veri işleme ardışık düzeni oluşturacaksınız. Ardışık düzeniniz:

  • GCE sanal makinelerini kullanarak IoT cihazlarından alınan verileri yönetilen bir Kafka kümesinde yayınlama
  • Yönetilen Kafka kümesinden Dataproc kümesine veri akışı gerçekleştirme
  • Dataproc Spark Streaming işini kullanarak verileri işleme

Neler öğreneceksiniz?

  • Google tarafından yönetilen Kafka ve Dataproc kümeleri oluşturma
  • Dataproc kullanarak akış işlerini çalıştırma

Gerekenler

2. Genel Bakış

Bu codelab'de, DemoIOT Solutions adlı sahte bir şirketin hikayesini takip edeceğiz. DemoIOT Solutions, sıcaklık, nem, basınç, ışık seviyesi ve konum verilerini ölçüp ileten sensör cihazları sağlar. Müşterilerine gerçek zamanlı istatistikler göstermek için bu verileri işleyen ardışık düzenler oluşturmak istiyorlar. Bu tür işlem hatlarını kullanarak müşterilerine, sensörlerini kurdukları yerlerle ilgili izleme, otomatik öneriler, uyarılar ve analizler gibi çeşitli hizmetler sunabilirler.

Bunun için IoT cihazını simüle etmek üzere bir GCE sanal makinesi kullanacağız. Cihaz, Google tarafından yönetilen Kafka kümesindeki bir Kafka konusuna veri yayınlar. Bu veriler, bir Dataproc akış işi tarafından okunup işlenir. Ön koşul kurulumu ve sonraki sayfalar, tüm bu adımları uygulamanıza yardımcı olur.

Ön koşul kurulumu

  1. Projenizin adını ve numarasını bulun. Referans için Proje adını, numarasını ve kimliğini bulma başlıklı makaleyi inceleyin.
  2. VPC alt ağı. Bu işlem, GCE VM, Kafka kümesi ve Dataproc kümesi arasında bağlantı kurulmasına olanak tanır. Mevcut alt ağları gcloud CLI'yı kullanarak listelemek için buradaki adımları uygulayın. Gerekirse her Google Cloud bölgesinde alt ağ içeren bir VPC ağı oluşturacak olan otomatik mod VPC ağı oluşturma bölümündeki adımları uygulayın. Ancak bu codelab'in amacı doğrultusunda yalnızca tek bir bölgedeki bir alt ağı kullanacağız.
  • Bu alt ağda, SSH için gerekli olan tcp:22'den gelen tüm girişlere izin veren bir güvenlik duvarı kuralı olduğundan emin olun. Bu kural, ağ oluşturulurken Güvenlik duvarı kuralları bölümünde seçilebilir. Bu nedenle, kuralı seçtiğinizden emin olun.
  1. GCS paketi. Dataproc iş kaynaklarını depolamak ve işlenmiş verileri kalıcı hale getirmek için bir Google Cloud Storage paketine erişmeniz gerekir. Hesabınız yoksa GCP projenizde oluşturabilirsiniz.

Ortam değişkenlerini doldurma

gcloud CLI'yı çalıştırdığınız terminalinizde bu ortam değişkenlerini daha sonra kullanılabilmeleri için doldurun.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

Aşağıdakini değiştirin:

  • <project-id> kısmını, ayarladığınız GCP projesinin adıyla değiştirin.
  • <project-number> yerine Ön koşullar bölümündeki 1. adımda belirtilen proje numarasının adını yazın.
  • <region> yerine, Kullanılabilir bölgeler ve alt bölgeler listesinden kullanmak istediğiniz bir bölgenin adını yazın. Örneğin, us-central1 kullanabiliriz.
  • <zone> kısmını, daha önce seçtiğiniz bölgenin altındaki Kullanılabilir bölgeler ve alt bölgeler bölümünde yer alan alt bölgenin adıyla değiştirin. Örneğin, bölge olarak us-central1'yı seçtiyseniz alt bölge olarak us-central1-f'yı kullanabilirsiniz. Bu bölge, IoT cihazlarını simüle eden GCE sanal makinesini oluşturmak için kullanılır. Bölgenizin seçtiğiniz bölgede olduğundan emin olun.
  • <subnet-path> ile ön koşul adımındaki 2. alt ağın tam yolu. Bu özelliğin değeri şu biçimde olmalıdır: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> ile Ön koşullar bölümündeki 3. adımda belirtilen GCS paketinin adını girin.

3. Google Managed Kafka'yı ayarlama

Bu bölümde, Kafka sunucusunu dağıtan ve bu sunucuda IoT verilerinin yayınlanabileceği ve abone olunduktan sonra okunabileceği bir konu oluşturan Google Managed Kafka kümesi oluşturulur. DemoIOT Solutions, tüm cihazlarının veri yayınlayacağı şekilde bu kümeyi ayarlayabilir.

Yönetilen Kafka kümesi oluşturma

  • Yönetilen Kafka kümesini oluşturun. Burada kümenin adı kafka-iot'dır.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

Şuna benzer bir yanıt alırsınız:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

Küme oluşturma işlemi 20-30 dakika sürer. Bu işlemin tamamlanmasını bekleyin.

Konu oluşturma

  • Kümede yönetilen bir Kafka konusu oluşturun. Burada konunun adı kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

Aşağıdakine benzer bir çıkış alırsınız:

Created topic [kafka-iot-topic].

4. Yayıncı oluşturma

Yönetilen Kafka kümesinde yayın yapmak için, Yönetilen Kafka kümesi tarafından kullanılan alt ağı içeren VPC'ye erişebilen bir Google Compute Engine VM örneği oluştururuz. Bu sanal makine, DemoIOT Solutions tarafından sağlanan sensör cihazlarını simüle eder.

Adımlar

  1. Google Compute Engine sanal makine örneğini oluşturun. Burada GCE sanal makinesinin adı publisher-instance'dır.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Google Compute Engine varsayılan hizmet hesabına, Managed Service for Apache Kafka'yı kullanma izni verin.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. Sanal makineye bağlanmak için SSH kullanın. Alternatif olarak, SSH için Google Cloud Console'u kullanın.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Kafka komut satırı araçlarını çalıştırmak için Java'yı yükleyin ve bu komutları kullanarak Kafka ikili programını indirin.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Managed Kafka kimlik doğrulama kitaplığını ve bağımlılarını indirin ve Kafka istemci özelliklerini yapılandırın.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

Yayıncı makinesi kurulumu hakkında daha fazla bilgi için İstemci makine ayarlama başlıklı makaleyi inceleyin.

5. Yönetilen Kafka'da yayınlama

Yayıncı ayarlandığına göre, GCE VM'den (DemoIOT Solutions tarafından IoT cihazları simüle edilerek) Managed Kafka kümesine bazı sahte veriler yayınlamak için Kafka komut satırını kullanabiliriz.

  1. GCE sanal makine örneğine SSH ile bağlandığımız için PROJECT_ID değişkenini yeniden doldurmamız gerekiyor:
export PROJECT_ID=<project-id>
export REGION=<region>

Aşağıdakini değiştirin:

  • <project-id> kısmını, ayarladığınız GCP projesinin adıyla değiştirin.
  • Kafka kümesinin oluşturulduğu bölgeyi içeren <region>
  1. Kafka bootstrap sunucusunun IP adresini almak için managed-kafka clusters describe komutunu kullanın. Bu adres, Kafka kümesine bağlanmak için kullanılabilir.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Kümedeki konuları listeleyin:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

Daha önce oluşturduğumuz kafka-iot-topic konusunu içeren aşağıdaki çıkışı görebilirsiniz.

__remote_log_metadata
kafka-iot-topic
  1. Bu komut dosyasını kopyalayıp yeni bir dosyaya yapıştırın publish_iot_data.sh. GCE sanal makinesinde yeni bir dosya oluşturmak için vim veya nano gibi bir araç kullanabilirsiniz.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

Açıklama

  • Bu komut dosyası, cihaz kimliği, zaman damgası, sensör verileri (sıcaklık, nem, basınç, ışık), konum bilgileri (enlem, boylam), cihaz durumu (pil, sinyal, bağlantı türü) ve bazı meta verileri içeren, simüle edilmiş sensör okumalarıyla JSON mesajları oluşturur.
  • Belirli sayıda benzersiz cihazdan sürekli bir mesaj akışı oluşturur. Her cihaz, belirtilen zaman aralığında veri göndererek gerçek dünyadaki IoT cihazlarını taklit eder. Burada, her biri 10 saniye aralıklarla 20 okuma yapan 10 cihazdan elde edilen verileri yayınlıyoruz.
  • Ayrıca, oluşturulan tüm verileri Kafka üreticisi komut satırı aracını kullanarak Kafka konusuna yayınlar.
  1. Komut dosyası tarafından kullanılan bazı bağımlılıkları yükleyin: matematiksel hesaplamalar için bc paketi ve JSON işleme için jq paketi.
sudo apt-get install bc jq
  1. Komut dosyasını yürütülebilir olacak şekilde değiştirin ve komut dosyasını çalıştırın. Bu işlem yaklaşık 2 dakika sürer.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

Tüm etkinlikleri yazdıracak bu komutu çalıştırarak etkinliklerin başarıyla yayınlandığını kontrol edebilirsiniz. Çıkmak için <control-c> tuşuna basın.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Dataproc kümesini ayarlama

Bu bölümde, yönetilen Kafka kümesinin bulunduğu VPC alt ağında bir Dataproc kümesi oluşturulur. Bu küme, DemoIOT Solutions'ın ihtiyaç duyduğu gerçek zamanlı istatistikleri ve analizleri oluşturan işleri çalıştırmak için kullanılacak.

  1. Dataproc kümesi oluşturun. Burada kümenin adı dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

Şuna benzer bir yanıt alırsınız:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

Küme oluşturma işlemi 10-15 dakika sürebilir. Bu işlemin başarıyla tamamlanmasını bekleyin ve kümeyi açıklayarak kümenin RUNNING durumunda olduğunu kontrol edin.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Dataproc kullanarak Kafka mesajlarını işleme

Bu son bölümde, yayınlanan mesajları Spark Streaming kullanarak işleyen bir Dataproc işi göndereceksiniz. Bu iş, DemoIOT Solutions tarafından kullanılabilecek bazı gerçek zamanlı istatistikler ve analizler oluşturur.

  1. process_iot.py adlı akış PySpark işi dosyasını yerel olarak oluşturmak için bu komutu çalıştırın.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

Açıklama

  • Bu kod, belirtilen bir Kafka konusundan veri okumak için bir PySpark Structured Streaming işi oluşturur. Kafka aracısıyla bağlantı kurup kimlik doğrulaması yapmak için sağlanan Kafka sunucusu başlatma adresini ve GCS yapılandırma dosyasından yüklenen Kafka yapılandırmalarını kullanır.
  • Öncelikle Kafka'daki ham verileri bayt dizileri akışı olarak okur, bu bayt dizilerini dizelere dönüştürür ve verilerin yapısını (cihaz kimliği, zaman damgası, konum, sensör verileri vb.) belirtmek için Spark'ın StructType'ını kullanarak json_schema uygular.
  • Önizleme için ilk 10 satırı konsola yazdırır, sensör başına ortalama sıcaklığı hesaplar ve tüm verileri avro biçiminde GCS paketine yazar. Avro, yapılandırılmış verileri kompakt ve şemayla tanımlanmış bir ikili biçimde verimli bir şekilde depolayan, satır tabanlı bir veri serileştirme sistemidir. Şema evrimi, dil bağımsızlığı ve büyük ölçekli veri işleme için yüksek sıkıştırma sunar.
  1. client.properties dosyasını oluşturun ve Kafka sunucusunun başlatma adresi için ortam değişkenini doldurun.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.py ve client.properties dosyalarını Google Cloud Storage paketinize yükleyin. Böylece bu dosyalar Dataproc işinde kullanılabilir.
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc işi için bazı bağımlılık JAR'larını GCS paketinize kopyalayın. Bu dizin, Kafka ile Spark Streaming işlerini çalıştırmak için gereken jar'ları ve Set up a client machine (İstemci makineyi ayarlama) bölümünden alınan Managed Kafka kimlik doğrulama kitaplığını ve bağımlılıklarını içerir.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Spark işini Dataproc kümesine gönderin.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Spark sürücü günlükleri yazdırılır. Ayrıca bu tabloların konsola kaydedildiğini ve verilerin GCS paketinize depolandığını da görebilirsiniz.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. Temizleme

Codelab'i tamamladıktan sonra kaynakları temizlemek için adımları uygulayın.

  1. Yönetilen Kafka kümesini, Publisher GCE VM'sini ve Dataproc kümesini silin.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. VPC alt ağınızı ve ağınızı silin.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. Verileri artık kullanmak istemiyorsanız GCS paketinizi silin.
gcloud storage rm --recursive $BUCKET

9. Tebrikler

Tebrikler. Manage Kafka ve Dataproc ile DemoIOT Solutions'ın cihazları tarafından yayınlanan veriler hakkında gerçek zamanlı bilgiler edinmesine yardımcı olan bir IoT veri işleme ardışık düzenini başarıyla oluşturdunuz.

Yönetilen bir Kafka kümesi oluşturdunuz, IoT etkinliklerini bu kümeye yayınladınız ve bu etkinlikleri gerçek zamanlı olarak işlemek için Spark akışını kullanan bir Dataproc işi çalıştırdınız. Artık Managed Kafka ve Dataproc kullanarak veri ardışık düzenleri oluşturmak için gereken temel adımları biliyorsunuz.

Referans belgeler