1. परिचय
Google क्लाउड डेटाफ़्लो
पिछली बार अपडेट किए जाने की तारीख: 2023 से 5 जुलाई
Dataflow क्या है?
Dataflow एक ऐसी मैनेज की गई सेवा है जिसकी मदद से, डेटा प्रोसेसिंग के कई तरह के पैटर्न लागू किए जा सकते हैं. इस साइट पर मौजूद दस्तावेज़ में आपको Dataflow का इस्तेमाल करके, बैच और स्ट्रीमिंग डेटा प्रोसेसिंग पाइपलाइन को डिप्लॉय करने का तरीका बताया गया है. इसमें, सेवा सुविधाओं का इस्तेमाल करने के निर्देश भी दिए गए हैं.
Apache बीम SDK, एक ओपन सोर्स प्रोग्रामिंग मॉडल है. इसकी मदद से, बैच और स्ट्रीमिंग पाइपलाइन, दोनों डेवलप किए जा सकते हैं. Apache बीम प्रोग्राम से अपनी पाइपलाइन बनाई जाती है और फिर उन्हें Dataflow सेवा पर चलाया जाता है. Apache बीम दस्तावेज़, Apache बीम प्रोग्रामिंग मॉडल, SDK टूल, और अन्य रनर के बारे में गहराई से सैद्धांतिक जानकारी और रेफ़रंस कॉन्टेंट देता है.
तेज़ी से डेटा के आंकड़े स्ट्रीम करना
Dataflow की मदद से, स्ट्रीमिंग डेटा पाइपलाइन को तेज़ और आसान तरीके से डेवलप किया जा सकता है. ऐसा करने पर डेटा इंतज़ार का समय कम होता है.
ऑपरेशन और मैनेजमेंट को आसान बनाना
टीम को सर्वर क्लस्टर को मैनेज करने के बजाय, प्रोग्रामिंग पर फ़ोकस करने दें. ऐसा इसलिए, क्योंकि Dataflow का बिना सर्वर वाला तरीका, डेटा इंजीनियरिंग के वर्कलोड से ऑपरेशनल ओवरहेड को हटा देता है.
मालिकाना हक की कुल लागत कम करना
संसाधन ऑटो स्केलिंग के साथ लागत के हिसाब से बैच प्रोसेसिंग की सुविधा जोड़ी गई है. इसका मतलब है कि Dataflow बिना किसी सीमा के, सीज़न के हिसाब से और ज़्यादा खर्च वाले वर्कलोड को मैनेज करने के लिए, वर्चुअल तौर पर अनगिनत क्षमता देता है.
मुख्य सुविधाएं
अपने-आप काम करने वाले संसाधन का मैनेजमेंट और काम के डाइनैमिक तरीके के बीच संतुलन
Dataflow प्रोसेस करने के दौरान संसाधनों को अपने-आप प्रावधान और मैनेज करने की सुविधा देता है, ताकि इंतज़ार का समय कम किया जा सके और ज़्यादा से ज़्यादा इस्तेमाल किया जा सके. इससे, आपको इंस्टेंस को स्पिन अप करने या उन्हें खुद सुरक्षित करने की ज़रूरत नहीं पड़ती. काम के बंटवारे की सुविधा अपने-आप काम करती है और इसे इस तरह ऑप्टिमाइज़ किया जाता है कि ज़रूरत के समय होने वाले काम के बीच डाइनैमिक तरीके से दोबारा संतुलन न बने. "हॉट की" खोजने की ज़रूरत नहीं है या अपने इनपुट डेटा को प्रीप्रोसेस करें.
हॉरिज़ॉन्टल ऑटो स्केलिंग
बेहतर सीटीआर के लिए, कर्मचारियों के संसाधनों की हॉरिज़ॉन्टल ऑटो स्केलिंग से कीमत के साथ-साथ परफ़ॉर्मेंस बेहतर होती है.
बैच प्रोसेसिंग के लिए, ज़रूरत के हिसाब से संसाधन शेड्यूलिंग की कीमत
नौकरी के शेड्यूल को आसानी से प्रोसेस करने के लिए, जैसे कि रात भर की नौकरियां, फ़्लेक्सिबल रिसॉर्स शेड्यूलिंग (FlexRS) से बैच प्रोसेसिंग के लिए कम कीमत मिलती है. इन सुविधाजनक जॉब को इस गारंटी के साथ एक सूची में रखा जाता है कि इन्हें छह घंटे की समयसीमा में पूरा करने के लिए मिल जाएगा.
इसके तहत क्या चलाया जाएगा
JupyterLab नोटबुक के साथ Apache बीम इंटरैक्टिव रनर का इस्तेमाल करने पर, आपको बार-बार पाइपलाइन डेवलप करने, पाइपलाइन ग्राफ़ की जांच करने, और Read-eval-print-loop (REPL) वर्कफ़्लो में अलग-अलग PCollections पार्स करने की सुविधा मिलती है. ये Apache बीम नोटबुक, Vertex AI Workbench की मदद से उपलब्ध कराई गई हैं. यह मैनेज की जा रही ऐसी सेवा है जो नोटबुक वर्चुअल मशीनों को होस्ट करती है. इन मशीनों में, सबसे नए डेटा साइंस और मशीन लर्निंग फ़्रेमवर्क का इस्तेमाल किया जाता है.
यह कोडलैब, Apache बीम नोटबुक के फ़ंक्शन पर फ़ोकस करता है.
आपको इनके बारे में जानकारी मिलेगी
- नोटबुक इंस्टेंस बनाने का तरीका
- एक बुनियादी पाइपलाइन बनाना
- अनबाउंड सोर्स से डेटा पढ़ा जा रहा है
- डेटा को विज़ुअलाइज़ करना
- नोटबुक से Dataflow जॉब लॉन्च करना
- नोटबुक सेव करना
आपको इन चीज़ों की ज़रूरत होगी
- एक Google Cloud Platform प्रोजेक्ट जिसमें बिलिंग की सुविधा चालू हो.
- Google Cloud Dataflow और Google Cloud PubSub चालू किए गए.
2. सेट अप किया जा रहा है
- Cloud Console में, प्रोजेक्ट सिलेक्टर पेज पर, कोई Cloud प्रोजेक्ट चुनें या बनाएं.
पक्का करें कि आपने इन एपीआई को चालू किया हो:
- Dataflow एपीआई
- क्लाउड Pub/Sub एपीआई
- Compute Engine
- Notebooks API
इसकी पुष्टि करने के लिए, एपीआई की & सेवाओं वाला पेज.
इस गाइड में, हम Pub/Sub की सदस्यता से मिले डेटा को पढ़ेंगे. इसलिए, पक्का करें कि Compute Engine के डिफ़ॉल्ट सेवा खाते के पास एडिटर की भूमिका हो या उसे Pub/Sub एडिटर की भूमिका अनुमति दें.
3. Apache बीम नोटबुक के साथ शुरुआत करना
Apache बीम नोटबुक के इंस्टेंस लॉन्च करना
- कंसोल पर Dataflow लॉन्च करें:
- बाईं ओर मौजूद मेन्यू का इस्तेमाल करके, Workbench पेज चुनें.
- पक्का करें कि आप उपयोगकर्ता की ओर से मैनेज की जाने वाली नोटबुक टैब पर हैं.
- टूलबार में, नई नोटबुक पर क्लिक करें.
- Apache बीम चुनें > जीपीयू के बिना.
- नई नोटबुक पेज पर, notebook वीएम के लिए सबनेटवर्क चुनें और बनाएं पर क्लिक करें.
- लिंक चालू होने पर, JupyterLab खोलें पर क्लिक करें. Vertex AI Workbench, एक नया Apache बीम नोटबुक इंस्टेंस बनाता है.
4. पाइपलाइन बनाई जा रही है
नोटबुक इंस्टेंस बनाना
फ़ाइल > नया > Notebook खोलें और Apache बीम 2.47 या उसके बाद के वर्शन वाला कर्नेल चुनें.
अपनी नोटबुक में कोड जोड़ना शुरू करें
- हर सेक्शन के कोड को कॉपी करके, अपनी नोटबुक की नई सेल में चिपकाएं
- सेल चलाएं
JupyterLab नोटबुक के साथ Apache बीम इंटरैक्टिव रनर का इस्तेमाल करने पर, आपको बार-बार पाइपलाइन डेवलप करने, पाइपलाइन ग्राफ़ की जांच करने, और Read-eval-print-loop (REPL) वर्कफ़्लो में अलग-अलग PCollections पार्स करने की सुविधा मिलती है.
Apache बीम आपके notebook के इंस्टेंस पर इंस्टॉल होता है. इसलिए, अपनी नोटबुक में interactive_runner
और interactive_beam
मॉड्यूल शामिल करें.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
अगर आपकी नोटबुक Google की अन्य सेवाओं का इस्तेमाल करती है, तो ये इंपोर्ट स्टेटमेंट जोड़ें:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
बातचीत के विकल्प सेट करना
इससे डेटा कैप्चर करने की अवधि 60 सेकंड पर सेट हो जाती है. अगर आपको तेज़ी से दोहराना है, तो इसे कम अवधि पर सेट करें, जैसे कि ‘10 सेकंड’.
ib.options.recording_duration = '60s'
अन्य इंटरैक्टिव विकल्पों के लिए, interactive_BM.options क्लास देखें.
InteractiveRunner
ऑब्जेक्ट का इस्तेमाल करके, पाइपलाइन शुरू करें.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
डेटा को पढ़ना और विज़ुअलाइज़ करना
नीचे दिए गए उदाहरण में, Apache बीम पाइपलाइन दिखाई गई है. यह दिए गए Pub/Sub विषय के लिए सदस्यता बनाती है और सदस्यता से जुड़ी जानकारी को पढ़ती है.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
पाइपलाइन, सोर्स से विंडो के हिसाब से शब्दों की गिनती करती है. यह एक तय विंडो बनाता है, जिसमें हर विंडो 10 सेकंड की होती है.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
डेटा विंडो होने के बाद, शब्दों की गिनती विंडो के हिसाब से की जाती है.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
डेटा को विज़ुअलाइज़ करना
show()
तरीका, notebook में मिलने वाले PCollection को दिखाता है.
ib.show(windowed_word_counts, include_window_info=True)
अपने डेटा के विज़ुअलाइज़ेशन दिखाने के लिए, show()
तरीके में visualize_data=True
को पास करें. कोई नया सेल जोड़ें:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
अपने विज़ुअलाइज़ेशन पर एक से ज़्यादा फ़िल्टर लागू किए जा सकते हैं. यहां दिए गए विज़ुअलाइज़ेशन की मदद से, लेबल और ऐक्सिस के हिसाब से डेटा को फ़िल्टर किया जा सकता है:
5. Pandas DataFrame का इस्तेमाल करना
Apache बीम नोटबुक में Pandas DataFrame एक और उपयोगी विज़ुअलाइज़ेशन है. नीचे दिए गए उदाहरण में, शब्दों को लोअरकेस में बदला गया है. इसके बाद, हर शब्द की फ़्रीक्वेंसी को कैलकुलेट किया गया है.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
collect()
तरीका, Pandas DataFrame में आउटपुट देता है.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (ज़रूरी नहीं) अपनी notebook से Dataflow जॉब लॉन्च करना
- Dataflow पर जॉब चलाने के लिए, आपको अतिरिक्त अनुमतियों की ज़रूरत है. पक्का करें कि Compute Engine के डिफ़ॉल्ट सेवा खाते में एडिटर की भूमिका हो या इसे यहां दी गई आईएएम भूमिकाएं दे दें:
- Dataflow का एडमिन
- Dataflow वर्कर
- स्टोरेज एडमिन, और
- सेवा खाते का उपयोगकर्ता (roles/iam.serviceAccountUser)
दस्तावेज़ में, भूमिकाओं के बारे में ज़्यादा जानें.
- (ज़रूरी नहीं) Dataflow जॉब चलाने के लिए अपनी notebook का इस्तेमाल करने से पहले, कर्नेल को रीस्टार्ट करें, सभी सेल फिर से चलाएं, और आउटपुट की पुष्टि करें.
- नीचे दिए गए इंपोर्ट स्टेटमेंट हटाएं:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- यह इंपोर्ट स्टेटमेंट जोड़ें:
from apache_beam.runners import DataflowRunner
- रिकॉर्डिंग की अवधि के इन विकल्पों को हटाएं:
ib.options.recording_duration = '60s'
- अपने पाइपलाइन विकल्पों में इन्हें जोड़ें. आपको Cloud Storage की जगह की जानकारी में बदलाव करना होगा, ताकि पहले से मौजूद बकेट का इस्तेमाल किया जा सके. इसके अलावा, इस काम के लिए नया बकेट भी बनाया जा सकता है. क्षेत्र की वैल्यू को
us-central1
से भी बदला जा सकता है.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
beam.Pipeline()
के कंस्ट्रक्टर में,InteractiveRunner
कोDataflowRunner
से बदलें.p
पाइपलाइन ऑब्जेक्ट है, जिससे आपकी पाइपलाइन बनाई जा सकती है.
p = beam.Pipeline(DataflowRunner(), options=options)
- अपने कोड से इंटरैक्टिव कॉल हटाएं. उदाहरण के लिए, अपने कोड से
show()
,collect()
,head()
,show_graph()
, औरwatch()
हटा दें. - कोई भी परिणाम देखने के लिए आपको एक सिंक जोड़ना होगा. पिछले सेक्शन में, हम notebook में नतीजों को विज़ुअलाइज़ कर रहे थे. हालांकि, इस बार हम इस notebook के बाहर डेटा को भेज रहे हैं - Dataflow में. इसलिए, हमें खोज के नतीजों के लिए किसी बाहरी जगह की ज़रूरत होती है. इस उदाहरण में, हम GCS (Google Cloud Storage) की टेक्स्ट फ़ाइलों में नतीजे लिखेंगे. यह स्ट्रीमिंग पाइपलाइन है, जिसमें डेटा विंडो है. इसलिए, हम हर विंडो के लिए एक टेक्स्ट फ़ाइल बनाना चाहेंगे. इसके लिए, अपनी पाइपलाइन में ये चरण जोड़ें:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- अपने पाइपलाइन कोड के आखिर में
p.run()
जोड़ें. - अब अपने नोटबुक कोड की समीक्षा करके, पुष्टि करें कि आपने सभी बदलाव शामिल कर लिए हैं. यह कुछ ऐसा दिखना चाहिए:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- सेल चलाएं.
- आपको इससे मिलता-जुलता आउटपुट दिखेगा:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- यह पुष्टि करने के लिए कि जॉब चल रहा है या नहीं, Dataflow के लिए नौकरियां पेज पर जाएं. आपको सूची में एक नई नौकरी दिखेगी. डेटा प्रोसेस होने में करीब 5 से 10 मिनट लगेंगे.
- डेटा प्रोसेस होने के बाद, Cloud Storage पर जाएं और उस डायरेक्ट्री पर जाएं जहां Dataflow में नतीजे सेव किए जा रहे हों (आपका तय किया गया
output_gcs_location
). आपको हर विंडो में एक फ़ाइल के साथ, टेक्स्ट फ़ाइलों की एक सूची दिखेगी. - फ़ाइल डाउनलोड करें और कॉन्टेंट की जांच करें. इसमें शब्दों की संख्या के साथ उनकी संख्या भी होनी चाहिए. इसके अलावा, फ़ाइलों की जांच करने के लिए कमांड-लाइन इंटरफ़ेस का इस्तेमाल करें. अपनी नोटबुक में नए सेल में नीचे दिया गया काम करके, ऐसा किया जा सकता है:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- आपको इससे मिलता-जुलता आउटपुट दिखेगा:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- हो गया! आपने जो काम बनाया है उसे खाली करना और रोकना न भूलें (इस कोडलैब का आखिरी चरण देखें).
किसी इंटरैक्टिव नोटबुक पर यह कन्वर्ज़न कैसे करते हैं, इसका उदाहरण देखने के लिए, अपने notebook के इंस्टेंस में Dataflow की वर्ड काउंट नोटबुक देखें.
इसके अलावा, अपनी नोटबुक को एक्ज़ीक्यूटेबल स्क्रिप्ट के तौर पर एक्सपोर्ट किया जा सकता है और पिछले चरणों का इस्तेमाल करके जनरेट की गई .py फ़ाइल में बदलाव किया जा सकता है. इसके बाद, Dataflow सेवा पर अपनी पाइपलाइन डिप्लॉय की जा सकती है.
7. आपकी नोटबुक सेव की जा रही है
आपने जो नोटबुक बनाई हैं वे आपके मौजूदा notebook के इंस्टेंस में सेव होती हैं. अगर डेवलपमेंट के दौरान, notebook के इंस्टेंस को रीसेट या शट डाउन किया जाता है, तो नई नोटबुक तब तक सेव रहती हैं, जब तक उन्हें /home/jupyter
डायरेक्ट्री में बनाया जाता है. हालांकि, अगर किसी नोटबुक इंस्टेंस को मिटाया जाता है, तो वे नोटबुक भी मिट जाती हैं.
अपनी नोटबुक को आने वाले समय में इस्तेमाल करने के लिए, उन्हें स्थानीय तौर पर अपने वर्कस्टेशन में डाउनलोड करें. इसके बाद, उन्हें GitHub में सेव करें या किसी दूसरे फ़ाइल फ़ॉर्मैट में एक्सपोर्ट करें.
8. साफ़ किया जा रहा है
अपने Apache बीम नोटबुक इंस्टेंस का इस्तेमाल करने के बाद, Google Cloud पर बनाए गए संसाधनों को खाली करें. इसके लिए, नोटबुक के इंस्टेंस को बंद करें और अगर आपने कोई स्ट्रीमिंग जॉब चलाया है, तो स्ट्रीमिंग जॉब को बंद करें.
इसके अलावा, अगर आपने सिर्फ़ इस कोडलैब के लिए कोई प्रोजेक्ट बनाया है, तो आपके पास पूरी तरह से प्रोजेक्ट बंद करने का भी विकल्प है.