Dataproc और Apache Kafka के लिए Google की मैनेज की जाने वाली सेवा का इस्तेमाल करके, रीयल-टाइम IoT डेटा प्रोसेसिंग
इस कोडलैब (कोड बनाना सीखने के लिए ट्यूटोरियल) के बारे में जानकारी
1. परिचय
पिछली बार अपडेट किया गया: 10-06-2024
बैकग्राउंड
इंटरनेट ऑफ़ थिंग्स (IoT) डिवाइस, स्मार्ट होम से लेकर इंडस्ट्रियल सेंसर तक, एज ऑफ़ नेटवर्क पर काफ़ी ज़्यादा डेटा जनरेट करते हैं. यह डेटा, कई तरह के इस्तेमाल के उदाहरणों के लिए बहुत अहम है. जैसे, डिवाइस की निगरानी करना, ट्रैकिंग, गड़बड़ी की जानकारी, निगरानी करना, मनमुताबिक बनाना, फ़्लीट ऑप्टिमाइज़ेशन वगैरह. Apache Kafka के लिए Google की मैनेज की जाने वाली सेवा, डेटा की इस लगातार स्ट्रीम को ओएसएस के साथ काम करने वाले, आसानी से इस्तेमाल किए जा सकने वाले, और सुरक्षित तरीके से डालने और स्टोर करने का स्केलेबल और ड्यूरेबल तरीका उपलब्ध कराती है. वहीं, Google Cloud Dataproc, Apache Spark और Hadoop क्लस्टर का इस्तेमाल करके, डेटा के विश्लेषण के लिए इन बड़े डेटासेट को प्रोसेस करने की सुविधा देता है.
आपको क्या बनाना है
इस कोडलैब में, आपको Apache Kafka, Dataproc, Python, और Apache Spark के लिए Google की मैनेज की जाने वाली सेवा का इस्तेमाल करके, रीयल-टाइम में आंकड़े देने वाली एक IoT डेटा प्रोसेसिंग पाइपलाइन बनानी है. आपकी पाइपलाइन:
- GCE VM का इस्तेमाल करके, IoT डिवाइसों से मैनेज किए जा रहे Kafka क्लस्टर में डेटा पब्लिश करना
- Manage Kafka क्लस्टर से Dataproc क्लस्टर में डेटा स्ट्रीम करना
- Dataproc Spark Streaming जॉब का इस्तेमाल करके डेटा प्रोसेस करना
आपको क्या सीखने को मिलेगा
- Google के मैनेज किए जा रहे Kafka और Dataproc क्लस्टर बनाने का तरीका
- Dataproc का इस्तेमाल करके स्ट्रीमिंग जॉब चलाने का तरीका
आपको इन चीज़ों की ज़रूरत होगी
- चालू GCP खाता, जिसमें प्रोजेक्ट सेट अप हो. अगर आपके पास खाता नहीं है, तो मुफ़्त में आज़माने के लिए साइन अप करें.
- gcloud सीएलआई इंस्टॉल और कॉन्फ़िगर किया गया हो. अपने ओएस पर gcloud सीएलआई इंस्टॉल करने के लिए, निर्देशों का पालन करें.
- आपके GCP प्रोजेक्ट में, Google मैनेज किया जाने वाला Kafka और Dataproc के लिए चालू किए गए एपीआई.
2. खास जानकारी
इस कोडलैब के लिए, एक डमी कंपनी, DemoIOT Solutions की कहानी पर नज़र डालते हैं. DemoIOT Solutions, सेंसर डिवाइस उपलब्ध कराता है. ये डिवाइस, तापमान, नमी, दबाव, रोशनी के लेवल, और जगह की जानकारी का डेटा मेज़र करते हैं और उसे ट्रांसमिट करते हैं. वे ऐसे पाइपलाइन सेट अप करना चाहते हैं जो इस डेटा को प्रोसेस करके, उनके ग्राहकों को रीयल टाइम में आंकड़े दिखा सकें. ऐसी पाइपलाइन का इस्तेमाल करके, वे अपने ग्राहकों को कई तरह की सेवाएं दे सकते हैं. जैसे, निगरानी करना, अपने-आप सुझाव देना, सूचनाएं देना, और उन जगहों के बारे में अहम जानकारी देना जहां ग्राहकों ने अपने सेंसर इंस्टॉल किए हैं.
इसके लिए, हम IoT डिवाइस को सिम्युलेट करने के लिए GCE वर्चुअल मशीन का इस्तेमाल करेंगे. डिवाइस, Google के मैनेज किए जा रहे Kafka क्लस्टर में किसी Kafka विषय पर डेटा पब्लिश करेगा. इसे Dataproc स्ट्रीमिंग जॉब पढ़कर प्रोसेस करेगा. ज़रूरी शर्तों के सेटअप और नीचे दिए गए पेजों की मदद से, ये सभी चरण पूरे किए जा सकते हैं.
ज़रूरी शर्तों के मुताबिक सेटअप करना
- अपने प्रोजेक्ट का नाम और प्रोजेक्ट नंबर ढूंढें. रेफ़रंस के लिए, प्रोजेक्ट का नाम, नंबर, और आईडी ढूंढना लेख पढ़ें.
- वीपीसी सबनेटवर्क. इससे GCE VM, Kafka क्लस्टर, और Dataproc क्लस्टर को कनेक्ट किया जा सकेगा. gcloud सीएलआई का इस्तेमाल करके, मौजूदा सबनेट की सूची देखने के लिए, यह तरीका अपनाएं. अगर ज़रूरी हो, तो ऑटो मोड वाला VPC नेटवर्क बनाएं. इससे हर Google Cloud क्षेत्र में सबनेट वाला VPC नेटवर्क बन जाएगा. हालांकि, इस कोडलैब के लिए, हम सिर्फ़ एक क्षेत्र के सबनेट का इस्तेमाल करेंगे.
- इस सबनेटवर्क में, पक्का करें कि फ़ायरवॉल का कोई ऐसा नियम हो जिससे टीसीपी:22 से सभी इन्ग्रेस डेटा ट्रैफ़िक को आने की अनुमति दी गई हो. यह एसएसएच के लिए ज़रूरी है. नेटवर्क बनाते समय, यह नियम फ़ायरवॉल के नियमों वाले सेक्शन में उपलब्ध होगा. इसलिए, पक्का करें कि आपने इसे चुना हो.
- GCS बकेट. Dataproc जॉब के संसाधनों को सेव करने और प्रोसेस किए गए डेटा को सेव रखने के लिए, आपके पास Google Cloud Storage बकेट का ऐक्सेस होना चाहिए. अगर आपके पास कोई उपयोगकर्ता आईडी नहीं है, तो अपने GCP प्रोजेक्ट में जाकर एक बनाएं.
एनवायरमेंट वैरिएबल पॉप्युलेट करना
gcloud CLI को चलाने वाले टर्मिनल में, इन एनवायरमेंट वैरिएबल को पॉप्युलेट करें, ताकि उनका इस्तेमाल बाद में किया जा सके.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
इनकी जगह ये डालें:
<project-id>
में, सेट अप किए गए GCP प्रोजेक्ट का नाम डालें.<project-number>
की जगह, ज़रूरी शर्त के पहले चरण में दिए गए प्रोजेक्ट नंबर का नाम डालें.<region>
उपलब्ध इलाकों और ज़ोन में से उस इलाके का नाम डालें जिसका इस्तेमाल करना है. उदाहरण के लिए, हमus-central1
का इस्तेमाल कर सकते हैं.<zone>
, जो आपके चुने गए इलाके में उपलब्ध इलाकों और ज़ोन में से किसी ज़ोन का नाम हो. उदाहरण के लिए, अगर आपने इलाके के तौर परus-central1
चुना है, तो ज़ोन के तौर परus-central1-f
का इस्तेमाल किया जा सकता है. इस ज़ोन का इस्तेमाल, IoT डिवाइसों को सिम्युलेट करने वाली GCE वर्चुअल मशीन बनाने के लिए किया जाएगा. पक्का करें कि आपका ज़ोन, चुने गए इलाके में हो.<subnet-path>
, जिसमें ज़रूरी शर्त के दूसरे चरण में दिए गए सबनेट का पूरा पाथ शामिल हो. इसकी वैल्यू इस फ़ॉर्मैट में होनी चाहिए:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
में, ज़रूरी शर्त के तीसरे चरण में बताई गई जीसीएस बकेट का नाम डालें.
3. Google Managed Kafka को सेट अप करना
इस सेक्शन में, Google मैनेज किया जाने वाला Kafka क्लस्टर सेट अप किया जाता है. यह Kafka सर्वर को डिप्लॉय करता है और इस सर्वर पर एक विषय बनाता है. इस विषय पर, IoT डेटा को पब्लिश किया जा सकता है और सदस्यता लेने के बाद उसे पढ़ा जा सकता है. DemoIOT Solutions इस क्लस्टर को सेट अप कर सकता है, ताकि उसके सभी डिवाइस उसमें डेटा पब्लिश कर सकें.
मैनेज किया गया Kafka क्लस्टर बनाना
- मैनेज किया गया Kafka क्लस्टर बनाएं. यहां क्लस्टर का नाम
kafka-iot
है.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
आपको इस तरह का जवाब मिलना चाहिए:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
क्लस्टर बनाने में 20 से 30 मिनट लगते हैं. इस कार्रवाई के पूरा होने का इंतज़ार करें.
कोई विषय बनाना
- क्लस्टर पर मैनेज किया जा सकने वाला Kafka विषय बनाएं. यहां, विषय का नाम
kafka-iot-topic
है.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
आपको इससे मिलता-जुलता आउटपुट दिखेगा:
Created topic [kafka-iot-topic].
4. पब्लिशर सेट अप करना
मैनेज किए जा रहे Kafka क्लस्टर पर पब्लिश करने के लिए, हमने Google Compute Engine का एक VM इंस्टेंस सेट अप किया है. यह इंस्टेंस, मैनेज किए जा रहे Kafka क्लस्टर के इस्तेमाल किए जा रहे सबनेट वाले वीपीसी को ऐक्सेस कर सकता है. यह VM, DemoIOT Solutions के दिए गए सेंसर डिवाइसों को सिम्युलेट करता है.
चरण
- Google Compute Engine VM इंस्टेंस बनाएं. यहां, GCE वीएम का नाम
publisher-instance
है.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Google Compute Engine के डिफ़ॉल्ट सेवा खाते को, Apache Kafka के लिए मैनेज की जाने वाली सेवा का इस्तेमाल करने की अनुमतियां दें.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- वीएम से कनेक्ट करने के लिए, एसएसएच का इस्तेमाल करें. इसके अलावा, एसएसएच के लिए Google Cloud Console का इस्तेमाल करें.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Kafka के कमांड लाइन टूल चलाने के लिए, Java इंस्टॉल करें. इसके बाद, इन कमांड का इस्तेमाल करके Kafka बाइनरी डाउनलोड करें.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- मैनेज की गई Kafka की पुष्टि करने वाली लाइब्रेरी और उसकी डिपेंडेंसी डाउनलोड करें. साथ ही, Kafka क्लाइंट प्रॉपर्टी को कॉन्फ़िगर करें.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
पब्लिशर मशीन के सेटअप के बारे में ज़्यादा जानकारी के लिए, क्लाइंट मशीन सेट अप करना लेख पढ़ें.
5. मैनेज किए गए Kafka पर पब्लिश करना
पब्लिशर सेट अप हो जाने के बाद, हम उस पर Kafka कमांड लाइन का इस्तेमाल करके, GCE VM (DemoIOT Solutions की मदद से IoT डिवाइसों को सिम्युलेट करना) से मैनेज किए जा रहे Kafka क्लस्टर में कुछ डमी डेटा पब्लिश कर सकते हैं.
- हमने GCE VM इंस्टेंस में एसएसएच किया है, इसलिए हमें
PROJECT_ID
वैरिएबल को फिर से पॉप्युलेट करना होगा:
export PROJECT_ID=<project-id>
export REGION=<region>
इनकी जगह ये डालें:
<project-id>
में, सेट अप किए गए GCP प्रोजेक्ट का नाम डालें.<region>
, उस क्षेत्र के साथ जिस क्षेत्र में Kafka क्लस्टर बनाया गया था
- Kafka बूटस्ट्रैप सर्वर का आईपी पता पाने के लिए,
managed-kafka clusters describe
कमांड का इस्तेमाल करें. इस पते का इस्तेमाल, Kafka क्लस्टर से कनेक्ट करने के लिए किया जा सकता है.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- क्लस्टर में मौजूद विषयों की सूची बनाएं:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
आपको नीचे दिया गया आउटपुट दिखेगा. इसमें, kafka-iot-topic
विषय शामिल होगा, जिसे हमने पहले बनाया था.
__remote_log_metadata
kafka-iot-topic
- इस स्क्रिप्ट को कॉपी करके किसी नई फ़ाइल
publish_iot_data.sh
में चिपकाएं. GCE वर्चुअल मशीन पर नई फ़ाइल बनाने के लिए,vim
याnano
जैसे टूल का इस्तेमाल किया जा सकता है.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
एक्सप्लेनेशंस
- यह स्क्रिप्ट, सिम्युलेट किए गए सेंसर रीडिंग के साथ JSON मैसेज बनाती है. इनमें डिवाइस आईडी, टाइमस्टैंप, सेंसर डेटा (तापमान, आर्द्रता, दबाव, रोशनी), जगह की जानकारी (अक्षांश, देशांतर), डिवाइस की स्थिति (बैटरी, सिग्नल, कनेक्शन टाइप), और कुछ मेटाडेटा शामिल होता है.
- यह एक तय संख्या में यूनीक डिवाइसों से लगातार मैसेज जनरेट करता है. हर डिवाइस, तय समयावधि में डेटा भेजता है. यह असल दुनिया के IoT डिवाइसों की नकल करता है. यहां हम 10 डिवाइसों का डेटा पब्लिश करते हैं. हर डिवाइस 10 सेकंड के अंतराल पर 20 रीडिंग जनरेट करता है.
- यह Kafka प्रोड्यूसर कमांड-लाइन टूल का इस्तेमाल करके, जनरेट किए गए सभी डेटा को Kafka विषय में पब्लिश भी करता है.
- स्क्रिप्ट में इस्तेमाल की जाने वाली कुछ डिपेंडेंसी इंस्टॉल करें - गणितीय कैलकुलेशन के लिए
bc
पैकेज और JSON प्रोसेसिंग के लिएjq
पैकेज.
sudo apt-get install bc jq
- स्क्रिप्ट को एक्सीक्यूटेबल बनाने के लिए उसमें बदलाव करें और स्क्रिप्ट चलाएं. इसे चलाने में करीब दो मिनट लगेंगे.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
इस निर्देश को चलाकर यह देखा जा सकता है कि इवेंट पब्लिश हो गए हैं या नहीं. यह निर्देश सभी इवेंट को प्रिंट करेगा. बाहर निकलने के लिए <control-c>
दबाएं.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc क्लस्टर सेट अप करना
यह सेक्शन, उस VPC सबनेटवर्क में Dataproc क्लस्टर बनाता है जहां मैनेज किया जा रहा Kafka क्लस्टर मौजूद है. इस क्लस्टर का इस्तेमाल, ऐसी जॉब चलाने के लिए किया जाएगा जो DemoIOT Solutions के लिए रीयल टाइम के आंकड़े और अहम जानकारी जनरेट करती हैं.
- Dataproc क्लस्टर बनाएं. यहां क्लस्टर का नाम
dataproc-iot
है.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
आपको इस तरह का जवाब मिलना चाहिए:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
क्लस्टर बनाने में 10 से 15 मिनट लग सकते हैं. इस कार्रवाई के पूरा होने का इंतज़ार करें और क्लस्टर के बारे में बताकर देखें कि क्लस्टर RUNNING
स्टेटस में है या नहीं.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Dataproc का इस्तेमाल करके, Kafka मैसेज प्रोसेस करना
इस आखिरी सेक्शन में, आपको एक Dataproc जॉब सबमिट करनी होगी. यह जॉब, Spark Streaming का इस्तेमाल करके पब्लिश किए गए मैसेज प्रोसेस करेगी. यह जॉब असल में कुछ रीयल टाइम आंकड़े और अहम जानकारी जनरेट करती है. इसका इस्तेमाल DemoIOT Solutions कर सकता है.
process_iot.py
नाम की स्ट्रीमिंग PySpark जॉब फ़ाइल बनाने के लिए, यह कमांड चलाएं.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
एक्सप्लेनेशंस
- यह कोड, किसी खास Kafka विषय से डेटा पढ़ने के लिए PySpark स्ट्रक्चर्ड स्ट्रीमिंग जॉब सेट अप करता है. यह Kafka ब्रोकर से कनेक्ट करने और उसकी पुष्टि करने के लिए, दिए गए Kafka सर्वर के बूटस्ट्रैप पते और GCS कॉन्फ़िगरेशन फ़ाइल से लोड किए गए Kafka कॉन्फ़िगरेशन का इस्तेमाल करता है.
- यह सबसे पहले Kafka से रॉ डेटा को बाइट ऐरे की स्ट्रीम के तौर पर पढ़ता है और उन बाइट ऐरे को स्ट्रिंग में कास्ट करता है. साथ ही, डेटा के स्ट्रक्चर (डिवाइस आईडी, टाइमस्टैंप, जगह की जानकारी, सेंसर डेटा वगैरह) की जानकारी देने के लिए, Spark के StructType का इस्तेमाल करके
json_schema
लागू करता है. - यह झलक देखने के लिए, कंसोल में पहली 10 पंक्तियां प्रिंट करता है. साथ ही, हर सेंसर के लिए औसत तापमान का हिसाब लगाता है और
avro
फ़ॉर्मैट में GCS बकेट में सारा डेटा लिखता है. Avro, पंक्ति पर आधारित डेटा सीरियलाइज़ेशन सिस्टम है. यह स्ट्रक्चर्ड डेटा को स्कीमा के हिसाब से तय किए गए छोटे, बाइनरी फ़ॉर्मैट में बेहतर तरीके से सेव करता है. साथ ही, बड़े पैमाने पर डेटा प्रोसेस करने के लिए, स्कीमा में बदलाव करने, भाषा के हिसाब से डेटा को सेव करने, और डेटा को ज़्यादा कंप्रेस करने की सुविधा देता है.
client.properties
फ़ाइल बनाएं और kafka सर्वर के बूटस्ट्रैप पते के लिए, एनवायरमेंट वैरिएबल को पॉप्युलेट करें.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
process_iot.py
औरclient.properties
फ़ाइलों को अपनी Google Cloud Storage बकेट में अपलोड करें, ताकि Dataproc जॉब उनका इस्तेमाल कर सके.
gsutil cp process_iot.py client.properties $BUCKET
- Dataproc जॉब के लिए, अपनी GCS बकेट में कुछ डिपेंडेंसी जर्स कॉपी करें. इस डायरेक्ट्री में ऐसे jar शामिल हैं जो Kafka के साथ Spark Streaming जॉब चलाने के लिए ज़रूरी हैं. साथ ही, इसमें क्लाइंट मशीन सेट अप करना से ली गई, मैनेज की जा रही Kafka की पुष्टि करने वाली लाइब्रेरी और उसकी डिपेंडेंसी भी शामिल हैं.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Dataproc क्लस्टर में Spark जॉब सबमिट करें.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Spark ड्राइवर लॉग प्रिंट हो जाएंगे. आपको ये टेबल, कंसोल में लॉग की गई टेबल और GCS बकेट में सेव किए गए डेटा के तौर पर भी दिखेंगी.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. व्यवस्थित करें
कोडलैब पूरा करने के बाद, संसाधनों को हटाने के लिए यह तरीका अपनाएं.
- मैनेज किया जा रहा Kafka क्लस्टर, पब्लिशर GCE वर्चुअल मशीन, और Dataproc क्लस्टर मिटाएं.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- अपना वीपीसी सबनेट और नेटवर्क मिटाएं.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- अगर आपको अब डेटा का इस्तेमाल नहीं करना है, तो अपनी GCS बकेट मिटाएं.
gcloud storage rm --recursive $BUCKET
9. बधाई हो
बधाई हो, आपने Manage Kafka और Dataproc की मदद से, IoT डेटा प्रोसेसिंग पाइपलाइन बनाई है. इससे DemoIOT Solutions को अपने डिवाइसों से पब्लिश किए गए डेटा के बारे में रीयल टाइम में अहम जानकारी मिलती है!
आपने मैनेज किया जा सकने वाला Kafka क्लस्टर बनाया, उसमें IoT इवेंट पब्लिश किए, और Dataproc जॉब चलाया. इस जॉब ने रीयल टाइम में इन इवेंट को प्रोसेस करने के लिए, Spark स्ट्रीमिंग का इस्तेमाल किया. अब आपको मैनेज की गई Kafka और Dataproc का इस्तेमाल करके, डेटा पाइपलाइन बनाने के लिए ज़रूरी चरणों के बारे में पता है.