Dataproc এবং Cloud Pub/Sub ব্যবহার করে বিল্ডিং পরিবর্তন ডেটা ক্যাপচার

1. ভূমিকা

df8070bd84336207.png

শেষ আপডেট: 2025-06-19

পরিবর্তন ডাটা ক্যাপচার কি?

চেঞ্জ ডাটা ক্যাপচার (সিডিসি) হল সফ্টওয়্যার ডিজাইন প্যাটার্নের একটি সেট যা ডেটাবেসে পরিবর্তিত ডেটা নির্ধারণ এবং ট্র্যাক করতে ব্যবহৃত হয়। সহজ কথায়, এটি ডেটাতে করা পরিবর্তনগুলি ক্যাপচার এবং রেকর্ড করার একটি উপায় যাতে সেই পরিবর্তনগুলি অন্যান্য সিস্টেমে প্রতিলিপি করা যায়।

ডেটা মাইগ্রেশন, রিয়েল-টাইম ডেটা গুদামজাতকরণ এবং বিশ্লেষণ, দুর্যোগ পুনরুদ্ধার এবং উচ্চ প্রাপ্যতা, অডিট এবং কমপ্লায়েন্স ইত্যাদির মতো ডেটা-চালিত পরিস্থিতিগুলির একটি বিস্তৃত পরিসরে পরিবর্তন ডেটা ক্যাপচার (CDC) অবিশ্বাস্যভাবে কার্যকর।

ডেটা মাইগ্রেশন

সিডিসি ক্রমবর্ধমান ডেটা স্থানান্তর, ডাউনটাইম হ্রাস এবং বিঘ্ন হ্রাস করার অনুমতি দিয়ে ডেটা মাইগ্রেশন প্রকল্পগুলিকে সহজ করে।

রিয়েল-টাইম ডেটা গুদামজাতকরণ এবং বিশ্লেষণ

CDC নিশ্চিত করে যে ডেটা গুদাম এবং বিশ্লেষণাত্মক সিস্টেমগুলি অপারেশনাল ডাটাবেস থেকে সাম্প্রতিক পরিবর্তনগুলির সাথে ক্রমাগত আপডেট করা হয়।

এটি ব্যবসাগুলিকে রিয়েল-টাইম তথ্যের উপর ভিত্তি করে সিদ্ধান্ত নিতে দেয়।

দুর্যোগ পুনরুদ্ধার এবং উচ্চ প্রাপ্যতা

CDC দুর্যোগ পুনরুদ্ধারের উদ্দেশ্যে সেকেন্ডারি ডাটাবেসে ডেটার রিয়েল-টাইম প্রতিলিপি সক্ষম করে। ব্যর্থতার ক্ষেত্রে, সিডিসি একটি সেকেন্ডারি ডাটাবেসে দ্রুত ব্যর্থতার অনুমতি দেয়, ডাউনটাইম এবং ডেটা ক্ষতি কমিয়ে দেয়।

অডিট এবং কমপ্লায়েন্স

CDC ডেটা পরিবর্তনের একটি বিস্তারিত অডিট ট্রেল প্রদান করে, যা সম্মতি এবং নিয়ন্ত্রক প্রয়োজনীয়তার জন্য অপরিহার্য।

আপনি কি নির্মাণ করবেন

এই কোডল্যাবে, আপনি Cloud Pub/Sub, Dataproc, Python এবং Apache Spark ব্যবহার করে একটি পরিবর্তন-ডেটা-ক্যাপচার (CDC) ডেটা পাইপলাইন তৈরি করতে যাচ্ছেন। আপনার পাইপলাইন হবে:

  • ডাটাবেসের পরিবর্তনগুলি অনুকরণ করুন এবং সেগুলিকে ক্লাউড পাব/সাব-এ ইভেন্ট হিসাবে প্রকাশ করুন, একটি পরিমাপযোগ্য এবং নির্ভরযোগ্য মেসেজিং পরিষেবা৷
  • এই ইভেন্টগুলিকে রিয়েল-টাইমে প্রক্রিয়া করতে Dataproc, Google ক্লাউড-এর পরিচালিত স্পার্ক এবং Hadoop পরিষেবার শক্তি ব্যবহার করুন৷

এই পরিষেবাগুলিকে সংযুক্ত করার মাধ্যমে, আপনি একটি শক্তিশালী পাইপলাইন তৈরি করবেন যা ডেটা পরিবর্তনগুলি ঘটলে ক্যাপচার এবং প্রক্রিয়াকরণ করতে সক্ষম হবে, যা রিয়েল-টাইম বিশ্লেষণ, ডেটা গুদামজাতকরণ এবং অন্যান্য গুরুত্বপূর্ণ অ্যাপ্লিকেশনগুলির জন্য একটি ভিত্তি প্রদান করবে।

আপনি কি শিখবেন

এই কোডল্যাবটি ডেটাপ্রোক এবং ক্লাউড পাব/সাবের উপর দৃষ্টি নিবদ্ধ করে। অ-প্রাসঙ্গিক ধারণা এবং কোড ব্লকগুলিকে চকচকে করা হয়েছে এবং আপনাকে কেবল অনুলিপি এবং পেস্ট করার জন্য সরবরাহ করা হয়েছে।

আপনি কি প্রয়োজন হবে

  • একটি প্রকল্প সেট আপ সহ একটি সক্রিয় GCP অ্যাকাউন্ট। আপনার যদি এটি না থাকে তবে আপনি একটি বিনামূল্যে ট্রায়ালের জন্য সাইন আপ করতে পারেন৷
  • gcloud CLI ইনস্টল এবং কনফিগার করা হয়েছে।
  • Python 3.7+ ডাটাবেস পরিবর্তনের অনুকরণ এবং পাব/সাবের সাথে ইন্টারঅ্যাক্ট করার জন্য ইনস্টল করা হয়েছে।
  • ডেটাপ্রোক, ক্লাউড পাব/সাব, অ্যাপাচি স্পার্ক এবং পাইথনের প্রাথমিক জ্ঞান।

আপনি শুরু করার আগে

প্রয়োজনীয় API গুলি সক্ষম করতে টার্মিনালে নিম্নলিখিত কমান্ডটি চালান:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. ক্লাউড পাব/সাব সেট আপ করুন৷

একটি বিষয় তৈরি করুন

এই বিষয় ডাটাবেস পরিবর্তন প্রকাশ করতে ব্যবহার করা হবে. Dataproc কাজ এই বার্তাগুলির ভোক্তা হবে এবং ডেটা ক্যাপচার পরিবর্তনের জন্য বার্তাগুলি প্রক্রিয়া করবে। আপনি যদি বিষয়গুলি সম্পর্কে আরও জানতে চান তবে আপনি এখানে অফিসিয়াল ডকুমেন্টেশন পড়তে পারেন।

gcloud pubsub topics create database-changes

একটি সদস্যতা তৈরি করুন

একটি সাবস্ক্রিপশন তৈরি করুন যা পাব/সাব-এ বার্তাগুলি ব্যবহার করতে ব্যবহার করা হবে। সদস্যতা সম্পর্কে আরও জানতে, আপনি এখানে অফিসিয়াল ডকুমেন্টেশন পড়তে পারেন।

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. ডাটাবেস পরিবর্তন অনুকরণ

ধাপ

  1. ডাটাবেস পরিবর্তন অনুকরণ করতে একটি পাইথন স্ক্রিপ্ট (যেমন, simulate_cdc.py ) তৈরি করুন এবং সেগুলিকে Pub/Sub-এ প্রকাশ করুন।
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

আপনার প্রকৃত GCP প্রকল্প আইডি দিয়ে YOUR_PROJECT_ID প্রতিস্থাপন করুন

  1. পাব/সাব ক্লায়েন্ট লাইব্রেরি ইনস্টল করুন:
pip install google-cloud-pubsub
  1. আপনার টার্মিনালে স্ক্রিপ্ট চালান। এই স্ক্রিপ্টটি ক্রমাগত চলবে এবং প্রতি 2 সেকেন্ডে পাব/সাব বিষয়ে বার্তা প্রকাশ করবে।
python simulate_cdc.py
  1. 1 মিনিটের জন্য স্ক্রিপ্ট চালানোর পরে, আপনার কাছে পাব/সাব-এ পর্যাপ্ত বার্তা থাকবে। আপনি আপনার OS এর উপর নির্ভর করে ctrl + C বা Cmd + C টিপে চলমান পাইথন স্ক্রিপ্টটি বন্ধ করতে পারেন।
  2. প্রকাশিত বার্তা দেখুন:

অন্য টার্মিনাল খুলুন এবং প্রকাশিত বার্তাগুলি দেখতে নিম্নলিখিত কমান্ডটি চালান:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

আপনি বার্তা এবং অন্যান্য ক্ষেত্র ধারণকারী একটি টেবিল সারি দেখতে হবে:

{"change_type": "আপডেট", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

ব্যাখ্যা

  • পাইথন স্ক্রিপ্ট এলোমেলোভাবে INSERT , UPDATE , বা DELETE ইভেন্টগুলি তৈরি করে ডাটাবেস পরিবর্তনগুলিকে অনুকরণ করে৷
  • প্রতিটি পরিবর্তন একটি JSON অবজেক্ট হিসাবে উপস্থাপন করা হয় যাতে পরিবর্তনের ধরন, রেকর্ড আইডি, টাইমস্ট্যাম্প এবং ডেটা থাকে।
  • স্ক্রিপ্টটি ক্লাউড পাব/সাব ক্লায়েন্ট লাইব্রেরি ব্যবহার করে database-changes বিষয়গুলিতে এই পরিবর্তন ইভেন্টগুলি প্রকাশ করে।
  • সাবস্ক্রাইবার কমান্ড আপনাকে পাব/সাব বিষয়ে পাঠানো বার্তাগুলি দেখতে দেয়।

4. Dataproc-এর জন্য একটি পরিষেবা অ্যাকাউন্ট তৈরি করুন

এই বিভাগে, আপনি একটি পরিষেবা অ্যাকাউন্ট তৈরি করুন যা Dataproc ক্লাস্টার ব্যবহার করতে পারে। আপনি ক্লাস্টার দৃষ্টান্তগুলিকে ক্লাউড পাব/সাব এবং ডেটাপ্রোক অ্যাক্সেস করার অনুমতি দেওয়ার জন্য প্রয়োজনীয় অনুমতিগুলিও বরাদ্দ করেন।

  1. একটি পরিষেবা অ্যাকাউন্ট তৈরি করুন:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. সেবা অ্যাকাউন্টকে ক্লাস্টার তৈরি করতে এবং চাকরি চালানোর অনুমতি দিতে Dataproc কর্মী ভূমিকা যোগ করুন। নীচের কমান্ডে সদস্য হিসাবে পূর্ববর্তী কমান্ডে তৈরি পরিষেবা অ্যাকাউন্ট আইডি যুক্ত করুন:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. পরিষেবা অ্যাকাউন্টটিকে "পরিবর্তন-সাবস্ক্রাইবার" পাব/সাবস্ক্রিপশনে সদস্যতা নেওয়ার অনুমতি দিতে পাব/সাবস্ক্রাইবার ভূমিকা যুক্ত করুন:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. একটি Dataproc ক্লাস্টার তৈরি করুন

Dataproc ক্লাস্টার স্পার্ক অ্যাপ চালাবে যা Pub/sub-এ বার্তাগুলি প্রক্রিয়া করবে। আপনার পূর্ববর্তী বিভাগে তৈরি পরিষেবা অ্যাকাউন্টের প্রয়োজন হবে। Dataproc ক্লাস্টারের প্রতিটি দৃষ্টান্তে এই পরিষেবা অ্যাকাউন্টটি বরাদ্দ করে যাতে সমস্ত দৃষ্টান্ত অ্যাপটি চালানোর জন্য সঠিক অনুমতি পায়।

একটি Dataproc ক্লাস্টার তৈরি করতে নিম্নলিখিত কমান্ড ব্যবহার করুন:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. ডাটাপ্রোক ক্লাস্টারে স্পার্ক জব জমা দিন

স্পার্ক স্ট্রিমিং অ্যাপ পাব/সাব-এ ডাটাবেস পরিবর্তনের বার্তাগুলিকে প্রক্রিয়া করে এবং সেগুলিকে কনসোলে প্রিন্ট করে।

ধাপ

  1. একটি ডিরেক্টরি তৈরি করুন এবং PubsubConsumer.scala ফাইলে ভোক্তার সোর্স কোড যোগ করুন
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. তৈরি করুন এবং pom.xml এ নিম্নলিখিত যোগ করুন
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. প্রকল্পের স্পার্ক ডিরেক্টরিতে পরিবর্তন করুন এবং পরে ব্যবহার করার জন্য একটি পরিবেশ ভেরিয়েবলে পাথ সংরক্ষণ করুন:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. ডিরেক্টরি পরিবর্তন করুন:
cd $REPO_ROOT/spark
  1. Java 1.8 ডাউনলোড করুন এবং ফোল্ডারটি /usr/lib/jvm/ এ রাখুন। তারপর এটি নির্দেশ করতে আপনার JAVA_HOME পরিবর্তন করুন:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. অ্যাপ্লিকেশন জার তৈরি করুন
mvn clean package

spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar আর্কাইভ যেখানে অ্যাপ্লিকেশন কোড এবং নির্ভরতা রয়েছে তা spark/target ডিরেক্টরিতে তৈরি করা হয়েছে

  1. স্পার্ক আবেদন জমা দিন:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. সক্রিয় কাজের তালিকা প্রদর্শন করুন এবং কাজের জন্য JOB_ID মান নোট করুন:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

আউটপুট দেখতে একই রকম হবে

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. আপনার ব্রাউজারে নিম্নলিখিত URLটি খোলার মাধ্যমে কাজের আউটপুট দেখুন। পূর্ববর্তী ধাপে উল্লেখিত মান দিয়ে [JOB_ID] প্রতিস্থাপন করুন।
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. আউটপুট নিম্নলিখিত অনুরূপ:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Dataproc-এ চলমান স্পার্ক স্ট্রিমিং জব পাব/সাব থেকে বার্তা টেনে আনে, সেগুলিকে প্রসেস করে এবং কনসোলে আউটপুট প্রদর্শন করে।

  1. কাজটি বন্ধ করা: কাজটি শেষ করতে নিম্নলিখিত কমান্ডটি চালান। JOB_ID প্রতিস্থাপন আমরা আগে উল্লেখ করেছি একই সাথে
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

অভিনন্দন! আপনি এইমাত্র একটি শক্তিশালী CDC পাইপলাইন তৈরি করেছেন যা Pub/sub-এ ডাটাবেস পরিবর্তনগুলি ক্যাপচার করে এবং ক্লাউড ডেটাপ্রোকে চলমান স্পার্ক স্ট্রিমিং ব্যবহার করে সেগুলি প্রক্রিয়া করে৷

7. পরিষ্কার করুন

আপনার তৈরি করা কোনও সংস্থান পরিষ্কার করুন যাতে ভবিষ্যতে আপনাকে তাদের জন্য বিল করা হবে না। বিলিং দূর করার সবচেয়ে সহজ উপায় হল টিউটোরিয়ালের জন্য আপনার তৈরি করা প্রকল্পটি মুছে ফেলা। বিকল্পভাবে, আপনি পৃথক সম্পদ মুছে ফেলতে পারেন।

পৃথক সম্পদ মুছে ফেলার জন্য নিম্নলিখিত কমান্ড চালান

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. অভিনন্দন

অভিনন্দন, আপনি এইমাত্র একটি হ্যান্ডস-অন কোডল্যাব সম্পূর্ণ করেছেন যা Google ক্লাউড প্ল্যাটফর্ম ব্যবহার করে কীভাবে একটি শক্তিশালী রিয়েল-টাইম ডেটা পাইপলাইন তৈরি করতে হয় তা প্রদর্শন করে৷ আপনি যা করেছেন তা সংক্ষিপ্ত করা যাক:

  • সিমুলেটেড চেঞ্জ ডেটা ক্যাপচার (সিডিসি): আপনি সিডিসির মৌলিক বিষয়গুলি শিখেছেন এবং ডাটাবেস পরিবর্তনগুলি অনুকরণ করতে একটি পাইথন স্ক্রিপ্ট প্রয়োগ করেছেন, ইভেন্টগুলি তৈরি করে যা রিয়েল-টাইম ডেটা পরিবর্তনের প্রতিনিধিত্ব করে।
  • লিভারেজড ক্লাউড পাব/সাব: আপনি ক্লাউড পাব/সাব বিষয় এবং সদস্যতা সেট আপ করেন, আপনার সিডিসি ইভেন্টগুলি স্ট্রিম করার জন্য একটি পরিমাপযোগ্য এবং নির্ভরযোগ্য মেসেজিং পরিষেবা প্রদান করেন। আপনি আপনার সিমুলেটেড ডাটাবেসের পরিবর্তনগুলি Pub/Sub-এ প্রকাশ করেছেন, একটি রিয়েল-টাইম ডেটা স্ট্রিম তৈরি করেছেন।
  • ডেটাপ্রোক এবং স্পার্ক সহ প্রক্রিয়াকৃত ডেটা: আপনি একটি ডেটাপ্রোক ক্লাস্টারের ব্যবস্থা করেছেন এবং আপনার পাব/সাবস্ক্রিপশন থেকে বার্তাগুলি গ্রহণ করার জন্য একটি স্পার্ক স্ট্রিমিং কাজ স্থাপন করেছেন৷ আপনি রিয়েল-টাইমে ইনকামিং CDC ইভেন্টগুলি প্রক্রিয়া করেছেন এবং রূপান্তর করেছেন, ফলাফলগুলি আপনার Dataproc কাজের লগগুলিতে প্রদর্শন করেছেন।
  • একটি এন্ড-টু-এন্ড রিয়েল-টাইম পাইপলাইন তৈরি করেছেন: আপনি একটি সম্পূর্ণ ডেটা পাইপলাইন তৈরি করতে এই পরিষেবাগুলিকে সফলভাবে সংহত করেছেন যা রিয়েল-টাইমে ডেটা পরিবর্তনগুলি ক্যাপচার করে, স্ট্রিম করে এবং প্রক্রিয়া করে৷ আপনি এমন একটি সিস্টেম তৈরি করার বাস্তব অভিজ্ঞতা অর্জন করেছেন যা ক্রমাগত ডেটা স্ট্রিমগুলি পরিচালনা করতে পারে।
  • স্পার্ক পাব/সাব সংযোগকারী ব্যবহার করেছেন: আপনি স্পার্ক পাব/সাব সংযোগকারী ব্যবহার করার জন্য সফলভাবে একটি ডেটাপ্রোক ক্লাস্টার কনফিগার করেছেন, যা পাব/সাব থেকে ডেটা পড়ার জন্য স্পার্ক স্ট্রাকচার্ড স্ট্রিমিংয়ের জন্য গুরুত্বপূর্ণ।

রিয়েল-টাইম অ্যানালিটিক্স, ডেটা গুদামজাতকরণ এবং মাইক্রোসার্ভিসেস আর্কিটেকচার সহ বিভিন্ন অ্যাপ্লিকেশনের জন্য আরও জটিল এবং পরিশীলিত ডেটা পাইপলাইন তৈরি করার জন্য আপনার কাছে এখন একটি শক্ত ভিত্তি রয়েছে। অন্বেষণ এবং বিল্ডিং রাখুন!

রেফারেন্স ডক্স