1. परिचय
पिछले अपडेट की तारीख: 19-06-2025
बदलाव का डेटा कैप्चर क्या है?
बदलाव का डेटा कैप्चर (सीडीसी), सॉफ़्टवेयर डिज़ाइन पैटर्न का एक सेट है. इसका इस्तेमाल, डेटाबेस में हुए बदलावों का पता लगाने और उन्हें ट्रैक करने के लिए किया जाता है. आसान शब्दों में, यह डेटा में किए गए बदलावों को कैप्चर और रिकॉर्ड करने का एक तरीका है, ताकि उन बदलावों को दूसरे सिस्टम में दोहराया जा सके.
डेटा माइग्रेशन, रीयल-टाइम डेटा वेयरहाउसिंग और Analytics, आपदा से जुड़ी रिकवरी और ज़्यादा उपलब्धता, ऑडिट और कानूनों का पालन करने जैसी कई डेटा-ड्रिवन स्थितियों में, बदलाव डेटा कैप्चर (सीडीसी) का इस्तेमाल काफ़ी फ़ायदेमंद होता है.
डेटा माइग्रेशन
सीडीसी, डेटा माइग्रेशन प्रोजेक्ट को आसान बनाता है. ऐसा, डेटा ट्रांसफ़र की सुविधा देने, डाउनटाइम को कम करने, और रुकावट को कम करने की मदद से किया जाता है.
रीयल-टाइम डेटा वेयरहाउसिंग और आंकड़े
सीडीसी यह पक्का करता है कि डेटा वेयरहाउस और विश्लेषण सिस्टम, ऑपरेशनल डेटाबेस में होने वाले नए बदलावों के साथ लगातार अपडेट होते रहें.
इससे कारोबारों को रीयल-टाइम जानकारी के आधार पर फ़ैसले लेने में मदद मिलती है.
आपदा के बाद डेटा की बहाली और ज़्यादा उपलब्धता
आपदा से जुड़ी रिकवरी के लिए, सीडीसी की मदद से डेटा को सेकंडरी डेटाबेस में रीयल-टाइम में कॉपी किया जा सकता है. किसी गड़बड़ी की स्थिति में, सीडीसी की मदद से, सेकंडरी डेटाबेस पर तुरंत फ़ॉलओवर किया जा सकता है. इससे, डेटा का नुकसान कम होता है और डाउनटाइम भी कम होता है.
ऑडिट और नियमों का पालन
सीडीसी, डेटा में हुए बदलावों का ज़्यादा जानकारी वाला ऑडिट ट्रेल उपलब्ध कराता है. यह नियमों और कानूनी शर्तों का पालन करने के लिए ज़रूरी है.
आपको क्या बनाना है
इस कोडलैब में, आपको Cloud Pub/Sub, Dataproc, Python, और Apache Spark का इस्तेमाल करके, बदलाव-डेटा-कैप्चर (सीडीसी) डेटा पाइपलाइन बनानी होगी. आपकी पाइपलाइन:
- डेटाबेस में होने वाले बदलावों को सिम्युलेट करें और उन्हें Cloud Pub/Sub पर इवेंट के तौर पर पब्लिश करें. यह एक स्केलेबल और भरोसेमंद मैसेजिंग सेवा है.
- इन इवेंट को रीयल-टाइम में प्रोसेस करने के लिए, Google Cloud की मैनेज की गई Spark और Hadoop सेवा, Dataproc का इस्तेमाल करें.
इन सेवाओं को कनेक्ट करके, एक बेहतर पाइपलाइन बनाई जा सकती है. यह पाइपलाइन, डेटा में होने वाले बदलावों को कैप्चर और प्रोसेस कर सकती है. साथ ही, रीयल-टाइम आंकड़े, डेटा वेयरहाउसिंग, और अन्य ज़रूरी ऐप्लिकेशन के लिए बुनियाद तैयार कर सकती है.
आपको क्या सीखने को मिलेगा
- बदलाव का डेटा कैप्चर करने वाली बुनियादी पाइपलाइन बनाने का तरीका
- स्ट्रीम प्रोसेसिंग के लिए Dataproc
- रीयल-टाइम मैसेजिंग के लिए Cloud Pub/Sub
- Apache Spark के बारे में बुनियादी जानकारी
इस कोडलैब में, Dataproc और Cloud Pub/Sub पर फ़ोकस किया गया है. काम के नहीं लगने वाले कॉन्सेप्ट और कोड ब्लॉक को हटा दिया जाता है. साथ ही, उन्हें कॉपी करके चिपकाया जा सकता है.
आपको इन चीज़ों की ज़रूरत होगी
- चालू GCP खाता, जिसमें प्रोजेक्ट सेट अप हो. अगर आपके पास खाता नहीं है, तो मुफ़्त में आज़माने के लिए साइन अप करें.
- gcloud सीएलआई इंस्टॉल और कॉन्फ़िगर किया गया हो.
- डेटाबेस में बदलावों को सिम्युलेट करने और Pub/Sub के साथ इंटरैक्ट करने के लिए, Python 3.7 या इसके बाद का वर्शन इंस्टॉल होना चाहिए.
- Dataproc, Cloud Pub/Sub, Apache Spark, और Python के बारे में बुनियादी जानकारी.
शुरू करने से पहले
ज़रूरी एपीआई चालू करने के लिए, टर्मिनल में यह कमांड चलाएं:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Cloud Pub/Sub सेट अप करना
कोई विषय बनाना
इस विषय का इस्तेमाल, डेटाबेस में किए गए बदलावों को पब्लिश करने के लिए किया जाएगा. Dataproc जॉब इन मैसेज का उपभोक्ता होगा और बदलाव के डेटा को कैप्चर करने के लिए मैसेज को प्रोसेस करेगा. Topics के बारे में ज़्यादा जानने के लिए, यहां आधिकारिक दस्तावेज़ पढ़ें.
gcloud pubsub topics create database-changes
सदस्यता बनाना
ऐसी सदस्यता बनाएं जिसका इस्तेमाल, Pub/Sub में मैसेज पाने के लिए किया जाएगा. सदस्यताओं के बारे में ज़्यादा जानने के लिए, यहां आधिकारिक दस्तावेज़ पढ़ें.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. डेटाबेस में होने वाले बदलावों को सिम्युलेट करना
चरण
- Python स्क्रिप्ट बनाएं (उदाहरण के लिए,
simulate_cdc.py
) का इस्तेमाल करके, डेटाबेस में होने वाले बदलावों को सिम्युलेट करें और उन्हें Pub/Sub पर पब्लिश करें.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
YOUR_PROJECT_ID की जगह अपना असल GCP प्रोजेक्ट आईडी डालें
- Pub/Sub क्लाइंट लाइब्रेरी इंस्टॉल करें:
pip install google-cloud-pubsub
- अपने टर्मिनल पर स्क्रिप्ट चलाएं. यह स्क्रिप्ट लगातार चलती रहेगी और हर दो सेकंड में Pub/Sub टॉपिक पर मैसेज पब्लिश करती रहेगी.
python simulate_cdc.py
- मान लें कि आपने स्क्रिप्ट को एक मिनट तक चलाया है, तो आपके पास Pub/Sub में ज़रूरत के मुताबिक मैसेज होंगे. अपने ओएस के हिसाब से, Ctrl + C या Cmd + C दबाकर, चल रही Python स्क्रिप्ट को बंद किया जा सकता है.
- पब्लिश किए गए मैसेज देखना:
पब्लिश किए गए मैसेज देखने के लिए, कोई दूसरा टर्मिनल खोलें और यह कमांड चलाएं:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
आपको एक टेबल लाइन दिखेगी, जिसमें मैसेज और अन्य फ़ील्ड शामिल होंगे:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
जानकारी
- Python स्क्रिप्ट,
INSERT
,UPDATE
याDELETE
इवेंट को बेतरतीब तरीके से जनरेट करके, डेटाबेस में होने वाले बदलावों को सिम्युलेट करती है. - हर बदलाव को JSON ऑब्जेक्ट के तौर पर दिखाया जाता है. इसमें बदलाव का टाइप, रिकॉर्ड आईडी, टाइमस्टैंप, और डेटा शामिल होता है.
- स्क्रिप्ट,
database-changes
विषय में इन बदलाव इवेंट को पब्लिश करने के लिए, Cloud Pub/Sub क्लाइंट लाइब्रेरी का इस्तेमाल करती है. - subscriber कमांड की मदद से, Pub/Sub विषय पर भेजे जा रहे मैसेज देखे जा सकते हैं.
4. Dataproc के लिए सेवा खाता बनाना
इस सेक्शन में, एक सेवा खाता बनाया जाता है जिसका इस्तेमाल Dataproc क्लस्टर कर सकता है. क्लस्टर इंस्टेंस को Cloud Pub/Sub और Dataproc को ऐक्सेस करने की अनुमति देने के लिए, आपको ज़रूरी अनुमतियां भी असाइन करनी होंगी.
- सेवा खाता बनाने के लिए:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- सेवा खाते को क्लस्टर बनाने और जॉब चलाने की अनुमति देने के लिए, Dataproc वर्कर्स की भूमिका जोड़ें. पिछले निर्देश में जनरेट किए गए सेवा खाते के आईडी को, यहां दिए गए निर्देश में सदस्य के तौर पर जोड़ें:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- सेवा खाते को "change-subscriber" Pub/sub सदस्यता की सदस्यता लेने की अनुमति देने के लिए, Pub/sub सदस्य की भूमिका जोड़ें:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Dataproc क्लस्टर बनाना
Dataproc क्लस्टर, Spark ऐप्लिकेशन चलाएगा. यह ऐप्लिकेशन, Pub/sub में मौजूद मैसेज को प्रोसेस करेगा. इसके लिए, आपको पिछले सेक्शन में बनाया गया सेवा खाता चाहिए होगा. Dataproc, क्लस्टर के हर इंस्टेंस को यह सेवा खाता असाइन करता है, ताकि सभी इंस्टेंस को ऐप्लिकेशन चलाने के लिए सही अनुमतियां मिल सकें.
Dataproc क्लस्टर बनाने के लिए, इस निर्देश का इस्तेमाल करें:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Dataproc क्लस्टर में Spark जॉब सबमिट करना
Spark स्ट्रीमिंग ऐप्लिकेशन, Pub/sub में डेटाबेस में हुए बदलावों के मैसेज को प्रोसेस करता है और उन्हें कंसोल पर प्रिंट करता है.
चरण
- डायरेक्ट्री बनाएं और PubsubConsumer.scala फ़ाइल में, कंज्यूमर का सोर्स कोड जोड़ें
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- pom.xml बनाएं और उसमें यह कोड जोड़ें
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- प्रोजेक्ट की स्पार्क डायरेक्ट्री पर जाएं और पाथ को एनवायरमेंट वैरिएबल में सेव करें, ताकि इसका इस्तेमाल बाद में किया जा सके:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- डायरेक्ट्री बदलें:
cd $REPO_ROOT/spark
- Java 1.8 डाउनलोड करें और फ़ोल्डर को /usr/lib/jvm/ में डालें. इसके बाद, JAVA_HOME को इस पर सेट करें:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- ऐप्लिकेशन का jar फ़ाइल बनाना
mvn clean package
spark/target
डायरेक्ट्री में, spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar संग्रह बनाया गया है. इसमें ऐप्लिकेशन कोड और डिपेंडेंसी शामिल हैं
- स्पार्क ऐप्लिकेशन सबमिट करें:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- चालू नौकरियों की सूची दिखाएं और नौकरी के लिए
JOB_ID
वैल्यू पर ध्यान दें:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
आउटपुट कुछ ऐसा दिखेगा
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- अपने ब्राउज़र में यह यूआरएल खोलकर, जॉब का आउटपुट देखें. [JOB_ID] को पिछले चरण में बताई गई वैल्यू से बदलें.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- आउटपुट इस तरह का दिखता है:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Dataproc में चल रही स्पार्क स्ट्रीमिंग जॉब, Pub/sub से मैसेज खींचती है, उन्हें प्रोसेस करती है, और कंसोल पर आउटपुट दिखाती है.
- जॉब को खत्म करना: जॉब को खत्म करने के लिए, यह कमांड चलाएं. JOB_ID को उसी से बदलें जिसे हमने पहले नोट किया था
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
बधाई हो! आपने अभी-अभी एक बेहतर सीडीसी पाइपलाइन बनाई है, जो Pub/sub में डेटाबेस में हुए बदलावों को कैप्चर करती है और Cloud Dataproc में चल रही स्पार्क स्ट्रीमिंग का इस्तेमाल करके उन्हें प्रोसेस करती है.
7. व्यवस्थित करें
आपने जो भी संसाधन बनाए हैं उन्हें हटा दें, ताकि आने वाले समय में आपसे उनका शुल्क न लिया जाए. बिलिंग की सुविधा बंद करने का सबसे आसान तरीका यह है कि आप ट्यूटोरियल के लिए बनाया गया प्रोजेक्ट मिटा दें. इसके अलावा, अलग-अलग संसाधनों को भी मिटाया जा सकता है.
अलग-अलग संसाधनों को मिटाने के लिए, ये कमांड चलाएं
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. बधाई हो
बधाई हो, आपने एक ऐसा कोडलैब पूरा कर लिया है जिसमें Google Cloud Platform का इस्तेमाल करके, रीयल-टाइम डेटा पाइपलाइन बनाने का तरीका बताया गया है. आइए, यह देखें कि आपने क्या-क्या हासिल किया है:
- सिम्युलेटेड चेंज डेटा कैप्चर (सीडीसी): आपने सीडीसी के बुनियादी सिद्धांतों के बारे में जाना और डेटाबेस में होने वाले बदलावों को सिम्युलेट करने के लिए, Python स्क्रिप्ट लागू की. इससे, रीयल-टाइम में डेटा में होने वाले बदलावों को दिखाने वाले इवेंट जनरेट हुए.
- Cloud Pub/Sub का इस्तेमाल करना: इसमें Cloud Pub/Sub के विषय और सदस्यताएं सेट अप की जाती हैं. इससे, सीडीसी इवेंट को स्ट्रीम करने के लिए, स्केल की जा सकने वाली और भरोसेमंद मैसेज सेवा मिलती है. आपने सिम्युलेट किए गए डेटाबेस के बदलावों को Pub/Sub पर पब्लिश किया है. इससे, रीयल-टाइम डेटा स्ट्रीम बनाई गई है.
- Dataproc और Spark की मदद से प्रोसेस किया गया डेटा: आपने अपनी Pub/Sub सदस्यता से मैसेज पाने के लिए, Dataproc क्लस्टर का प्रावधान किया है और Spark Streaming जॉब को डिप्लॉय किया है. आपने आने वाले सीडीसी इवेंट को रीयल-टाइम में प्रोसेस और ट्रांसफ़ॉर्म किया है. साथ ही, नतीजों को अपने Dataproc जॉब लॉग में दिखाया है.
- पूरी तरह से रीयल-टाइम पाइपलाइन बनाई: आपने इन सेवाओं को इंटिग्रेट करके, एक पूरी डेटा पाइपलाइन बनाई है. यह पाइपलाइन, रीयल-टाइम में डेटा में होने वाले बदलावों को कैप्चर, स्ट्रीम, और प्रोसेस करती है. आपको ऐसा सिस्टम बनाने का व्यावहारिक अनुभव मिला है जो लगातार डेटा स्ट्रीम को मैनेज कर सकता है.
- Spark Pub/Sub कनेक्टर का इस्तेमाल किया गया: आपने Spark Pub/Sub कनेक्टर का इस्तेमाल करने के लिए, Dataproc क्लस्टर को कॉन्फ़िगर कर लिया है. यह कनेक्टर, Spark स्ट्रक्चर्ड स्ट्रीमिंग के लिए ज़रूरी है, ताकि वह Pub/Sub से डेटा पढ़ सके.
अब आपके पास अलग-अलग ऐप्लिकेशन के लिए, ज़्यादा जटिल और बेहतर डेटा पाइपलाइन बनाने का एक मज़बूत आधार है. इन ऐप्लिकेशन में, रियल-टाइम आंकड़े, डेटा वेयरहाउसिंग, और माइक्रोसर्विस आर्किटेक्चर शामिल हैं. एक्सप्लोर करते रहें और अपने चैनल को बेहतर बनाते रहें!