Apache Kafka için Dataproc ve Google Managed Service'i kullanarak gerçek zamanlı IoT veri işleme
Bu codelab hakkında
1. Giriş
Last Updated: 2024-06-10
Arka plan
Akıllı ev çözümlerinden endüstriyel sensörlere kadar Nesnelerin İnterneti (IoT) cihazları, ağ kenarında çok büyük miktarda veri oluşturur. Bu veriler; cihaz izleme, izleme, teşhis, gözetim, kişiselleştirme, filo optimizasyonu ve daha pek çok kullanım alanı için paha biçilmezdir. Apache Kafka için Google Managed Service, bu sürekli veri akışını açık kaynak yazılımla uyumlu, kullanımı kolay ve güvenli bir şekilde beslemenin ve depolamanın ölçeklenebilir ve dayanıklı bir yolunu sunarken Google Cloud Dataproc, Apache Spark ve Hadoop kümelerini kullanarak veri analizi için bu büyük veri kümelerinin işlenmesine olanak tanır.
Oluşturacağınız uygulama
Bu codelab'de, Apache Kafka, Dataproc, Python ve Apache Spark için Google Managed Service'i kullanarak gerçek zamanlı analizler yapan bir IoT Veri İşleme ardışık düzeni oluşturacaksınız. Ardışık düzeniniz:
- GCE sanal makinelerini kullanarak IoT cihazlarından Managed Kafka kümesine veri yayınlama
- Manage Kafka kümesinden bir Dataproc kümesine veri aktarma
- Dataproc Spark Streaming işi kullanarak verileri işleme
Neler öğreneceksiniz?
- Google Managed Kafka ve Dataproc kümeleri oluşturma
- Dataproc'u kullanarak akış işlerini çalıştırma
Gerekenler
- Projesi oluşturulmuş etkin bir GCP hesabı. Hesabınız yoksa ücretsiz deneme sürümüne kaydolabilirsiniz.
- gcloud CLI yüklendi ve yapılandırıldı. gcloud CLI'yi işletim sisteminize yükleme talimatlarını uygulayabilirsiniz.
- GCP projenizde Google Managed Kafka ve Dataproc için API'leri etkinleştirin.
2. Genel Bakış
Bu codelab'de, DemoIOT Solutions adlı hayali bir şirketin hikayesini inceleyelim. DemoIOT Solutions, sıcaklık, nem, basınç, ışık seviyesi ve konumla ilgili verileri ölçen ve aktaran sensör cihazları sağlar. Müşterilerine gerçek zamanlı istatistikler göstermek için bu verileri işleyen ardışık düzenler oluşturmak istiyorlar. Bu tür ardışık düzenlerden yararlanarak müşterilerine izleme, otomatik öneriler, uyarılar ve müşterilerin sensörlerini kurdukları yerlerle ilgili analizler gibi çok çeşitli hizmetler sunabilirler.
Bunun için IoT cihazını simüle etmek üzere bir GCE sanal makinesi kullanacağız. Cihaz, Google Managed Kafka kümesinde bir Kafka konusuna veri yayınlar. Bu veriler, bir Dataproc akış işi tarafından okunur ve işlenir. Ön koşul ayarlaması ve aşağıdaki sayfalar, tüm bu adımları uygulamanıza rehberlik eder.
Ön koşul kurulumu
- Projenizin adını ve proje numarasını bulun. Referans olarak Proje adını, numarasını ve kimliğini bulma başlıklı makaleyi inceleyin.
- VPC alt ağı. Bu, GCE sanal makinesi, Kafka kümesi ve Dataproc kümesi arasında bağlantı kurulmasına olanak tanır. gcloud KSA'yı kullanarak mevcut alt ağları listelemek için bu adımları uygulayın. Gerekirse otomatik mod VPC ağı oluşturma başlıklı makaleyi inceleyin. Bu makalede, her Google Cloud bölgesinde alt ağ içeren bir VPC ağı oluşturma hakkında bilgi verilmektedir. Ancak bu kod laboratuvarının amacı doğrultusunda yalnızca tek bir bölgeden bir alt ağ kullanacağız.
- Bu alt ağda, SSH için gerekli olan tcp:22 bağlantı noktasından gelen tüm girişlere izin veren bir güvenlik duvarı kuralı bulunduğundan emin olun. Bu kural, ağ oluştururken Güvenlik duvarı kuralları bölümünden seçilebilir. Bu nedenle, bu kuralı seçtiğinizden emin olun.
- GCS paketi. Dataproc iş kaynaklarını depolamak ve işlenmiş verileri kalıcı hale getirmek için bir Google Cloud Storage paketine erişiminiz olmalıdır. Hesabınız yoksa GCP projenizde hesap oluşturabilirsiniz.
Ortam değişkenlerini doldurma
gcloud CLI'yi çalıştırdığınız terminalinizde, bu ortam değişkenlerini daha sonra kullanılabilecekleri şekilde doldurun.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Aşağıdakini değiştirin:
<project-id>
yerine, oluşturduğunuz GCP projesinin adını yazın.<project-number>
yerine, 1. adımdaki ön koşulda belirtilen proje numarasını yazın.<region>
yerine, Mevcut bölgeler ve alt bölgeler bölümünden kullanmak istediğiniz bir bölgenin adını yazın. Örneğin,us-central1
kullanabiliriz.<zone>
Kullanılabilir bölgeler ve alt bölgeler bölümünden, daha önce seçtiğiniz bölgenin alt bölgesinin adıyla değiştirin. Örneğin, bölge olarakus-central1
'ü seçtiyseniz alt bölge olarakus-central1-f
'ü kullanabilirsiniz. Bu bölge, IoT cihazlarını simüle eden GCE sanal makinesini oluşturmak için kullanılır. Bölgenizin, seçtiğiniz bölgede bulunduğundan emin olun.<subnet-path>
dosyasını, ön koşul 2. adımındaki alt ağın tam yolu ile birlikte gönderin. Bu özelliğin değeri şu biçimde olmalıdır:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
, Önkoşul 3. adımdaki GCS paketinin adıdır.
3. Google Managed Kafka'yı ayarlama
Bu bölümde, Kafka sunucusunu dağıtan ve bu sunucuda IoT verilerinin yayınlanabileceği ve abone olduktan sonra okunabileceği bir konu oluşturan bir Google Managed Kafka kümesi oluşturulur. DemoIOT Solutions, tüm cihazlarının bu kümeye veri yayınlaması için bu kümeyi ayarlayabilir.
Yönetilen Kafka kümesi oluşturma
- Yönetilen Kafka kümesini oluşturun. Burada kümenin adı
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Şuna benzer bir yanıt alırsınız:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Küme oluşturma işlemi 20-30 dakika sürer. Bu işlemin tamamlanmasını bekleyin.
Konu oluşturma
- Kümede yönetilen bir Kafka konusu oluşturun. Burada, konunun adı
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Aşağıdakine benzer bir çıkış alırsınız:
Created topic [kafka-iot-topic].
4. Yayıncı oluşturma
Yönetilen Kafka kümesinde yayınlamak için, Yönetilen Kafka kümesi tarafından kullanılan alt ağı içeren VPC'ye erişebilen bir Google Compute Engine sanal makine örneği oluşturduk. Bu sanal makine, DemoIOT Solutions tarafından sağlanan sensör cihazlarını simüle eder.
Adımlar
- Google Compute Engine sanal makine örneğini oluşturun. Burada, GCE sanal makinesinin adı
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Google Compute Engine varsayılan hizmet hesabına, Apache Kafka için Managed Service'i kullanma izinlerini verin.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Sanal makineye bağlanmak için SSH kullanın. Alternatif olarak SSH için Google Cloud Console'u kullanabilirsiniz.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Kafka komut satırı araçlarını çalıştırmak için Java'yı yükleyin ve aşağıdaki komutları kullanarak Kafka ikilisini indirin.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Managed Kafka kimlik doğrulama kitaplığını ve bağımlılarını indirip Kafka istemci özelliklerini yapılandırın.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Yayıncı makinesi kurulumu hakkında daha fazla bilgi için İstemci makinesi oluşturma başlıklı makaleyi inceleyin.
5. Managed Kafka'da yayınlama
Yayıncı kurulduktan sonra, GCE sanal makinesinden (DemoIOT Solutions tarafından IoT cihazlarını simüle eder) Managed Kafka kümesine bazı örnek veriler yayınlamak için Kafka komut satırını kullanabiliriz.
- GCE sanal makine örneğine SSH ile bağlandığımız için
PROJECT_ID
değişkenini yeniden doldurmamız gerekiyor:
export PROJECT_ID=<project-id>
export REGION=<region>
Aşağıdakini değiştirin:
<project-id>
yerine, oluşturduğunuz GCP projesinin adını yazın.- Kafka kümesinin oluşturulduğu bölgeyi içeren
<region>
- Kafka önyükleme sunucusunun IP adresini almak için
managed-kafka clusters describe
komutunu kullanın. Bu adres, Kafka kümesine bağlanmak için kullanılabilir.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Kümedeki konuları listeleyin:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Daha önce oluşturduğumuz kafka-iot-topic
konusunu içeren aşağıdaki çıkışı görebilirsiniz.
__remote_log_metadata
kafka-iot-topic
- Bu komut dosyasını kopyalayıp yeni bir dosyaya
publish_iot_data.sh
yapıştırın. GCE sanal makinesinde yeni dosya oluşturmak içinvim
veyanano
gibi bir araç kullanabilirsiniz.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Açıklama
- Bu komut dosyası, cihaz kimliği, zaman damgası, sensör verileri (sıcaklık, nem, basınç, ışık), konum bilgileri (enlem, boylam), cihaz durumu (pil, sinyal, bağlantı türü) ve bazı meta veriler içeren simüle edilmiş sensör okumalarıyla JSON mesajları oluşturur.
- Belirli sayıda benzersiz cihazdan kesintisiz bir mesaj akışı oluşturur. Her cihaz, gerçek dünyadaki IoT cihazlarını taklit ederek belirli bir zaman aralığında veri gönderir. Burada, her biri 10 saniyelik bir zaman aralığında 20 ölçüm yapan 10 cihazdan gelen verileri yayınlıyoruz.
- Ayrıca, Kafka üretici komut satırı aracını kullanarak oluşturulan tüm verileri Kafka konusuna yayınlar.
- Komut dosyası tarafından kullanılan bazı bağımlılıkları yükleyin: matematiksel hesaplamalar için
bc
paketi ve JSON işleme içinjq
paketi.
sudo apt-get install bc jq
- Komut dosyasını yürütülebilir olacak şekilde değiştirin ve çalıştırın. Bu işlem yaklaşık 2 dakika sürer.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Tüm etkinlikleri yazdıran bu komutu çalıştırarak etkinliklerin başarıyla yayınlanıp yayınlanmadığını kontrol edebilirsiniz. Çıkmak için <control-c>
tuşuna basın.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc kümesini ayarlama
Bu bölümde, Managed Kafka kümesinin bulunduğu VPC alt ağında bir Dataproc kümesi oluşturulur. Bu küme, DemoIOT Solutions'ın ihtiyaç duyduğu gerçek zamanlı istatistikleri ve analizleri üreten işleri çalıştırmak için kullanılacaktır.
- Dataproc kümesi oluşturun. Burada kümenin adı
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Şuna benzer bir yanıt alırsınız:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Küme oluşturma işlemi 10-15 dakika sürebilir. Bu işlemin başarıyla tamamlanmasını bekleyin ve kümeyi açıklayarak kümenin RUNNING
durumunda olduğunu kontrol edin.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Dataproc'i kullanarak Kafka mesajlarını işleme
Bu son bölümde, Spark Streaming'i kullanarak yayınlanan mesajları işleyen bir Dataproc işi göndereceksiniz. Bu iş, DemoIOT Solutions tarafından kullanılabilecek bazı anlık istatistikler ve analizler oluşturur.
process_iot.py
adlı akış PySpark iş dosyasını yerel olarak oluşturmak için bu komutu çalıştırın.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Açıklama
- Bu kod, belirtilen bir Kafka konusundan veri okumak için bir PySpark Yapılandırılmış Akış işi oluşturur. Kafka aracısına bağlanmak ve kimlik doğrulamak için sağlanan Kafka sunucu önyükleme adresini ve bir GCS yapılandırma dosyasından yüklenen Kafka yapılandırmalarını kullanır.
- Öncelikle Kafka'daki ham verileri bir bayt dizisi akışı olarak okur, bu bayt dizilerini dizelere dönüştürür ve verilerin yapısını (cihaz kimliği, zaman damgası, konum, sensör verileri vb.) belirtmek için Spark'ın StructType'sini kullanarak
json_schema
işlevini uygular. - İlk 10 satırı önizleme için konsola yazdırır, sensör başına ortalama sıcaklığı hesaplar ve tüm verileri
avro
biçiminde GCS paketine yazar. Avro, yapılandırılmış verileri kompakt, şema tanımlı bir ikili biçimde verimli bir şekilde depolayan, büyük ölçekli veri işleme için şema gelişimi, dil tarafsızlığı ve yüksek sıkıştırma sunan satır tabanlı bir veri serileştirme sistemidir.
client.properties
dosyasını oluşturun ve kafka sunucusunun önyükleme adresi için ortam değişkenini doldurun.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Dataproc işi tarafından kullanılabilmesi için
process_iot.py
veclient.properties
dosyalarını Google Cloud Storage paketinize yükleyin.
gsutil cp process_iot.py client.properties $BUCKET
- Dataproc işinin bazı bağımlılık jar dosyalarını GCS paketinize kopyalayın. Bu dizin, Spark Streaming işlerini Kafka ile çalıştırmak için gereken jar dosyalarını ve İstemci makinesi oluşturma bölümünden alınan Managed Kafka kimlik doğrulama kitaplığını ve bağımlılıklarını içerir.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Spark işini Dataproc kümesine gönderin.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Spark sürücü günlükleri yazdırılır. Ayrıca, bu tabloların konsola kaydedildiğini ve verilerin GCS paketinizde depolandığını görebilirsiniz.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Temizleme
Codelab'i tamamladıktan sonra kaynakları temizleme adımlarını uygulayın.
- Yönetilen Kafka kümesini, Yayıncı GCE sanal makinesini ve Dataproc kümesini silin.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- VPC alt ağınızı ve ağınızı silin.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Artık verileri kullanmak istemiyorsanız GCS paketinizi silin.
gcloud storage rm --recursive $BUCKET
9. Tebrikler
Tebrikler, Manage Kafka ve Dataproc ile DemoIOT Solutions'ın cihazları tarafından yayınlanan verilerle ilgili gerçek zamanlı analizler elde etmesine yardımcı olacak bir IoT veri işleme ardışık düzenini başarıyla oluşturdunuz.
Yönetilen bir Kafka kümesi oluşturdunuz, bu kümeye IoT etkinlikleri yayınladınız ve bu etkinlikleri gerçek zamanlı olarak işlemek için Spark akışının kullanıldığı bir Dataproc işi çalıştırdınız. Managed Kafka ve Dataproc'i kullanarak veri ardışık düzenleri oluşturmak için gereken temel adımları öğrendiniz.