১. ভূমিকা

সর্বশেষ হালনাগাদ: ২০২৫-০৬-১৯
চেঞ্জ ডেটা ক্যাপচার কী?
চেঞ্জ ডেটা ক্যাপচার (সিডিএস) হলো একগুচ্ছ সফটওয়্যার ডিজাইন প্যাটার্ন, যা একটি ডাটাবেসে পরিবর্তিত ডেটা শনাক্ত ও ট্র্যাক করতে ব্যবহৃত হয়। সহজ ভাষায় বলতে গেলে, এটি ডেটার পরিবর্তনগুলো ধারণ ও রেকর্ড করার একটি উপায়, যাতে সেই পরিবর্তনগুলো অন্যান্য সিস্টেমেও প্রতিলিপি করা যায়।
ডেটা মাইগ্রেশন, রিয়েল-টাইম ডেটা ওয়্যারহাউজিং ও অ্যানালিটিক্স, ডিজাস্টার রিকভারি ও হাই অ্যাভেইলেবিলিটি, অডিট ও কমপ্লায়েন্স ইত্যাদির মতো বিভিন্ন ডেটা-নির্ভর ক্ষেত্রে চেঞ্জ ডেটা ক্যাপচার (সিডিসি) অত্যন্ত কার্যকর।
ডেটা মাইগ্রেশন
সিডিসি পর্যায়ক্রমিক ডেটা স্থানান্তরের সুযোগ দিয়ে, ডাউনটাইম কমিয়ে এবং বিঘ্ন ন্যূনতম করে ডেটা মাইগ্রেশন প্রকল্পগুলোকে সহজ করে তোলে।
রিয়েল-টাইম ডেটা ওয়্যারহাউজিং এবং অ্যানালিটিক্স
সিডিসি নিশ্চিত করে যে ডেটা ওয়্যারহাউস এবং অ্যানালিটিক্যাল সিস্টেমগুলো অপারেশনাল ডেটাবেস থেকে প্রাপ্ত সর্বশেষ পরিবর্তনগুলোর মাধ্যমে ক্রমাগত আপডেট করা হয়।
এর ফলে ব্যবসা প্রতিষ্ঠানগুলো রিয়েল-টাইম তথ্যের ভিত্তিতে সিদ্ধান্ত নিতে পারে।
দুর্যোগ পুনরুদ্ধার এবং উচ্চ প্রাপ্যতা
দুর্যোগ পুনরুদ্ধারের উদ্দেশ্যে সিডিসি সেকেন্ডারি ডেটাবেসে রিয়েল-টাইমে ডেটা প্রতিলিপি তৈরি করতে সক্ষম করে। কোনো ব্যর্থতার ক্ষেত্রে, সিডিসি দ্রুত একটি সেকেন্ডারি ডেটাবেসে ফেইলওভারের সুযোগ দেয়, যার ফলে ডাউনটাইম এবং ডেটা হারানোর ঝুঁকি কমে আসে।
নিরীক্ষা এবং সম্মতি
সিডিসি ডেটা পরিবর্তনের একটি বিশদ নিরীক্ষা বিবরণী প্রদান করে, যা সম্মতি এবং নিয়ন্ত্রক প্রয়োজনীয়তার জন্য অপরিহার্য।
আপনি যা তৈরি করবেন
এই কোডল্যাবে, আপনি ক্লাউড পাব/সাব, ডেটাপ্রক, পাইথন এবং অ্যাপাচি স্পার্ক ব্যবহার করে একটি চেঞ্জ-ডেটা-ক্যাপচার (CDC) ডেটা পাইপলাইন তৈরি করবেন। আপনার পাইপলাইনটি যা করবে:
- ডাটাবেস পরিবর্তনগুলি সিমুলেট করুন এবং সেগুলিকে ক্লাউড পাব/সাব-এ ইভেন্ট হিসাবে প্রকাশ করুন, যা একটি পরিমাপযোগ্য এবং নির্ভরযোগ্য মেসেজিং পরিষেবা।
- এই ইভেন্টগুলোকে রিয়েল-টাইমে প্রসেস করতে গুগল ক্লাউডের পরিচালিত স্পার্ক ও হ্যাডুপ পরিষেবা ডেটাপ্রক-এর শক্তিকে কাজে লাগান।
এই পরিষেবাগুলোকে সংযুক্ত করার মাধ্যমে, আপনি একটি শক্তিশালী পাইপলাইন তৈরি করবেন যা ডেটার পরিবর্তনগুলো ঘটার সাথে সাথেই গ্রহণ ও প্রক্রিয়াকরণ করতে সক্ষম হবে এবং রিয়েল-টাইম অ্যানালিটিক্স, ডেটা ওয়্যারহাউজিং ও অন্যান্য গুরুত্বপূর্ণ অ্যাপ্লিকেশনের জন্য একটি ভিত্তি প্রদান করবে।
আপনি যা শিখবেন
- কীভাবে একটি মৌলিক পরিবর্তন ডেটা ক্যাপচার পাইপলাইন তৈরি করবেন
- স্ট্রিম প্রক্রিয়াকরণের জন্য ডেটাপ্রক
- রিয়েল-টাইম মেসেজিংয়ের জন্য ক্লাউড পাব/সাব
- অ্যাপাচি স্পার্কের মৌলিক বিষয়াবলী
এই কোডল্যাবটি ডেটাপ্রক এবং ক্লাউড পাব/সাব-এর উপর কেন্দ্র করে তৈরি। অপ্রাসঙ্গিক ধারণা এবং কোড ব্লকগুলো সংক্ষেপে আলোচনা করা হয়েছে এবং এগুলো শুধু কপি-পেস্ট করার জন্য দেওয়া হয়েছে।
আপনার যা যা লাগবে
- প্রজেক্ট সেট আপ করা একটি সক্রিয় GCP অ্যাকাউন্ট থাকতে হবে। যদি আপনার এমন কোনো অ্যাকাউন্ট না থাকে, তবে আপনি একটি ফ্রি ট্রায়ালের জন্য সাইন আপ করতে পারেন।
- gcloud CLI ইনস্টল এবং কনফিগার করা হয়েছে।
- ডাটাবেস পরিবর্তন অনুকরণ করতে এবং পাব/সাব-এর সাথে কাজ করার জন্য পাইথন ৩.৭+ ইনস্টল করা আছে।
- ডেটাপ্রক, ক্লাউড পাব/সাব, অ্যাপাচি স্পার্ক এবং পাইথন সম্পর্কে প্রাথমিক জ্ঞান।
শুরু করার আগে
প্রয়োজনীয় এপিআইগুলো সক্রিয় করতে টার্মিনালে নিম্নলিখিত কমান্ডটি চালান:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
২. ক্লাউড পাব/সাব সেট আপ করুন
একটি বিষয় তৈরি করুন
এই টপিকটি ডাটাবেসের পরিবর্তনগুলো প্রকাশ করতে ব্যবহৃত হবে। ডেটাপ্রক জব এই মেসেজগুলোর গ্রাহক হবে এবং চেঞ্জ ডেটা ক্যাপচারের জন্য মেসেজগুলো প্রসেস করবে। আপনি যদি টপিক সম্পর্কে আরও জানতে চান, তাহলে এখান থেকে অফিসিয়াল ডকুমেন্টেশন পড়তে পারেন।
gcloud pubsub topics create database-changes
একটি সাবস্ক্রিপশন তৈরি করুন
একটি সাবস্ক্রিপশন তৈরি করুন যা পাব/সাব-এ মেসেজ গ্রহণ করতে ব্যবহৃত হবে। সাবস্ক্রিপশন সম্পর্কে আরও জানতে, আপনি এখানে অফিসিয়াল ডকুমেন্টেশন পড়তে পারেন।
gcloud pubsub subscriptions create --topic database-changes change-subscriber
৩. ডাটাবেস পরিবর্তন অনুকরণ করুন
পদক্ষেপ
- ডাটাবেস পরিবর্তনগুলি সিমুলেট করতে এবং সেগুলিকে পাব/সাব-এ প্রকাশ করতে একটি পাইথন স্ক্রিপ্ট (যেমন,
simulate_cdc.py) তৈরি করুন।
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
YOUR_PROJECT_ID-এর জায়গায় আপনার আসল GCP প্রজেক্ট আইডি বসান।
- পাব/সাব ক্লায়েন্ট লাইব্রেরি ইনস্টল করুন:
pip install google-cloud-pubsub
- আপনার টার্মিনালে স্ক্রিপ্টটি চালান। এই স্ক্রিপ্টটি একটানা চলতে থাকবে এবং প্রতি ২ সেকেন্ড পর পর পাব/সাব টপিকে মেসেজ প্রকাশ করবে।
python simulate_cdc.py
- ধরা যাক, স্ক্রিপ্টটি ১ মিনিট চালানোর পর, আপনার পাব/সাব-এ পড়ার জন্য যথেষ্ট মেসেজ জমা হবে। আপনার অপারেটিং সিস্টেমের ওপর নির্ভর করে, আপনি ctrl + C অথবা Cmd + C চেপে চলমান পাইথন স্ক্রিপ্টটি বন্ধ করতে পারেন।
- প্রকাশিত বার্তাগুলো দেখুন:
প্রকাশিত বার্তাগুলো দেখতে অন্য একটি টার্মিনাল খুলুন এবং নিম্নলিখিত কমান্ডটি চালান:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
আপনি বার্তা এবং অন্যান্য ক্ষেত্র সম্বলিত একটি টেবিলের সারি দেখতে পাবেন:
{"পরিবর্তনের ধরণ": "আপডেট", "রেকর্ড আইডি": 10, "টাইমস্ট্যাম্প": 1742466264.888465, "ডেটা": {"ফিল্ড1": "মান1", "ফিল্ড2": "মান2"}}
ব্যাখ্যা
- পাইথন স্ক্রিপ্টটি এলোমেলোভাবে
INSERT,UPDATEবাDELETEইভেন্ট তৈরি করার মাধ্যমে ডাটাবেসের পরিবর্তন অনুকরণ করে। - প্রতিটি পরিবর্তন একটি JSON অবজেক্ট হিসাবে উপস্থাপিত হয়, যাতে পরিবর্তনের ধরণ, রেকর্ড আইডি, টাইমস্ট্যাম্প এবং ডেটা অন্তর্ভুক্ত থাকে।
- স্ক্রিপ্টটি এই পরিবর্তন ইভেন্টগুলিকে
database-changes' টপিকে প্রকাশ করার জন্য ক্লাউড পাব/সাব ক্লায়েন্ট লাইব্রেরি ব্যবহার করে। - সাবস্ক্রাইবার কমান্ডের মাধ্যমে আপনি পাব/সাব টপিকে পাঠানো মেসেজগুলো দেখতে পারেন।
৪. ডেটাপ্রোকের জন্য একটি পরিষেবা অ্যাকাউন্ট তৈরি করুন।
এই অংশে, আপনি একটি সার্ভিস অ্যাকাউন্ট তৈরি করবেন যা ডেটাপ্রোক ক্লাস্টার ব্যবহার করতে পারবে। এছাড়াও, ক্লাস্টার ইনস্ট্যান্সগুলোকে ক্লাউড পাব/সাব এবং ডেটাপ্রোক অ্যাক্সেস করার অনুমতি দেওয়ার জন্য প্রয়োজনীয় পারমিশনগুলোও আপনি নির্ধারণ করবেন।
- একটি পরিষেবা অ্যাকাউন্ট তৈরি করুন:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- সার্ভিস অ্যাকাউন্টকে ক্লাস্টার তৈরি করতে এবং জব চালাতে অনুমতি দেওয়ার জন্য Dataproc ওয়ার্কার রোলটি যোগ করুন। পূর্ববর্তী কমান্ডে তৈরি করা সার্ভিস অ্যাকাউন্ট আইডিটি নিচের কমান্ডে সদস্য হিসেবে যোগ করুন:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- সার্ভিস অ্যাকাউন্টকে 'change-subscriber' পাব/সাব সাবস্ক্রিপশনে সাবস্ক্রাইব করার অনুমতি দিতে পাব/সাব সাবস্ক্রাইবার রোলটি যোগ করুন:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
৫. একটি ডেটাপ্রোক ক্লাস্টার তৈরি করুন
ডেটাপ্রোক ক্লাস্টারটি স্পার্ক অ্যাপটি চালাবে, যা পাব/সাব-এ থাকা মেসেজগুলো প্রসেস করবে। এর জন্য আপনার পূর্ববর্তী বিভাগে তৈরি করা সার্ভিস অ্যাকাউন্টটির প্রয়োজন হবে। ডেটাপ্রোক ক্লাস্টারের প্রতিটি ইনস্ট্যান্সে এই সার্ভিস অ্যাকাউন্টটি বরাদ্দ করে, যাতে সমস্ত ইনস্ট্যান্স অ্যাপটি চালানোর জন্য সঠিক অনুমতি পায়।
একটি Dataproc ক্লাস্টার তৈরি করতে নিম্নলিখিত কমান্ডটি ব্যবহার করুন:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
৬. স্পার্ক জবটি ডেটাপ্রোক ক্লাস্টারে জমা দিন।
স্পার্ক স্ট্রিমিং অ্যাপটি পাব/সাব পদ্ধতিতে ডাটাবেস পরিবর্তনের মেসেজগুলো প্রসেস করে এবং কনসোলে প্রিন্ট করে।
পদক্ষেপ
- একটি ডিরেক্টরি তৈরি করুন এবং কনজিউমারের সোর্স কোডটি PubsubConsumer.scala ফাইলে যোগ করুন।
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- pom.xml-এ নিম্নলিখিতটি তৈরি করে যোগ করুন।
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- প্রজেক্টের স্পার্ক ডিরেক্টরিতে যান এবং পরবর্তীতে ব্যবহারের জন্য পাথটি একটি এনভায়রনমেন্ট ভেরিয়েবলে সংরক্ষণ করুন:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- ডিরেক্টরি পরিবর্তন করুন:
cd $REPO_ROOT/spark
- জাভা ১.৮ ডাউনলোড করে ফোল্ডারটি /usr/lib/jvm/ এ রাখুন। তারপর আপনার JAVA_HOME পরিবর্তন করে এটি নির্দেশ করুন:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- অ্যাপ্লিকেশন জার তৈরি করুন
mvn clean package
অ্যাপ্লিকেশন কোড এবং নির্ভরতা ধারণকারী spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar আর্কাইভটি spark/target ডিরেক্টরিতে তৈরি করা হয়।
- স্পার্ক অ্যাপ্লিকেশনটি জমা দিন:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- সক্রিয় কাজগুলির তালিকা প্রদর্শন করুন এবং কাজটির জন্য
JOB_IDমানটি নোট করুন:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
আউটপুটটি দেখতে অনেকটা এইরকম হবে
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- আপনার ব্রাউজারে নিম্নলিখিত URL-টি খুলে কাজের আউটপুট দেখুন। [JOB_ID]-এর জায়গায় পূর্ববর্তী ধাপে উল্লেখিত মানটি বসান।
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- আউটপুটটি নিম্নলিখিতের অনুরূপ:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Dataproc-এ চলমান স্পার্ক স্ট্রিমিং জবটি Pub/sub থেকে মেসেজ গ্রহণ করে, সেগুলোকে প্রসেস করে এবং আউটপুট কনসোলে প্রদর্শন করে।
- জবটি বন্ধ করা: জবটি বন্ধ করতে নিম্নলিখিত কমান্ডটি চালান। JOB_ID-এর জায়গায় পূর্বে উল্লেখিত একই আইডিটি বসান।
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
অভিনন্দন! আপনি এইমাত্র একটি শক্তিশালী সিডিসি পাইপলাইন তৈরি করেছেন, যা পাব/সাব (Pub/sub) পদ্ধতিতে ডাটাবেসের পরিবর্তনগুলো গ্রহণ করে এবং ক্লাউড ডেটাপ্রোক (Cloud Dataproc)-এ চলমান স্পার্ক স্ট্রিমিং (spark streaming) ব্যবহার করে সেগুলোকে প্রসেস করে।
৭. পরিষ্কার করুন
আপনার তৈরি করা সমস্ত রিসোর্স মুছে ফেলুন, যাতে ভবিষ্যতে সেগুলোর জন্য আপনাকে বিল করা না হয়। বিলিং বন্ধ করার সবচেয়ে সহজ উপায় হলো টিউটোরিয়ালের জন্য তৈরি করা প্রজেক্টটি ডিলিট করে দেওয়া। বিকল্পভাবে, আপনি আলাদা আলাদা রিসোর্সও ডিলিট করতে পারেন।
স্বতন্ত্র রিসোর্সগুলো মুছে ফেলার জন্য নিম্নলিখিত কমান্ডগুলো চালান।
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
৮. অভিনন্দন
অভিনন্দন, আপনি এইমাত্র একটি হাতে-কলমে কোডল্যাব সম্পন্ন করেছেন, যেখানে দেখানো হয়েছে কীভাবে গুগল ক্লাউড প্ল্যাটফর্ম ব্যবহার করে একটি শক্তিশালী রিয়েল-টাইম ডেটা পাইপলাইন তৈরি করতে হয়। চলুন দেখে নেওয়া যাক আপনি কী কী অর্জন করেছেন:
- সিমুলেটেড চেঞ্জ ডেটা ক্যাপচার (সিডিসি): আপনি সিডিসি-র মূল বিষয়গুলো শিখেছেন এবং ডাটাবেস পরিবর্তন সিমুলেট করার জন্য একটি পাইথন স্ক্রিপ্ট ইমপ্লিমেন্ট করেছেন, যা রিয়েল-টাইম ডেটা পরিবর্তনের প্রতিনিধিত্বকারী ইভেন্ট তৈরি করে।
- ক্লাউড পাব/সাব-এর ব্যবহার: আপনি ক্লাউড পাব/সাব টপিক এবং সাবস্ক্রিপশন সেট আপ করেন, যা আপনার সিডিসি ইভেন্টগুলো স্ট্রিমিং করার জন্য একটি পরিমাপযোগ্য ও নির্ভরযোগ্য মেসেজিং পরিষেবা প্রদান করে। আপনি আপনার সিমুলেটেড ডাটাবেস পরিবর্তনগুলো পাব/সাব-এ প্রকাশ করেন, যা একটি রিয়েল-টাইম ডেটা স্ট্রিম তৈরি করে।
- Dataproc এবং Spark দিয়ে ডেটা প্রক্রিয়াকরণ: আপনি একটি Dataproc ক্লাস্টার প্রোভিশন করেছেন এবং আপনার Pub/Sub সাবস্ক্রিপশন থেকে মেসেজ গ্রহণ করার জন্য একটি Spark Streaming জব ডেপ্লয় করেছেন। আপনি রিয়েল-টাইমে আগত CDC ইভেন্টগুলো প্রসেস ও ট্রান্সফর্ম করেছেন এবং ফলাফলগুলো আপনার Dataproc জব লগ-এ প্রদর্শন করেছেন।
- একটি এন্ড-টু-এন্ড রিয়েল-টাইম পাইপলাইন তৈরি করেছেন: আপনি সফলভাবে এই পরিষেবাগুলিকে একীভূত করে একটি সম্পূর্ণ ডেটা পাইপলাইন তৈরি করেছেন যা রিয়েল-টাইমে ডেটার পরিবর্তনগুলি গ্রহণ, প্রবাহ এবং প্রক্রিয়াকরণ করে। আপনি এমন একটি সিস্টেম তৈরিতে বাস্তব অভিজ্ঞতা অর্জন করেছেন যা অবিচ্ছিন্ন ডেটা প্রবাহ পরিচালনা করতে পারে।
- Spark Pub/Sub কানেক্টর ব্যবহার করা হয়েছে: আপনি সফলভাবে Spark Pub/Sub কানেক্টর ব্যবহার করার জন্য একটি Dataproc ক্লাস্টার কনফিগার করেছেন, যা Spark Structured Streaming-এর Pub/Sub থেকে ডেটা পড়ার জন্য অত্যন্ত গুরুত্বপূর্ণ।
রিয়েল-টাইম অ্যানালিটিক্স, ডেটা ওয়্যারহাউজিং এবং মাইক্রোসার্ভিসেস আর্কিটেকচার সহ বিভিন্ন অ্যাপ্লিকেশনের জন্য আরও জটিল ও অত্যাধুনিক ডেটা পাইপলাইন তৈরির একটি মজবুত ভিত্তি এখন আপনার রয়েছে। অন্বেষণ এবং নির্মাণ চালিয়ে যান!