1. खास जानकारी
इस कोडलैब में, Google Cloud Platform पर Dataproc के साथ Apache Spark का इस्तेमाल करके, डेटा प्रोसेसिंग पाइपलाइन बनाने का तरीका बताया गया है. डेटा साइंस और डेटा इंजीनियरिंग में, किसी स्टोरेज लोकेशन से डेटा पढ़ने, उसमें बदलाव करने, और उसे किसी दूसरी स्टोरेज लोकेशन में लिखने का इस्तेमाल आम तौर पर किया जाता है. डेटा में आम तौर पर होने वाले बदलावों में, डेटा का कॉन्टेंट बदलना, ग़ैर-ज़रूरी जानकारी हटाना, और फ़ाइल टाइप बदलना शामिल है.
इस कोडलैब में, आपको Apache Spark के बारे में जानकारी मिलेगी. साथ ही, PySpark (Apache Spark का Python API), BigQuery, Google Cloud Storage, और Reddit के डेटा के साथ Dataproc का इस्तेमाल करके, सैंपल पाइपलाइन चलाने का तरीका भी बताया जाएगा.
2. Apache Spark के बारे में जानकारी (ज़रूरी नहीं)
वेबसाइट के मुताबिक, " Apache Spark, बड़े पैमाने पर डेटा प्रोसेस करने के लिए एक यूनिफ़ाइड ऐनलिटिक्स इंजन है." इसकी मदद से, डेटा का विश्लेषण और प्रोसेसिंग, पैरलल और इन-मेमोरी में की जा सकती है. इससे, कई अलग-अलग मशीनों और नोड पर बड़े पैमाने पर पैरलल कंप्यूटेशन किया जा सकता है. इसे मूल रूप से 2014 में, MapReduce के अपग्रेड के तौर पर रिलीज़ किया गया था. यह अब भी बड़े पैमाने पर कैलकुलेशन करने के लिए, सबसे लोकप्रिय फ़्रेमवर्क में से एक है. Apache Spark को Scala में लिखा गया है. इसके बाद, इसमें Scala, Java, Python, और R के एपीआई जोड़े गए हैं. इसमें कई लाइब्रेरी शामिल हैं. जैसे, डेटा पर SQL क्वेरी करने के लिए Spark SQL, डेटा स्ट्रीम करने के लिए Spark Streaming, मशीन लर्निंग के लिए MLlib, और ग्राफ़ प्रोसेस करने के लिए GraphX. ये सभी लाइब्रेरी, Apache Spark इंजन पर काम करती हैं.
Spark अपने-आप चल सकता है या स्केलिंग के लिए, Yarn, Mesos या Kubernetes जैसी संसाधन मैनेजमेंट सेवा का फ़ायदा ले सकता है. इस कोडलैब के लिए, Dataproc का इस्तेमाल किया जाएगा. इसमें Yarn का इस्तेमाल किया जाता है.
Spark में डेटा को मूल रूप से मेमोरी में लोड किया जाता था. इसे आरडीडी या रीज़िलिएंट डिस्ट्रिब्यूटेड डेटासेट कहा जाता है. Spark के डेवलपमेंट में, कॉलम वाले दो नए डेटा टाइप जोड़े गए हैं: टाइप किया गया डेटासेट और बिना टाइप वाला डेटाफ़्रेम. आम तौर पर, आरडीडी किसी भी तरह के डेटा के लिए बेहतर होते हैं, जबकि डेटासेट और डेटाफ़्रेम, टेबल वाले डेटा के लिए ऑप्टिमाइज़ किए जाते हैं. डेटासेट सिर्फ़ Java और Scala एपीआई के साथ उपलब्ध हैं. इसलिए, हम इस कोडलैब के लिए PySpark Dataframe API का इस्तेमाल करेंगे. ज़्यादा जानकारी के लिए, कृपया Apache Spark का दस्तावेज़ देखें.
3. इस्तेमाल का उदाहरण
डेटा इंजीनियर को अक्सर डेटा को डेटा साइंटिस्ट के लिए आसानी से ऐक्सेस करने लायक बनाना पड़ता है. हालांकि, डेटा अक्सर शुरुआत में गलत होता है. इसका मतलब है कि मौजूदा स्थिति में, इसका इस्तेमाल आंकड़ों के लिए करना मुश्किल होता है. इसलिए, इसका ज़्यादा से ज़्यादा इस्तेमाल करने से पहले, इसे ठीक करना ज़रूरी होता है. इसका एक उदाहरण, वेब से स्क्रैप किया गया डेटा है. इसमें अजीब कोडिंग या अतिरिक्त एचटीएमएल टैग हो सकते हैं.
इस लैब में, आपको Reddit पोस्ट के तौर पर BigQuery से डेटा का एक सेट, Dataproc पर होस्ट किए गए Spark क्लस्टर में लोड करना होगा. साथ ही, काम की जानकारी निकालनी होगी और प्रोसेस किए गए डेटा को Google Cloud Storage में ज़िप की गई CSV फ़ाइलों के तौर पर सेव करना होगा.
आपकी कंपनी के चीफ़ डेटा साइंटिस्ट को अपनी टीमों को, सामान्य भाषा को प्रोसेस करने से जुड़ी अलग-अलग समस्याओं पर काम करने में दिलचस्पी है. खास तौर पर, उन्हें सबरेडिट "r/food" के डेटा का विश्लेषण करना है. आपको डेटा डंप के लिए एक पाइपलाइन बनानी होगी. यह जनवरी 2017 से अगस्त 2019 तक के बैकफ़िल से शुरू होगी.
4. BigQuery Storage API की मदद से BigQuery को ऐक्सेस करना
tabledata.list API के तरीके का इस्तेमाल करके, BigQuery से डेटा खींचने में काफ़ी समय लग सकता है. साथ ही, डेटा के स्केल होने पर यह तरीका असरदार नहीं होता. यह तरीका, JSON ऑब्जेक्ट की सूची दिखाता है. साथ ही, पूरे डेटासेट को पढ़ने के लिए, एक बार में एक पेज को क्रम से पढ़ना ज़रूरी होता है.
BigQuery Storage API, आरपीसी-आधारित प्रोटोकॉल का इस्तेमाल करके, BigQuery में डेटा को ऐक्सेस करने की सुविधा को बेहतर बनाता है. यह डेटा को एक साथ पढ़ने और लिखने के साथ-साथ, Apache Avro और Apache Arrow जैसे अलग-अलग सीरियलाइज़ेशन फ़ॉर्मैट के साथ काम करता है. हाई-लेवल पर, इससे परफ़ॉर्मेंस में काफ़ी सुधार होता है. खास तौर पर, बड़े डेटा सेट पर.
इस कोडलैब में, BigQuery और Spark के बीच डेटा पढ़ने और लिखने के लिए, spark-bigquery-connector का इस्तेमाल किया जाएगा.
5. प्रोजेक्ट बनाना
console.cloud.google.com पर जाकर, Google Cloud Platform Console में साइन इन करें और नया प्रोजेक्ट बनाएं:
इसके बाद, Google Cloud के संसाधनों का इस्तेमाल करने के लिए, आपको Cloud Console में बिलिंग की सुविधा चालू करनी होगी.
इस कोडलैब को चलाने में आपको कुछ डॉलर से ज़्यादा खर्च नहीं करने पड़ेंगे. हालांकि, ज़्यादा संसाधनों का इस्तेमाल करने या उन्हें चालू रहने देने पर, खर्च ज़्यादा हो सकता है. इस कोडलैब के आखिरी सेक्शन में, आपको अपने प्रोजेक्ट को क्लीन अप करने के बारे में जानकारी मिलेगी.
Google Cloud Platform के नए उपयोगकर्ता, 300 डॉलर का क्रेडिट मुफ़्त में आज़माने की सुविधा का फ़ायदा ले सकते हैं.
6. अपना एनवायरमेंट सेट अप करना
अब आपको अपना एनवायरमेंट सेट अप करना होगा. इसके लिए:
- Compute Engine, Dataproc, और BigQuery Storage API चालू करना
- प्रोजेक्ट की सेटिंग कॉन्फ़िगर करना
- Dataproc क्लस्टर बनाना
- Google Cloud Storage बकेट बनाना
एपीआई चालू करना और अपना एनवायरमेंट कॉन्फ़िगर करना
Cloud Console के सबसे ऊपर दाएं कोने में मौजूद बटन को दबाकर, Cloud Shell खोलें.
Cloud Shell लोड होने के बाद, Compute Engine, Dataproc, और BigQuery Storage API चालू करने के लिए, ये निर्देश चलाएं:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
अपने प्रोजेक्ट का प्रोजेक्ट आईडी सेट करें. प्रोजेक्ट का ऐक्सेस पाने के लिए, प्रोजेक्ट चुनने वाले पेज पर जाएं और अपना प्रोजेक्ट खोजें. ऐसा हो सकता है कि यह आपके प्रोजेक्ट के नाम से अलग हो.
अपना प्रोजेक्ट आईडी सेट करने के लिए, यह कमांड चलाएं:
gcloud config set project <project_id>
यहां दी गई सूची में से कोई एक देश चुनकर, अपने प्रोजेक्ट का इलाका सेट करें. इसका एक उदाहरण us-central1
हो सकता है.
gcloud config set dataproc/region <region>
अपने Dataproc क्लस्टर के लिए कोई नाम चुनें और उसके लिए एक एनवायरमेंट वैरिएबल बनाएं.
CLUSTER_NAME=<cluster_name>
Dataproc क्लस्टर बनाना
नीचे दिया गया कमांड चलाकर, Dataproc क्लस्टर बनाएं:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
इस निर्देश को पूरा होने में कुछ मिनट लगेंगे. कमांड को अलग-अलग हिस्सों में बांटने के लिए:
इससे, आपके दिए गए नाम से Dataproc क्लस्टर बनाने की प्रोसेस शुरू हो जाएगी. beta
एपीआई का इस्तेमाल करने से, Dataproc की बीटा सुविधाएं चालू हो जाएंगी. जैसे, कॉम्पोनेंट गेटवे.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
इससे, आपके कर्मचारियों के लिए मशीन का टाइप सेट हो जाएगा.
--worker-machine-type n1-standard-8
इससे आपके क्लस्टर में काम करने वाले लोगों की संख्या सेट हो जाएगी.
--num-workers 8
इससे Dataproc का इमेज वर्शन सेट हो जाएगा.
--image-version 1.5-debian
इससे क्लस्टर पर इस्तेमाल की जाने वाली शुरुआती कार्रवाइयां कॉन्फ़िगर हो जाएंगी. यहां, pip को शुरू करने की कार्रवाई शामिल की जा रही है.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
यह क्लस्टर में शामिल किया जाने वाला मेटाडेटा है. यहां, pip
शुरू करने की कार्रवाई के लिए मेटाडेटा दिया जा रहा है.
--metadata 'PIP_PACKAGES=google-cloud-storage'
इससे क्लस्टर पर इंस्टॉल किए जाने वाले वैकल्पिक कॉम्पोनेंट सेट हो जाएंगे.
--optional-components=ANACONDA
इससे कॉम्पोनेंट गेटवे चालू हो जाएगा. इससे आपको Zeppelin, Jupyter या Spark History जैसे सामान्य यूज़र इंटरफ़ेस (यूआई) देखने के लिए, Dataproc के कॉम्पोनेंट गेटवे का इस्तेमाल करने की सुविधा मिलेगी
--enable-component-gateway
Dataproc के बारे में ज़्यादा जानने के लिए, कृपया यह codelab देखें.
Google Cloud Storage बकेट बनाना
जॉब के आउटपुट के लिए, आपके पास Google Cloud Storage बकेट होना चाहिए. अपनी बकेट के लिए कोई यूनीक नाम तय करें और नई बकेट बनाने के लिए, यह कमांड चलाएं. सभी उपयोगकर्ताओं के लिए, Google Cloud के सभी प्रोजेक्ट में बकेट के नाम यूनीक होते हैं. इसलिए, आपको अलग-अलग नामों के साथ कुछ बार कोशिश करनी पड़ सकती है. अगर आपको ServiceException
नहीं मिलता है, तो इसका मतलब है कि बकेट बन गई है.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. एक्सप्लोरेटरी डेटा ऐनलिसिस
डेटा को प्री-प्रोसेस करने से पहले, आपको उस डेटा के बारे में ज़्यादा जानना चाहिए जिसे आपको प्रोसेस करना है. इसके लिए, आपको डेटा एक्सप्लोरेशन के दो तरीके एक्सप्लोर करने होंगे. सबसे पहले, आपको BigQuery वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करके कुछ रॉ डेटा दिखेगा. इसके बाद, PySpark और Dataproc का इस्तेमाल करके हर सबरेडिट में पोस्ट की संख्या का हिसाब लगाया जाएगा.
BigQuery के वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करना
अपना डेटा देखने के लिए, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करें. Cloud Console में मेन्यू आइकॉन पर जाकर, नीचे की ओर स्क्रोल करें. इसके बाद, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) खोलने के लिए, "BigQuery" दबाएं.
इसके बाद, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) के क्वेरी एडिटर में यह कमांड चलाएं. इससे, जनवरी 2017 के डेटा की 10 पूरी पंक्तियां दिखेंगी:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
पेज पर स्क्रोल करके, उपलब्ध सभी कॉलम के साथ-साथ कुछ उदाहरण देखे जा सकते हैं. खास तौर पर, आपको दो कॉलम दिखेंगे, जिनमें हर पोस्ट का टेक्स्ट कॉन्टेंट होता है: "टाइटल" और "सेल्फ़टेक्स्ट". "सेल्फ़टेक्स्ट" कॉलम में पोस्ट का मुख्य हिस्सा होता है. "created_utc" जैसे अन्य कॉलम भी देखें. यह यूटीसी टाइम है, जब कोई पोस्ट की गई थी. "subreddit" वह सबरेडिट है जिसमें पोस्ट मौजूद है.
PySpark जॉब को लागू करना
सैंपल कोड की मदद से, अपने Cloud Shell में रेपो को क्लोन करने और सही डायरेक्ट्री में जाने के लिए, ये कमांड चलाएं:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
PySpark का इस्तेमाल करके, यह पता लगाया जा सकता है कि हर सबरेडिट के लिए कितनी पोस्ट मौजूद हैं. अगले चरण में स्क्रिप्ट cloud-dataproc/codelabs/spark-bigquery
को लागू करने से पहले, Cloud Editor खोलकर उसे पढ़ा जा सकता है:
Cloud Shell पर वापस जाने के लिए, Cloud Editor में "टर्मिनल खोलें" बटन पर क्लिक करें. इसके बाद, अपना पहला PySpark जॉब चलाने के लिए, यह कमांड चलाएं:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
इस कमांड की मदद से, Jobs API के ज़रिए Dataproc में जॉब सबमिट किए जा सकते हैं. यहां नौकरी के टाइप को pyspark
के तौर पर दिखाया गया है. क्लस्टर का नाम, ज़रूरी नहीं पैरामीटर , और उस फ़ाइल का नाम डाला जा सकता है जिसमें जॉब मौजूद है. यहां, पैरामीटर --jars
दिया गया है. इसकी मदद से, अपनी नौकरी में spark-bigquery-connector
को शामिल किया जा सकता है. --driver-log-levels root=FATAL
का इस्तेमाल करके, लॉग आउट लेवल भी सेट किए जा सकते हैं. इससे, गड़बड़ियों को छोड़कर सभी लॉग आउट को दबा दिया जाएगा. Spark लॉग में अक्सर ग़ैर-ज़रूरी जानकारी होती है.
इसे चलने में कुछ मिनट लग सकते हैं. इसके बाद, आपको यह आउटपुट दिखेगा:
8. Dataproc और Spark के यूज़र इंटरफ़ेस (यूआई) के बारे में जानकारी
Dataproc पर Spark जॉब चलाते समय, आपके पास अपनी जॉब / क्लस्टर की स्थिति देखने के लिए दो यूज़र इंटरफ़ेस (यूआई) का ऐक्सेस होता है. पहला, Dataproc यूज़र इंटरफ़ेस (यूआई). इसे देखने के लिए, मेन्यू आइकॉन पर क्लिक करें और नीचे की ओर स्क्रोल करके Dataproc पर जाएं. यहां, आपको मौजूदा मेमोरी के साथ-साथ, बाकी मेमोरी और वर्कर्स की संख्या भी दिखेगी.
पूरे हो चुके टास्क देखने के लिए, 'नौकरियां' टैब पर भी क्लिक किया जा सकता है. किसी खास नौकरी के आईडी पर क्लिक करके, नौकरी की जानकारी देखी जा सकती है. जैसे, उन नौकरियों के लॉग और आउटपुट.
Spark का यूज़र इंटरफ़ेस (यूआई) भी देखा जा सकता है. नौकरी के पेज पर, बैक ऐरो पर क्लिक करें. इसके बाद, वेब इंटरफ़ेस पर क्लिक करें. आपको कॉम्पोनेंट गेटवे में कई विकल्प दिखेंगे. क्लस्टर सेट अप करते समय, वैकल्पिक कॉम्पोनेंट की मदद से इनमें से कई सुविधाओं को चालू किया जा सकता है. इस लैब के लिए, "Spark History Server" पर क्लिक करें.
इससे यह विंडो खुलेगी:
यहां, पूरी हो चुकी सभी नौकरियां दिखेंगी. साथ ही, नौकरी के बारे में ज़्यादा जानकारी पाने के लिए, किसी भी application_id पर क्लिक किया जा सकता है. इसी तरह, फ़िलहाल चल रही सभी नौकरियां देखने के लिए, लैंडिंग पेज पर सबसे नीचे मौजूद "पूरे नहीं किए गए आवेदन दिखाएं" पर क्लिक करें.
9. बैकफ़िल जॉब चलाना
अब आपको एक ऐसी जॉब चलानी होगी जो डेटा को मेमोरी में लोड करती है, ज़रूरी जानकारी निकालती है, और आउटपुट को Google Cloud Storage बकेट में डालती है. आपको Reddit की हर टिप्पणी के लिए, "टाइटल", "बडी" (रॉ टेक्स्ट), और "बनाने का टाइमस्टैंप" मिलेगा. इसके बाद, आपको इस डेटा को CSV में बदलना होगा, उसे ज़िप करना होगा, और gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz के यूआरआई वाली बकेट में लोड करना होगा.
cloud-dataproc/codelabs/spark-bigquery/backfill.sh
के कोड को पढ़ने के लिए, Cloud Editor का फिर से इस्तेमाल किया जा सकता है. यह cloud-dataproc/codelabs/spark-bigquery/backfill.py
में कोड को लागू करने के लिए, एक रैपर स्क्रिप्ट है.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
आपको कुछ समय बाद, प्रोसेस पूरी होने के कई मैसेज दिखेंगे. इस प्रोसेस को पूरा होने में 15 मिनट लग सकते हैं. gsutil का इस्तेमाल करके, डेटा आउटपुट की पुष्टि करने के लिए, अपनी स्टोरेज बकेट की दोबारा जांच भी की जा सकती है. सभी जॉब पूरे होने के बाद, यह कमांड चलाएं:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
आपको यह आउटपुट दिखेगा:
बधाई हो, आपने Reddit पर की गई टिप्पणियों के डेटा का बैकफ़िल पूरा कर लिया है! अगर आपको इस डेटा के आधार पर मॉडल बनाने का तरीका जानना है, तो कृपया Spark-NLP कोडलैब पर जाएं.
10. साफ़-सफ़ाई सेवा
इस शुरुआती लेख को पढ़ने के बाद, अपने GCP खाते से अनचाहे शुल्क लगने से बचने के लिए:
- अपने बनाए गए और एनवायरमेंट के लिए Cloud Storage बकेट मिटाना
- Dataproc एनवायरमेंट मिटाएं.
अगर आपने सिर्फ़ इस कोडलैब के लिए कोई प्रोजेक्ट बनाया है, तो आपके पास प्रोजेक्ट को मिटाने का विकल्प भी है:
- GCP Console में, प्रोजेक्ट पेज पर जाएं.
- प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
- बॉक्स में प्रोजेक्ट आईडी टाइप करें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.
लाइसेंस
इस काम के लिए, Creative Commons Attribution 3.0 जनरिक लाइसेंस और Apache 2.0 लाइसेंस के तहत लाइसेंस मिला है.