অ্যাপাচি কাফকার জন্য Dataproc এবং Google পরিচালিত পরিষেবা ব্যবহার করে রিয়েল-টাইম IoT ডেটা প্রসেসিং

অ্যাপাচি কাফকার জন্য Dataproc এবং Google পরিচালিত পরিষেবা ব্যবহার করে রিয়েল-টাইম IoT ডেটা প্রসেসিং

এই কোডল্যাব সম্পর্কে

subjectজুন ১৬, ২০২৫-এ শেষবার আপডেট করা হয়েছে
account_circleDevanshi Khatsuriya-এর লেখা

1. ভূমিকা

2efacab8643a653b.png

শেষ আপডেট: 2024-06-10

পটভূমি

ইন্টারনেট অফ থিংস (IoT) ডিভাইসগুলি, স্মার্ট হোম সলিউশন থেকে শুরু করে শিল্প সেন্সর পর্যন্ত, নেটওয়ার্কের প্রান্তে প্রচুর পরিমাণে ডেটা তৈরি করে৷ ডিভাইস পর্যবেক্ষণ, ট্র্যাকিং, ডায়াগনস্টিকস, নজরদারি, ব্যক্তিগতকরণ, ফ্লিট অপ্টিমাইজেশন এবং আরও অনেক কিছুর মতো বিভিন্ন ব্যবহারের ক্ষেত্রে এই ডেটা অমূল্য। Apache Kafka-এর জন্য Google পরিচালিত পরিষেবা একটি OSS সামঞ্জস্যপূর্ণ, সহজে ব্যবহারযোগ্য এবং নিরাপদ উপায়ে ডেটার এই ক্রমাগত স্ট্রিমকে ইনজেস্ট এবং সংরক্ষণ করার জন্য একটি স্কেলযোগ্য এবং টেকসই উপায় অফার করে, যখন Google Cloud Dataproc Apache Spark এবং Hadoop ক্লাস্টার ব্যবহার করে ডেটা বিশ্লেষণের জন্য এই বৃহৎ ডেটাসেটগুলির প্রক্রিয়াকরণের অনুমতি দেয়৷

আপনি কি নির্মাণ করবেন

এই কোডল্যাবে, আপনি Apache Kafka, Dataproc, Python এবং Apache Spark এর জন্য Google পরিচালিত পরিষেবা ব্যবহার করে একটি IoT ডেটা প্রসেসিং পাইপলাইন তৈরি করতে যাচ্ছেন যা রিয়েল-টাইম বিশ্লেষণ করে। আপনার পাইপলাইন হবে:

  • GCE VM ব্যবহার করে IoT ডিভাইস থেকে একটি পরিচালিত কাফকা ক্লাস্টারে ডেটা প্রকাশ করুন
  • ম্যানেজ কাফকা ক্লাস্টার থেকে ডেটাপ্রোক ক্লাস্টারে ডেটা স্ট্রিম করুন
  • ডেটাপ্রোক স্পার্ক স্ট্রিমিং কাজ ব্যবহার করে ডেটা প্রক্রিয়া করুন

আপনি কি শিখবেন

  • কীভাবে গুগল পরিচালিত কাফকা এবং ডেটাপ্রোক ক্লাস্টার তৈরি করবেন
  • কিভাবে Dataproc ব্যবহার করে স্ট্রিমিং কাজ চালানো যায়

আপনি কি প্রয়োজন হবে

2. ওভারভিউ

এই কোডল্যাবের জন্য, আসুন একটি ডামি কোম্পানি ডেমোআইওটি সলিউশনের গল্প অনুসরণ করি। DemoIOT সমাধানগুলি সেন্সর ডিভাইসগুলি সরবরাহ করে যা তাপমাত্রা, আর্দ্রতা, চাপ, আলোর স্তর এবং অবস্থানের জন্য ডেটা পরিমাপ করে এবং প্রেরণ করে। তারা তাদের গ্রাহকদের রিয়েল টাইম পরিসংখ্যান দেখানোর জন্য এই ডেটা প্রক্রিয়া করে এমন পাইপলাইন সেট আপ করতে চায়। এই ধরনের পাইপলাইন ব্যবহার করে, তারা তাদের গ্রাহকদের বিভিন্ন ধরনের পরিষেবা প্রদান করতে পারে, যেমন নজরদারি, স্বয়ংক্রিয় পরামর্শ, সতর্কতা এবং গ্রাহকরা তাদের সেন্সর ইনস্টল করেছেন এমন জায়গা সম্পর্কে অন্তর্দৃষ্টি।

এটি করার জন্য, আমরা IoT ডিভাইসটি অনুকরণ করতে একটি GCE VM ব্যবহার করব। ডিভাইসটি Google পরিচালিত কাফকা ক্লাস্টারে একটি কাফকা বিষয়ের ডেটা প্রকাশ করবে, যা একটি Dataproc স্ট্রিমিং কাজের দ্বারা পড়া এবং প্রক্রিয়া করা হবে। পূর্বশর্ত সেটআপ এবং নিম্নলিখিত পৃষ্ঠাগুলি আপনাকে এই সমস্ত পদক্ষেপগুলি সম্পাদন করতে পরিচালিত করবে।

পূর্বশর্ত সেটআপ

  1. আপনার প্রকল্পের জন্য প্রকল্পের নাম এবং প্রকল্প নম্বর খুঁজুন। রেফারেন্সের জন্য প্রকল্পের নাম, নম্বর এবং আইডি খুঁজুন দেখুন।
  2. ভিপিসি সাবনেটওয়ার্ক। এটি GCE VM, Kafka ক্লাস্টার এবং Dataproc ক্লাস্টারের মধ্যে সংযোগের অনুমতি দেবে। GCloud CLI ব্যবহার করে বিদ্যমান সাবনেট তালিকাভুক্ত করতে এটি অনুসরণ করুন। প্রয়োজনে, একটি অটো মোড VPC নেটওয়ার্ক তৈরি করুন যা প্রতিটি Google ক্লাউড অঞ্চলে সাবনেটওয়ার্ক সহ একটি VPC নেটওয়ার্ক তৈরি করবে৷ যদিও, এই কোডল্যাবের উদ্দেশ্যে, আমরা শুধুমাত্র একটি অঞ্চল থেকে একটি সাবনেটওয়ার্ক ব্যবহার করব।
  • এই সাবনেটওয়ার্কে, নিশ্চিত করুন যে একটি ফায়ারওয়াল নিয়ম রয়েছে যা tcp:22 থেকে সমস্ত প্রবেশের অনুমতি দেয়, যার জন্য SSH প্রয়োজন। এই নিয়মটি একটি নেটওয়ার্ক তৈরি করার সময় ফায়ারওয়াল নিয়ম বিভাগের অধীনে নির্বাচন করার জন্য উপলব্ধ হবে, তাই আপনি এটি নির্বাচন করেছেন তা নিশ্চিত করুন।
  1. GCS বালতি। Dataproc কাজের সংস্থানগুলি সঞ্চয় করতে এবং প্রক্রিয়াকৃত ডেটা বজায় রাখতে আপনার Google ক্লাউড স্টোরেজ বালতিতে অ্যাক্সেসের প্রয়োজন হবে। আপনার যদি এটি না থাকে তবে আপনি আপনার GCP প্রকল্পে একটি তৈরি করতে পারেন।

পরিবেশ ভেরিয়েবল পপুলেট করুন

আপনার টার্মিনালে যেখানে আপনি gcloud CLI চালান, সেখানে এই পরিবেশ ভেরিয়েবলগুলি পূরণ করুন যাতে সেগুলি পরে ব্যবহার করা যায়।

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

নিম্নলিখিতগুলি প্রতিস্থাপন করুন:

  • আপনার সেট আপ করা জিসিপি প্রকল্পের নামের সাথে <project-id>
  • পূর্বশর্ত পদক্ষেপ 1 থেকে প্রকল্প নম্বরের নামের সাথে <project-number>
  • <region> উপলভ্য অঞ্চল এবং অঞ্চল থেকে একটি অঞ্চলের নামের সাথে যা আপনি ব্যবহার করতে চান। উদাহরণস্বরূপ, আমরা us-central1 ব্যবহার করতে পারি।
  • <zone> আপনার পূর্বে নির্বাচিত অঞ্চলের অধীনে উপলব্ধ অঞ্চল এবং অঞ্চল থেকে জোনের নামের সাথে। উদাহরণস্বরূপ, আপনি যদি অঞ্চল হিসাবে us-central1 নির্বাচন করেন, আপনি অঞ্চল হিসাবে us-central1-f ব্যবহার করতে পারেন। এই জোনটি GCE VM তৈরি করতে ব্যবহার করা হবে যা IoT ডিভাইসগুলিকে অনুকরণ করে। নিশ্চিত করুন যে আপনার জোনটি আপনার বেছে নেওয়া অঞ্চলে রয়েছে
  • <subnet-path> পূর্বশর্ত পদক্ষেপ 2 থেকে সাবনেটের সম্পূর্ণ পাথ সহ। এর মান অবশ্যই ফরম্যাটে হতে হবে: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • পূর্বশর্ত পদক্ষেপ 3 থেকে GCS বাকেটের নামের সাথে <bucket-name>

3. Google পরিচালিত কাফকা সেট আপ করুন

এই বিভাগটি একটি Google পরিচালিত কাফকা ক্লাস্টার সেট আপ করে, যা কাফকা সার্ভারকে স্থাপন করে এবং এই সার্ভারে একটি বিষয় তৈরি করে যেখানে IoT ডেটা প্রকাশ করা যেতে পারে এবং এটিতে সদস্যতা নেওয়ার পরে পড়তে পারে। DemoIOT সমাধানগুলি এই ক্লাস্টারটি সেট আপ করতে পারে যাতে তাদের সমস্ত ডিভাইস এতে ডেটা প্রকাশ করে৷

একটি পরিচালিত কাফকা ক্লাস্টার তৈরি করুন

  • পরিচালিত কাফকা ক্লাস্টার তৈরি করুন। এখানে, ক্লাস্টারটির নাম kafka-iot
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

আপনি নিম্নলিখিত অনুরূপ একটি প্রতিক্রিয়া পেতে হবে:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

ক্লাস্টার তৈরিতে 20-30 মিনিট সময় লাগে। এই অপারেশন সমাপ্তির জন্য অপেক্ষা করুন.

একটি বিষয় তৈরি করুন

  • ক্লাস্টারে একটি পরিচালিত কাফকা বিষয় তৈরি করুন। এখানে, বিষয়ের নাম kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

আপনি নিম্নলিখিত অনুরূপ একটি আউটপুট পাওয়া উচিত:

Created topic [kafka-iot-topic].

4. একজন প্রকাশক সেট আপ করুন

পরিচালিত কাফকা ক্লাস্টারে প্রকাশ করার জন্য, আমরা একটি Google Compute Engine VM ইন্সট্যান্স সেট আপ করি যা পরিচালিত কাফকা ক্লাস্টার দ্বারা ব্যবহৃত সাবনেট সম্বলিত VPC অ্যাক্সেস করতে পারে। এই VM ডেমোআইওটি সলিউশন দ্বারা প্রদত্ত সেন্সর ডিভাইসগুলিকে অনুকরণ করে৷

ধাপ

  1. Google Compute Engine VM দৃষ্টান্ত তৈরি করুন। এখানে, GCE VM-এর নাম হল publisher-instance
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. Google Compute Engine ডিফল্ট পরিষেবা অ্যাকাউন্টকে Apache Kafka-এর জন্য পরিচালিত পরিষেবা ব্যবহার করার অনুমতি দিন৷
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. VM এর সাথে সংযোগ করতে SSH ব্যবহার করুন। বিকল্পভাবে, SSH এ Google ক্লাউড কনসোল ব্যবহার করুন
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. কাফকা কমান্ড লাইন সরঞ্জামগুলি চালানোর জন্য জাভা ইনস্টল করুন এবং এই কমান্ডগুলি ব্যবহার করে কাফকা বাইনারি ডাউনলোড করুন।
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. পরিচালিত কাফকা প্রমাণীকরণ লাইব্রেরি এবং এর নির্ভরতা ডাউনলোড করুন এবং কাফকা ক্লায়েন্ট বৈশিষ্ট্যগুলি কনফিগার করুন।
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

প্রকাশক মেশিন সেটআপ সম্পর্কে আরও বিশদের জন্য, একটি ক্লায়েন্ট মেশিন সেট আপ করুন দেখুন।

5. পরিচালিত কাফকার কাছে প্রকাশ করুন

এখন যেহেতু প্রকাশক সেট আপ করা হয়েছে, আমরা এটিতে কাফকা কমান্ড লাইন ব্যবহার করে GCE VM (DemoIOT সলিউশন দ্বারা IoT ডিভাইসের অনুকরণ) থেকে পরিচালিত কাফকা ক্লাস্টারে কিছু ডামি ডেটা প্রকাশ করতে পারি।

  1. যেহেতু আমরা GCE VM দৃষ্টান্তে SSH করেছি, তাই আমাদের PROJECT_ID ভেরিয়েবলটিকে পুনরায় পূরণ করতে হবে:
export PROJECT_ID=<project-id>
export REGION=<region>

নিম্নলিখিতগুলি প্রতিস্থাপন করুন:

  • আপনার সেট আপ করা জিসিপি প্রকল্পের নামের সাথে <project-id>
  • <region> যে অঞ্চলে কাফকা ক্লাস্টার তৈরি হয়েছিল তার সাথে
  1. কাফকা বুটস্ট্র্যাপ সার্ভারের আইপি ঠিকানা পেতে managed-kafka clusters describe কমান্ড ব্যবহার করুন। এই ঠিকানাটি কাফকা ক্লাস্টারের সাথে সংযোগ করতে ব্যবহার করা যেতে পারে।
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. ক্লাস্টারে বিষয়গুলি তালিকাভুক্ত করুন:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

আপনি নিম্নলিখিত আউটপুটটি দেখতে সক্ষম হবেন, যার মধ্যে আমরা আগে তৈরি করেছি kafka-iot-topic

__remote_log_metadata
kafka
-iot-topic
  1. একটি নতুন ফাইল publish_iot_data.sh এই স্ক্রিপ্টটি অনুলিপি করুন এবং পেস্ট করুন। GCE VM-এ একটি নতুন ফাইল তৈরি করতে, আপনি vim বা nano মতো একটি টুল ব্যবহার করতে পারেন।
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

ব্যাখ্যা

  • এই স্ক্রিপ্টটি সিমুলেটেড সেন্সর রিডিং সহ JSON বার্তা তৈরি করে যাতে ডিভাইস আইডি, টাইমস্ট্যাম্প, সেন্সর ডেটা (তাপমাত্রা, আর্দ্রতা, চাপ, আলো), অবস্থানের তথ্য (অক্ষাংশ, দ্রাঘিমাংশ), ডিভাইসের স্থিতি (ব্যাটারি, সংকেত, সংযোগের ধরন) এবং কিছু মেটাডেটা থাকে।
  • এটি একটি নির্দিষ্ট সংখ্যক অনন্য ডিভাইস থেকে বার্তাগুলির একটি অবিচ্ছিন্ন প্রবাহ তৈরি করে, প্রতিটি একটি নির্দিষ্ট সময়ের ব্যবধানে ডেটা প্রেরণ করে, বাস্তব-বিশ্বের IoT ডিভাইসগুলির অনুকরণ করে। এখানে, আমরা 10টি ডিভাইস থেকে ডেটা প্রকাশ করি যা প্রতিটি 10 ​​সেকেন্ডের ব্যবধানে 20টি রিডিং তৈরি করে।
  • এটি কাফকা প্রযোজক কমান্ড লাইন টুল ব্যবহার করে কাফকা বিষয়ে সমস্ত উত্পন্ন ডেটা প্রকাশ করে।
  1. স্ক্রিপ্ট দ্বারা ব্যবহৃত কিছু নির্ভরতা ইনস্টল করুন - গাণিতিক গণনার জন্য bc প্যাকেজ এবং JSON প্রক্রিয়াকরণের জন্য jq প্যাকেজ।
sudo apt-get install bc jq
  1. একটি এক্সিকিউটেবল হতে স্ক্রিপ্টটি পরিবর্তন করুন এবং স্ক্রিপ্টটি চালান। এটি চালানোর জন্য প্রায় 2 মিনিট সময় নেওয়া উচিত।
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

আপনি পরীক্ষা করতে পারেন যে এই কমান্ডটি চালানোর মাধ্যমে ইভেন্টগুলি সফলভাবে প্রকাশিত হয়েছে যা সমস্ত ইভেন্ট মুদ্রণ করবে। প্রস্থান করতে <control-c> টিপুন।

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. Dataproc ক্লাস্টার সেট আপ করুন

এই বিভাগটি ভিপিসি সাবনেটওয়ার্কে একটি ডেটাপ্রোক ক্লাস্টার তৈরি করে যেখানে পরিচালিত কাফকা ক্লাস্টার উপস্থিত রয়েছে। এই ক্লাস্টারটি এমন কাজ চালানোর জন্য ব্যবহার করা হবে যা ডেমোআইওটি সলিউশনের প্রয়োজনীয় রিয়েল টাইম পরিসংখ্যান এবং অন্তর্দৃষ্টি তৈরি করে।

  1. একটি Dataproc ক্লাস্টার তৈরি করুন। এখানে ক্লাস্টারটির নাম dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

আপনি নিম্নলিখিত অনুরূপ একটি প্রতিক্রিয়া পেতে হবে:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

ক্লাস্টার তৈরি হতে 10-15 মিনিট সময় লাগতে পারে। এই অপারেশনের সফল সমাপ্তির জন্য অপেক্ষা করুন এবং ক্লাস্টারটির বর্ণনা দিয়ে ক্লাস্টারটি RUNNING অবস্থায় আছে কিনা তা পরীক্ষা করুন।

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Dataproc ব্যবহার করে কাফকা বার্তা প্রসেস করুন

এই শেষ বিভাগে, আপনি একটি ডেটাপ্রোক কাজ জমা দেবেন যা স্পার্ক স্ট্রিমিং ব্যবহার করে প্রকাশিত বার্তাগুলিকে প্রক্রিয়া করে। এই কাজটি আসলে কিছু বাস্তব সময়ের পরিসংখ্যান এবং অন্তর্দৃষ্টি তৈরি করে যা ডেমোআইওটি সলিউশন ব্যবহার করতে পারে।

  1. স্থানীয়ভাবে process_iot.py নামে স্ট্রিমিং PySpark জব ফাইল তৈরি করতে এই কমান্ডটি চালান।
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

ব্যাখ্যা

  • এই কোডটি একটি নির্দিষ্ট কাফকা বিষয় থেকে ডেটা পড়ার জন্য একটি PySpark স্ট্রাকচার্ড স্ট্রিমিং কাজ সেট আপ করে। এটি কাফকা ব্রোকারের সাথে সংযোগ এবং প্রমাণীকরণের জন্য একটি GCS কনফিগারেশন ফাইল থেকে লোড করা কাফকা সার্ভার বুটস্ট্র্যাপ ঠিকানা এবং কাফকা কনফিগারেশন ব্যবহার করে।
  • এটি প্রথমে বাইট অ্যারেগুলির একটি স্ট্রীম হিসাবে কাফকার কাঁচা ডেটা পড়ে, এবং সেই বাইট অ্যারেগুলিকে স্ট্রিংগুলিতে কাস্ট করে এবং ডেটার কাঠামো (ডিভাইস আইডি, টাইমস্ট্যাম্প, অবস্থান, সেন্সর ডেটা ইত্যাদি) নির্দিষ্ট করতে স্পার্কের স্ট্রাকটাইপ ব্যবহার করে json_schema প্রয়োগ করে।
  • এটি পূর্বরূপের জন্য কনসোলে প্রথম 10টি সারি প্রিন্ট করে, সেন্সর প্রতি গড় তাপমাত্রা গণনা করে এবং avro ফরম্যাটে GCS বালতিতে সমস্ত ডেটা লেখে। অভ্র একটি সারি-ভিত্তিক ডেটা সিরিয়ালাইজেশন সিস্টেম যা দক্ষতার সাথে একটি কম্প্যাক্ট, স্কিমা-সংজ্ঞায়িত বাইনারি ফর্ম্যাটে কাঠামোগত ডেটা সঞ্চয় করে, স্কিমা বিবর্তন, ভাষা নিরপেক্ষতা এবং বড় আকারের ডেটা প্রক্রিয়াকরণের জন্য উচ্চ কম্প্রেশন প্রদান করে।
  1. client.properties ফাইলটি তৈরি করুন এবং কাফকা সার্ভারের বুটস্ট্র্যাপ ঠিকানার জন্য পরিবেশ ভেরিয়েবলটি তৈরি করুন।
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. আপনার Google ক্লাউড স্টোরেজ বালতিতে process_iot.py এবং client.properties ফাইলগুলি আপলোড করুন, যাতে সেগুলি Dataproc কাজের দ্বারা ব্যবহার করা যায়৷
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc কাজের জন্য কিছু নির্ভরতা জার আপনার GCS বালতিতে কপি করুন। এই ডিরেক্টরিতে এমন জার রয়েছে যা কাফকার সাথে স্পার্ক স্ট্রিমিং কাজ চালানোর জন্য প্রয়োজন, এবং পরিচালিত কাফকা প্রমাণীকরণ লাইব্রেরি এবং এর নির্ভরতা, সেট আপ একটি ক্লায়েন্ট মেশিন থেকে নেওয়া।
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. ডাটাপ্রোক ক্লাস্টারে স্পার্ক কাজ জমা দিন।
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

স্পার্ক ড্রাইভার লগ প্রিন্ট করা হবে. আপনি কনসোলে লগ করা এই টেবিলগুলি এবং আপনার GCS বালতিতে সংরক্ষিত ডেটা দেখতে সক্ষম হবেন।

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. পরিষ্কার করুন

কোডল্যাব সম্পূর্ণ করার পরে সংস্থানগুলি পরিষ্কার করার পদক্ষেপগুলি অনুসরণ করুন৷

  1. পরিচালিত কাফকা ক্লাস্টার, প্রকাশক GCE VM এবং Dataproc ক্লাস্টার মুছুন।
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. আপনার ভিপিসি সাবনেটওয়ার্ক এবং নেটওয়ার্ক মুছুন।
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. আপনি যদি আর ডেটা ব্যবহার করতে না চান তাহলে আপনার GCS বালতি মুছুন।
gcloud storage rm --recursive $BUCKET

9. অভিনন্দন

অভিনন্দন, আপনি সফলভাবে Kafka এবং Dataproc পরিচালনার সাথে একটি IoT ডেটা প্রসেসিং পাইপলাইন তৈরি করেছেন যা DemoIOT সমাধানগুলিকে তাদের ডিভাইসগুলির দ্বারা প্রকাশিত ডেটার রিয়েল টাইম অন্তর্দৃষ্টি পেতে সাহায্য করে!

আপনি একটি পরিচালিত কাফকা ক্লাস্টার তৈরি করেছেন, এতে IoT ইভেন্টগুলি প্রকাশ করেছেন এবং একটি Dataproc কাজ চালিয়েছেন যা এই ইভেন্টগুলিকে রিয়েল টাইমে প্রক্রিয়া করতে স্পার্ক স্ট্রিমিং ব্যবহার করে। Managed Kafka এবং Dataproc ব্যবহার করে ডেটা পাইপলাইন তৈরি করার জন্য প্রয়োজনীয় মূল পদক্ষেপগুলি আপনি এখন জানেন৷

রেফারেন্স ডক্স