Dataproc पर PySpark की मदद से, BigQuery डेटा को प्री-प्रोसेस करना

1. खास जानकारी

इस कोडलैब में, Google Cloud Platform पर Dataproc के साथ Apache Spark का इस्तेमाल करके, डेटा प्रोसेसिंग पाइपलाइन बनाने का तरीका बताया गया है. डेटा साइंस और डेटा इंजीनियरिंग में, किसी स्टोरेज लोकेशन से डेटा पढ़ने, उसमें बदलाव करने, और उसे किसी दूसरी स्टोरेज लोकेशन में लिखने का इस्तेमाल आम तौर पर किया जाता है. डेटा में आम तौर पर होने वाले बदलावों में, डेटा का कॉन्टेंट बदलना, ग़ैर-ज़रूरी जानकारी हटाना, और फ़ाइल टाइप बदलना शामिल है.

इस कोडलैब में, आपको Apache Spark के बारे में जानकारी मिलेगी. साथ ही, PySpark (Apache Spark का Python API), BigQuery, Google Cloud Storage, और Reddit के डेटा के साथ Dataproc का इस्तेमाल करके, सैंपल पाइपलाइन चलाने का तरीका भी बताया जाएगा.

2. Apache Spark के बारे में जानकारी (ज़रूरी नहीं)

वेबसाइट के मुताबिक, " Apache Spark, बड़े पैमाने पर डेटा प्रोसेस करने के लिए एक यूनिफ़ाइड ऐनलिटिक्स इंजन है." इसकी मदद से, डेटा का विश्लेषण और प्रोसेसिंग, पैरलल और इन-मेमोरी में की जा सकती है. इससे, कई अलग-अलग मशीनों और नोड पर बड़े पैमाने पर पैरलल कंप्यूटेशन किया जा सकता है. इसे मूल रूप से 2014 में, MapReduce के अपग्रेड के तौर पर रिलीज़ किया गया था. यह अब भी बड़े पैमाने पर कैलकुलेशन करने के लिए, सबसे लोकप्रिय फ़्रेमवर्क में से एक है. Apache Spark को Scala में लिखा गया है. इसके बाद, इसमें Scala, Java, Python, और R के एपीआई जोड़े गए हैं. इसमें कई लाइब्रेरी शामिल हैं. जैसे, डेटा पर SQL क्वेरी करने के लिए Spark SQL, डेटा स्ट्रीम करने के लिए Spark Streaming, मशीन लर्निंग के लिए MLlib, और ग्राफ़ प्रोसेस करने के लिए GraphX. ये सभी लाइब्रेरी, Apache Spark इंजन पर काम करती हैं.

32add0b6a47bafbc.png

Spark अपने-आप चल सकता है या स्केलिंग के लिए, Yarn, Mesos या Kubernetes जैसी संसाधन मैनेजमेंट सेवा का फ़ायदा ले सकता है. इस कोडलैब के लिए, Dataproc का इस्तेमाल किया जाएगा. इसमें Yarn का इस्तेमाल किया जाता है.

Spark में डेटा को मूल रूप से मेमोरी में लोड किया जाता था. इसे आरडीडी या रीज़िलिएंट डिस्ट्रिब्यूटेड डेटासेट कहा जाता है. Spark के डेवलपमेंट में, कॉलम वाले दो नए डेटा टाइप जोड़े गए हैं: टाइप किया गया डेटासेट और बिना टाइप वाला डेटाफ़्रेम. आम तौर पर, आरडीडी किसी भी तरह के डेटा के लिए बेहतर होते हैं, जबकि डेटासेट और डेटाफ़्रेम, टेबल वाले डेटा के लिए ऑप्टिमाइज़ किए जाते हैं. डेटासेट सिर्फ़ Java और Scala एपीआई के साथ उपलब्ध हैं. इसलिए, हम इस कोडलैब के लिए PySpark Dataframe API का इस्तेमाल करेंगे. ज़्यादा जानकारी के लिए, कृपया Apache Spark का दस्तावेज़ देखें.

3. इस्तेमाल का उदाहरण

डेटा इंजीनियर को अक्सर डेटा को डेटा साइंटिस्ट के लिए आसानी से ऐक्सेस करने लायक बनाना पड़ता है. हालांकि, डेटा अक्सर शुरुआत में गलत होता है. इसका मतलब है कि मौजूदा स्थिति में, इसका इस्तेमाल आंकड़ों के लिए करना मुश्किल होता है. इसलिए, इसका ज़्यादा से ज़्यादा इस्तेमाल करने से पहले, इसे ठीक करना ज़रूरी होता है. इसका एक उदाहरण, वेब से स्क्रैप किया गया डेटा है. इसमें अजीब कोडिंग या अतिरिक्त एचटीएमएल टैग हो सकते हैं.

इस लैब में, आपको Reddit पोस्ट के तौर पर BigQuery से डेटा का एक सेट, Dataproc पर होस्ट किए गए Spark क्लस्टर में लोड करना होगा. साथ ही, काम की जानकारी निकालनी होगी और प्रोसेस किए गए डेटा को Google Cloud Storage में ज़िप की गई CSV फ़ाइलों के तौर पर सेव करना होगा.

be2a4551ece63bfc.png

आपकी कंपनी के चीफ़ डेटा साइंटिस्ट को अपनी टीमों को, सामान्य भाषा को प्रोसेस करने से जुड़ी अलग-अलग समस्याओं पर काम करने में दिलचस्पी है. खास तौर पर, उन्हें सबरेडिट "r/food" के डेटा का विश्लेषण करना है. आपको डेटा डंप के लिए एक पाइपलाइन बनानी होगी. यह जनवरी 2017 से अगस्त 2019 तक के बैकफ़िल से शुरू होगी.

4. BigQuery Storage API की मदद से BigQuery को ऐक्सेस करना

tabledata.list API के तरीके का इस्तेमाल करके, BigQuery से डेटा खींचने में काफ़ी समय लग सकता है. साथ ही, डेटा के स्केल होने पर यह तरीका असरदार नहीं होता. यह तरीका, JSON ऑब्जेक्ट की सूची दिखाता है. साथ ही, पूरे डेटासेट को पढ़ने के लिए, एक बार में एक पेज को क्रम से पढ़ना ज़रूरी होता है.

BigQuery Storage API, आरपीसी-आधारित प्रोटोकॉल का इस्तेमाल करके, BigQuery में डेटा को ऐक्सेस करने की सुविधा को बेहतर बनाता है. यह डेटा को एक साथ पढ़ने और लिखने के साथ-साथ, Apache Avro और Apache Arrow जैसे अलग-अलग सीरियलाइज़ेशन फ़ॉर्मैट के साथ काम करता है. हाई-लेवल पर, इससे परफ़ॉर्मेंस में काफ़ी सुधार होता है. खास तौर पर, बड़े डेटा सेट पर.

इस कोडलैब में, BigQuery और Spark के बीच डेटा पढ़ने और लिखने के लिए, spark-bigquery-connector का इस्तेमाल किया जाएगा.

5. प्रोजेक्ट बनाना

console.cloud.google.com पर जाकर, Google Cloud Platform Console में साइन इन करें और नया प्रोजेक्ट बनाएं:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

इसके बाद, Google Cloud के संसाधनों का इस्तेमाल करने के लिए, आपको Cloud Console में बिलिंग की सुविधा चालू करनी होगी.

इस कोडलैब को चलाने में आपको कुछ डॉलर से ज़्यादा खर्च नहीं करने पड़ेंगे. हालांकि, ज़्यादा संसाधनों का इस्तेमाल करने या उन्हें चालू रहने देने पर, खर्च ज़्यादा हो सकता है. इस कोडलैब के आखिरी सेक्शन में, आपको अपने प्रोजेक्ट को क्लीन अप करने के बारे में जानकारी मिलेगी.

Google Cloud Platform के नए उपयोगकर्ता, 300 डॉलर का क्रेडिट मुफ़्त में आज़माने की सुविधा का फ़ायदा ले सकते हैं.

6. अपना एनवायरमेंट सेट अप करना

अब आपको अपना एनवायरमेंट सेट अप करना होगा. इसके लिए:

  • Compute Engine, Dataproc, और BigQuery Storage API चालू करना
  • प्रोजेक्ट की सेटिंग कॉन्फ़िगर करना
  • Dataproc क्लस्टर बनाना
  • Google Cloud Storage बकेट बनाना

एपीआई चालू करना और अपना एनवायरमेंट कॉन्फ़िगर करना

Cloud Console के सबसे ऊपर दाएं कोने में मौजूद बटन को दबाकर, Cloud Shell खोलें.

a10c47ee6ca41c54.png

Cloud Shell लोड होने के बाद, Compute Engine, Dataproc, और BigQuery Storage API चालू करने के लिए, ये निर्देश चलाएं:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

अपने प्रोजेक्ट का प्रोजेक्ट आईडी सेट करें. प्रोजेक्ट का ऐक्सेस पाने के लिए, प्रोजेक्ट चुनने वाले पेज पर जाएं और अपना प्रोजेक्ट खोजें. ऐसा हो सकता है कि यह आपके प्रोजेक्ट के नाम से अलग हो.

e682e8227aa3c781.png

76d45fb295728542.png

अपना प्रोजेक्ट आईडी सेट करने के लिए, यह कमांड चलाएं:

gcloud config set project <project_id>

यहां दी गई सूची में से कोई एक देश चुनकर, अपने प्रोजेक्ट का इलाका सेट करें. इसका एक उदाहरण us-central1 हो सकता है.

gcloud config set dataproc/region <region>

अपने Dataproc क्लस्टर के लिए कोई नाम चुनें और उसके लिए एक एनवायरमेंट वैरिएबल बनाएं.

CLUSTER_NAME=<cluster_name>

Dataproc क्लस्टर बनाना

नीचे दिया गया कमांड चलाकर, Dataproc क्लस्टर बनाएं:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

इस निर्देश को पूरा होने में कुछ मिनट लगेंगे. कमांड को अलग-अलग हिस्सों में बांटने के लिए:

इससे, आपके दिए गए नाम से Dataproc क्लस्टर बनाने की प्रोसेस शुरू हो जाएगी. beta एपीआई का इस्तेमाल करने से, Dataproc की बीटा सुविधाएं चालू हो जाएंगी. जैसे, कॉम्पोनेंट गेटवे.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

इससे, आपके कर्मचारियों के लिए मशीन का टाइप सेट हो जाएगा.

--worker-machine-type n1-standard-8

इससे आपके क्लस्टर में काम करने वाले लोगों की संख्या सेट हो जाएगी.

--num-workers 8

इससे Dataproc का इमेज वर्शन सेट हो जाएगा.

--image-version 1.5-debian

इससे क्लस्टर पर इस्तेमाल की जाने वाली शुरुआती कार्रवाइयां कॉन्फ़िगर हो जाएंगी. यहां, pip को शुरू करने की कार्रवाई शामिल की जा रही है.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

यह क्लस्टर में शामिल किया जाने वाला मेटाडेटा है. यहां, pip शुरू करने की कार्रवाई के लिए मेटाडेटा दिया जा रहा है.

--metadata 'PIP_PACKAGES=google-cloud-storage'

इससे क्लस्टर पर इंस्टॉल किए जाने वाले वैकल्पिक कॉम्पोनेंट सेट हो जाएंगे.

--optional-components=ANACONDA

इससे कॉम्पोनेंट गेटवे चालू हो जाएगा. इससे आपको Zeppelin, Jupyter या Spark History जैसे सामान्य यूज़र इंटरफ़ेस (यूआई) देखने के लिए, Dataproc के कॉम्पोनेंट गेटवे का इस्तेमाल करने की सुविधा मिलेगी

--enable-component-gateway

Dataproc के बारे में ज़्यादा जानने के लिए, कृपया यह codelab देखें.

Google Cloud Storage बकेट बनाना

जॉब के आउटपुट के लिए, आपके पास Google Cloud Storage बकेट होना चाहिए. अपनी बकेट के लिए कोई यूनीक नाम तय करें और नई बकेट बनाने के लिए, यह कमांड चलाएं. सभी उपयोगकर्ताओं के लिए, Google Cloud के सभी प्रोजेक्ट में बकेट के नाम यूनीक होते हैं. इसलिए, आपको अलग-अलग नामों के साथ कुछ बार कोशिश करनी पड़ सकती है. अगर आपको ServiceException नहीं मिलता है, तो इसका मतलब है कि बकेट बन गई है.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. एक्सप्लोरेटरी डेटा ऐनलिसिस

डेटा को प्री-प्रोसेस करने से पहले, आपको उस डेटा के बारे में ज़्यादा जानना चाहिए जिसे आपको प्रोसेस करना है. इसके लिए, आपको डेटा एक्सप्लोरेशन के दो तरीके एक्सप्लोर करने होंगे. सबसे पहले, आपको BigQuery वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करके कुछ रॉ डेटा दिखेगा. इसके बाद, PySpark और Dataproc का इस्तेमाल करके हर सबरेडिट में पोस्ट की संख्या का हिसाब लगाया जाएगा.

BigQuery के वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करना

अपना डेटा देखने के लिए, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करें. Cloud Console में मेन्यू आइकॉन पर जाकर, नीचे की ओर स्क्रोल करें. इसके बाद, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) खोलने के लिए, "BigQuery" दबाएं.

242a597d7045b4da.png

इसके बाद, BigQuery वेब यूज़र इंटरफ़ेस (यूआई) के क्वेरी एडिटर में यह कमांड चलाएं. इससे, जनवरी 2017 के डेटा की 10 पूरी पंक्तियां दिखेंगी:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

पेज पर स्क्रोल करके, उपलब्ध सभी कॉलम के साथ-साथ कुछ उदाहरण देखे जा सकते हैं. खास तौर पर, आपको दो कॉलम दिखेंगे, जिनमें हर पोस्ट का टेक्स्ट कॉन्टेंट होता है: "टाइटल" और "सेल्फ़टेक्स्ट". "सेल्फ़टेक्स्ट" कॉलम में पोस्ट का मुख्य हिस्सा होता है. "created_utc" जैसे अन्य कॉलम भी देखें. यह यूटीसी टाइम है, जब कोई पोस्ट की गई थी. "subreddit" वह सबरेडिट है जिसमें पोस्ट मौजूद है.

PySpark जॉब को लागू करना

सैंपल कोड की मदद से, अपने Cloud Shell में रेपो को क्लोन करने और सही डायरेक्ट्री में जाने के लिए, ये कमांड चलाएं:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

PySpark का इस्तेमाल करके, यह पता लगाया जा सकता है कि हर सबरेडिट के लिए कितनी पोस्ट मौजूद हैं. अगले चरण में स्क्रिप्ट cloud-dataproc/codelabs/spark-bigquery को लागू करने से पहले, Cloud Editor खोलकर उसे पढ़ा जा सकता है:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cloud Shell पर वापस जाने के लिए, Cloud Editor में "टर्मिनल खोलें" बटन पर क्लिक करें. इसके बाद, अपना पहला PySpark जॉब चलाने के लिए, यह कमांड चलाएं:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

इस कमांड की मदद से, Jobs API के ज़रिए Dataproc में जॉब सबमिट किए जा सकते हैं. यहां नौकरी के टाइप को pyspark के तौर पर दिखाया गया है. क्लस्टर का नाम, ज़रूरी नहीं पैरामीटर , और उस फ़ाइल का नाम डाला जा सकता है जिसमें जॉब मौजूद है. यहां, पैरामीटर --jars दिया गया है. इसकी मदद से, अपनी नौकरी में spark-bigquery-connector को शामिल किया जा सकता है. --driver-log-levels root=FATAL का इस्तेमाल करके, लॉग आउट लेवल भी सेट किए जा सकते हैं. इससे, गड़बड़ियों को छोड़कर सभी लॉग आउट को दबा दिया जाएगा. Spark लॉग में अक्सर ग़ैर-ज़रूरी जानकारी होती है.

इसे चलने में कुछ मिनट लग सकते हैं. इसके बाद, आपको यह आउटपुट दिखेगा:

6c185228db47bb18.png

8. Dataproc और Spark के यूज़र इंटरफ़ेस (यूआई) के बारे में जानकारी

Dataproc पर Spark जॉब चलाते समय, आपके पास अपनी जॉब / क्लस्टर की स्थिति देखने के लिए दो यूज़र इंटरफ़ेस (यूआई) का ऐक्सेस होता है. पहला, Dataproc यूज़र इंटरफ़ेस (यूआई). इसे देखने के लिए, मेन्यू आइकॉन पर क्लिक करें और नीचे की ओर स्क्रोल करके Dataproc पर जाएं. यहां, आपको मौजूदा मेमोरी के साथ-साथ, बाकी मेमोरी और वर्कर्स की संख्या भी दिखेगी.

6f2987346d15c8e2.png

पूरे हो चुके टास्क देखने के लिए, 'नौकरियां' टैब पर भी क्लिक किया जा सकता है. किसी खास नौकरी के आईडी पर क्लिक करके, नौकरी की जानकारी देखी जा सकती है. जैसे, उन नौकरियों के लॉग और आउटपुट. 114d90129b0e4c88.png

1b2160f0f484594a.png

Spark का यूज़र इंटरफ़ेस (यूआई) भी देखा जा सकता है. नौकरी के पेज पर, बैक ऐरो पर क्लिक करें. इसके बाद, वेब इंटरफ़ेस पर क्लिक करें. आपको कॉम्पोनेंट गेटवे में कई विकल्प दिखेंगे. क्लस्टर सेट अप करते समय, वैकल्पिक कॉम्पोनेंट की मदद से इनमें से कई सुविधाओं को चालू किया जा सकता है. इस लैब के लिए, "Spark History Server" पर क्लिक करें.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

इससे यह विंडो खुलेगी:

8f6786760f994fe8.png

यहां, पूरी हो चुकी सभी नौकरियां दिखेंगी. साथ ही, नौकरी के बारे में ज़्यादा जानकारी पाने के लिए, किसी भी application_id पर क्लिक किया जा सकता है. इसी तरह, फ़िलहाल चल रही सभी नौकरियां देखने के लिए, लैंडिंग पेज पर सबसे नीचे मौजूद "पूरे नहीं किए गए आवेदन दिखाएं" पर क्लिक करें.

9. बैकफ़िल जॉब चलाना

अब आपको एक ऐसी जॉब चलानी होगी जो डेटा को मेमोरी में लोड करती है, ज़रूरी जानकारी निकालती है, और आउटपुट को Google Cloud Storage बकेट में डालती है. आपको Reddit की हर टिप्पणी के लिए, "टाइटल", "बडी" (रॉ टेक्स्ट), और "बनाने का टाइमस्टैंप" मिलेगा. इसके बाद, आपको इस डेटा को CSV में बदलना होगा, उसे ज़िप करना होगा, और gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz के यूआरआई वाली बकेट में लोड करना होगा.

cloud-dataproc/codelabs/spark-bigquery/backfill.sh के कोड को पढ़ने के लिए, Cloud Editor का फिर से इस्तेमाल किया जा सकता है. यह cloud-dataproc/codelabs/spark-bigquery/backfill.py में कोड को लागू करने के लिए, एक रैपर स्क्रिप्ट है.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

आपको कुछ समय बाद, प्रोसेस पूरी होने के कई मैसेज दिखेंगे. इस प्रोसेस को पूरा होने में 15 मिनट लग सकते हैं. gsutil का इस्तेमाल करके, डेटा आउटपुट की पुष्टि करने के लिए, अपनी स्टोरेज बकेट की दोबारा जांच भी की जा सकती है. सभी जॉब पूरे होने के बाद, यह कमांड चलाएं:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

आपको यह आउटपुट दिखेगा:

a7c3c7b2e82f9fca.png

बधाई हो, आपने Reddit पर की गई टिप्पणियों के डेटा का बैकफ़िल पूरा कर लिया है! अगर आपको इस डेटा के आधार पर मॉडल बनाने का तरीका जानना है, तो कृपया Spark-NLP कोडलैब पर जाएं.

10. साफ़-सफ़ाई सेवा

इस शुरुआती लेख को पढ़ने के बाद, अपने GCP खाते से अनचाहे शुल्क लगने से बचने के लिए:

  1. अपने बनाए गए और एनवायरमेंट के लिए Cloud Storage बकेट मिटाना
  2. Dataproc एनवायरमेंट मिटाएं.

अगर आपने सिर्फ़ इस कोडलैब के लिए कोई प्रोजेक्ट बनाया है, तो आपके पास प्रोजेक्ट को मिटाने का विकल्प भी है:

  1. GCP Console में, प्रोजेक्ट पेज पर जाएं.
  2. प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
  3. बॉक्स में प्रोजेक्ट आईडी टाइप करें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.

लाइसेंस

इस काम के लिए, Creative Commons Attribution 3.0 जनरिक लाइसेंस और Apache 2.0 लाइसेंस के तहत लाइसेंस मिला है.