1. खास जानकारी
इस लैब में, Cloud Dataproc पर Apache Spark और Jupyter Notebooks को सेट अप करने और इस्तेमाल करने का तरीका बताया जाएगा.
Jupyter Notebook का इस्तेमाल, खोजी डेटा विश्लेषण और मशीन लर्निंग मॉडल बनाने के लिए बड़े पैमाने पर किया जाता है. ऐसा इसलिए, क्योंकि ये आपको इंटरैक्टिव तरीके से कोड चलाने और तुरंत नतीजे देखने की सुविधा देते हैं.
हालांकि, Apache Spark और Jupyter Notebooks को सेट अप और इस्तेमाल करना मुश्किल हो सकता है.

Cloud Dataproc की मदद से, यह काम आसानी से और तेज़ी से किया जा सकता है. इसकी मदद से, Apache Spark, Jupyter कॉम्पोनेंट, और कॉम्पोनेंट गेटवे के साथ Dataproc क्लस्टर को करीब 90 सेकंड में बनाया जा सकता है.
आपको क्या सीखने को मिलेगा
इस कोडलैब में, आपको इनके बारे में जानकारी मिलेगी:
- अपने क्लस्टर के लिए Google Cloud Storage बकेट बनाना
- Jupyter और कॉम्पोनेंट गेटवे के साथ Dataproc क्लस्टर बनाएं,
- Dataproc पर JupyterLab के वेब यूज़र इंटरफ़ेस (यूआई) को ऐक्सेस करना
- Spark BigQuery Storage connector का इस्तेमाल करके नोटबुक बनाना
- स्पार्क जॉब चलाना और नतीजों को प्लॉट करना.
Google Cloud पर इस लैब को चलाने की कुल लागत करीब 1 डॉलर है. Cloud Dataproc की कीमत के बारे में पूरी जानकारी यहां देखी जा सकती है.
2. प्रोजेक्ट बनाना
console.cloud.google.com पर जाकर, Google Cloud Platform Console में साइन इन करें और एक नया प्रोजेक्ट बनाएं:



इसके बाद, Google Cloud संसाधनों का इस्तेमाल करने के लिए, आपको Cloud Console में बिलिंग चालू करनी होगी.
इस कोडलैब को पूरा करने में आपको कुछ डॉलर से ज़्यादा खर्च नहीं करने पड़ेंगे. हालांकि, अगर आपने ज़्यादा संसाधनों का इस्तेमाल करने का फ़ैसला किया है या उन्हें चालू रखा है, तो यह ज़्यादा हो सकता है. इस कोडलैब के आखिरी सेक्शन में, आपको अपने प्रोजेक्ट को क्लीन अप करने के बारे में जानकारी मिलेगी.
Google Cloud Platform के नए उपयोगकर्ताओं को, मुफ़्त में आज़माने के लिए 300 डॉलर का क्रेडिट मिलता है.
3. अपना एनवायरमेंट सेट अप करना
सबसे पहले, क्लाउड कंसोल के सबसे ऊपर दाएं कोने में मौजूद बटन पर क्लिक करके, Cloud Shell खोलें:

Cloud Shell लोड होने के बाद, पिछले चरण से प्रोजेक्ट आईडी सेट करने के लिए, यह निर्देश चलाएं**:**
gcloud config set project <project_id>
प्रोजेक्ट आईडी को Cloud Console में सबसे ऊपर बाईं ओर मौजूद अपने प्रोजेक्ट पर क्लिक करके भी देखा जा सकता है:


इसके बाद, Dataproc, Compute Engine, और BigQuery Storage API चालू करें.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
इसके अलावा, यह काम Cloud Console में भी किया जा सकता है. स्क्रीन पर सबसे ऊपर बाईं ओर मौजूद, मेन्यू आइकॉन पर क्लिक करें.

ड्रॉप-डाउन से API Manager चुनें.

एपीआई और सेवाएं चालू करें पर क्लिक करें.

इन एपीआई को खोजें और चालू करें:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery स्टोरेज एपीआई
4. GCS बकेट बनाना
अपने डेटा के सबसे नज़दीकी इलाके में Google Cloud Storage बकेट बनाएं और उसे कोई यूनीक नाम दें.
इसका इस्तेमाल Dataproc क्लस्टर के लिए किया जाएगा.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
आपको यह आउटपुट दिखेगा
Creating gs://<your-bucket-name>/...
5. Jupyter और कॉम्पोनेंट गेटवे की मदद से, Dataproc क्लस्टर बनाना
आपका क्लस्टर बनाया जा रहा है
अपने क्लस्टर के लिए एनवायरमेंट वैरिएबल सेट करना
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
इसके बाद, अपने क्लस्टर पर Jupyter का इस्तेमाल करने के लिए, सभी ज़रूरी कॉम्पोनेंट के साथ क्लस्टर बनाने के लिए, यह gcloud कमांड चलाएं.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
क्लस्टर बनाते समय, आपको यह आउटपुट दिखेगा
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
आपका क्लस्टर बनने में करीब 90 सेकंड लगेंगे. इसके बाद, Dataproc Cloud Console के यूज़र इंटरफ़ेस (यूआई) से अपने क्लस्टर को ऐक्सेस किया जा सकेगा.
जब तक यह प्रोसेस पूरी नहीं हो जाती, तब तक gcloud कमांड में इस्तेमाल किए गए फ़्लैग के बारे में ज़्यादा जानने के लिए, यहां दी गई जानकारी पढ़ें.
क्लस्टर बन जाने के बाद, आपको यह आउटपुट दिखेगा:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create कमांड में इस्तेमाल किए गए फ़्लैग
gcloud dataproc create कमांड में इस्तेमाल किए गए फ़्लैग की जानकारी यहां दी गई है
--region=${REGION}
इससे उस क्षेत्र और ज़ोन के बारे में पता चलता है जहां क्लस्टर बनाया जाएगा. यहां उन देशों/इलाकों की सूची दी गई है जहां यह सुविधा उपलब्ध है.
--image-version=1.4
आपके क्लस्टर में इस्तेमाल करने के लिए इमेज का वर्शन. यहां उपलब्ध वर्शन की सूची देखी जा सकती है.
--bucket=${BUCKET_NAME}
उस Google Cloud Storage बकेट के बारे में बताएं जिसे आपने क्लस्टर के लिए इस्तेमाल करने के लिए पहले बनाया था. अगर आपने GCS बकेट नहीं दिया है, तो आपके लिए एक बकेट बनाया जाएगा.
आपकी नोटबुक भी यहीं सेव की जाएंगी. भले ही, आपने अपने क्लस्टर को मिटा दिया हो, क्योंकि GCS बकेट नहीं मिटती है.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc क्लस्टर के लिए इस्तेमाल किए जाने वाले मशीन टाइप. उपलब्ध मशीन टाइप की सूची यहां देखी जा सकती है.
अगर आपने –num-workers फ़्लैग सेट नहीं किया है, तो डिफ़ॉल्ट रूप से एक मास्टर नोड और दो वर्कर नोड बनाए जाते हैं
--optional-components=ANACONDA,JUPYTER
ज़रूरी नहीं कॉम्पोनेंट के लिए ये वैल्यू सेट करने से, आपके क्लस्टर पर Jupyter और Anaconda (जो Jupyter नोटबुक के लिए ज़रूरी है) के लिए सभी ज़रूरी लाइब्रेरी इंस्टॉल हो जाएंगी.
--enable-component-gateway
कॉम्पोनेंट गेटवे को चालू करने से, Apache Knox और Inverting Proxy का इस्तेमाल करके App Engine लिंक बनता है. इससे Jupyter और JupyterLab के वेब इंटरफ़ेस को आसानी से, सुरक्षित तरीके से, और पुष्टि किए गए तरीके से ऐक्सेस किया जा सकता है. इसका मतलब है कि अब आपको एसएसएच टनल बनाने की ज़रूरत नहीं है.
यह क्लस्टर पर मौजूद अन्य टूल के लिए भी लिंक बनाएगा. इनमें Yarn Resource Manager और Spark History Server शामिल हैं. ये टूल, आपके जॉब की परफ़ॉर्मेंस और क्लस्टर के इस्तेमाल के पैटर्न देखने के लिए काम के होते हैं.
6. Apache Spark नोटबुक बनाना
JupyterLab के वेब इंटरफ़ेस को ऐक्सेस करना
क्लस्टर तैयार होने के बाद, JupyterLab के वेब इंटरफ़ेस पर जाने के लिए कॉम्पोनेंट गेटवे का लिंक ढूंढें. इसके लिए, Dataproc क्लस्टर - Cloud Console पर जाएं. इसके बाद, बनाए गए क्लस्टर पर क्लिक करें और वेब इंटरफ़ेस टैब पर जाएं.

आपको दिखेगा कि आपके पास Jupyter का ऐक्सेस है. यह क्लासिक नोटबुक इंटरफ़ेस है. इसके अलावा, आपके पास JupyterLab का ऐक्सेस भी है. इसे Project Jupyter के लिए अगली जनरेशन का यूज़र इंटरफ़ेस (यूआई) बताया गया है.
JupyterLab में यूज़र इंटरफ़ेस (यूआई) की कई नई सुविधाएं उपलब्ध हैं. इसलिए, अगर आपको नोटबुक का इस्तेमाल करना नहीं आता या आपको नए सुधारों के बारे में जानना है, तो हमारा सुझाव है कि JupyterLab का इस्तेमाल करें. आधिकारिक दस्तावेज़ों के मुताबिक, यह क्लासिक Jupyter इंटरफ़ेस की जगह ले लेगा.
Python 3 कर्नेल वाली नोटबुक बनाना

लॉन्चर टैब में, Python 3 नोटबुक आइकॉन पर क्लिक करें. इससे, Python 3 कर्नल (PySpark कर्नल नहीं) वाली नोटबुक बनाई जा सकती है. इसकी मदद से, नोटबुक में SparkSession को कॉन्फ़िगर किया जा सकता है. साथ ही, BigQuery Storage API का इस्तेमाल करने के लिए ज़रूरी spark-bigquery-connector को शामिल किया जा सकता है.
नोटबुक का नाम बदलना

बाईं ओर मौजूद साइडबार या सबसे ऊपर मौजूद नेविगेशन में, नोटबुक के नाम पर राइट क्लिक करें. इसके बाद, नोटबुक का नाम बदलकर "BigQuery Storage & Spark DataFrames.ipynb" करें
नोटबुक में Spark कोड चलाना

इस नोटबुक में, spark-bigquery-connector का इस्तेमाल किया जाएगा. यह एक ऐसा टूल है जो BigQuery Storage API का इस्तेमाल करके, BigQuery और Spark के बीच डेटा को पढ़ने और लिखने की सुविधा देता है.
BigQuery Storage API, आरपीसी पर आधारित प्रोटोकॉल का इस्तेमाल करके, BigQuery में डेटा ऐक्सेस करने की प्रोसेस को बेहतर बनाता है. यह एक साथ डेटा को पढ़ने और लिखने के साथ-साथ, अलग-अलग सीरियलाइज़ेशन फ़ॉर्मैट के साथ काम करता है. जैसे, Apache Avro और Apache Arrow. इससे, परफ़ॉर्मेंस में काफ़ी सुधार होता है. खास तौर पर, बड़े डेटा सेट पर काम करते समय.
पहली सेल में, अपने क्लस्टर का Scala वर्शन देखें, ताकि spark-bigquery-connector jar का सही वर्शन शामिल किया जा सके.
इनपुट [1]:
!scala -version
आउटपुट [1]:
Spark सेशन बनाएं और उसमें spark-bigquery-connector पैकेज शामिल करें.
अगर Scala का वर्शन 2.11 है, तो इस पैकेज का इस्तेमाल करें.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
अगर Scala का वर्शन 2.12 है, तो इस पैकेज का इस्तेमाल करें.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
इनपुट [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval को चालू करें
इससे हर चरण में डेटाफ़्रेम के नतीजे दिखेंगे. इसके लिए, df.show() दिखाने की ज़रूरत नहीं होगी. साथ ही, इससे आउटपुट का फ़ॉर्मैट भी बेहतर होगा.
इनपुट [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery टेबल को Spark DataFrame में पढ़ना
BigQuery के सार्वजनिक डेटासेट से डेटा पढ़कर, Spark DataFrame बनाएं. यह Spark क्लस्टर में डेटा लोड करने के लिए, spark-bigquery-connector और BigQuery Storage API का इस्तेमाल करता है.
Spark DataFrame बनाएं और Wikipedia पेजव्यू के लिए BigQuery के सार्वजनिक डेटासेट से डेटा लोड करें. आपको दिखेगा कि डेटा पर कोई क्वेरी नहीं चल रही है, क्योंकि डेटा को Spark में लोड करने के लिए spark-bigquery-connector का इस्तेमाल किया जा रहा है. डेटा की प्रोसेसिंग Spark में होगी. इस कोड को चलाने पर, टेबल लोड नहीं होगी. ऐसा इसलिए, क्योंकि Spark में लेज़ी इवैलुएशन होता है. इसलिए, टेबल लोड करने की प्रोसेस अगले चरण में होगी.
इनपुट [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
आउटपुट [4]:

ज़रूरी कॉलम चुनें और where() का इस्तेमाल करके फ़िल्टर लागू करें. यह filter() का एलियास है.
इस कोड को चलाने पर, Spark ऐक्शन ट्रिगर होता है. साथ ही, इस समय BigQuery Storage से डेटा पढ़ा जाता है.
इनपुट [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
आउटपुट [5]:

सबसे ज़्यादा व्यू वाले पेज देखने के लिए, टाइटल के हिसाब से ग्रुप करें और पेज व्यू के हिसाब से क्रम में लगाएं
इनपुट [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
आउटपुट [6]:
7. नोटबुक में Python की प्लॉटिंग लाइब्रेरी का इस्तेमाल करना
अपने Spark जॉब के आउटपुट को प्लॉट करने के लिए, Python में उपलब्ध अलग-अलग प्लॉटिंग लाइब्रेरी का इस्तेमाल किया जा सकता है.
Spark DataFrame को Pandas DataFrame में बदलना
Spark DataFrame को Pandas DataFrame में बदलें और datehour को इंडेक्स के तौर पर सेट करें. अगर आपको Python में सीधे तौर पर डेटा के साथ काम करना है और Python की कई उपलब्ध प्लॉटर लाइब्रेरी का इस्तेमाल करके डेटा को प्लॉट करना है, तो यह तरीका आपके लिए फ़ायदेमंद है.
इनपुट [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
आउटपुट [7]:

Pandas Dataframe को प्लॉट करना
matplotlib लाइब्रेरी इंपोर्ट करें. इसकी मदद से, नोटबुक में प्लॉट दिखाए जा सकते हैं
इनपुट [8]:
import matplotlib.pyplot as plt
Pandas DataFrame से लाइन चार्ट बनाने के लिए, Pandas plot फ़ंक्शन का इस्तेमाल करें.
इनपुट [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
आउटपुट [9]:
देखें कि नोटबुक को GCS में सेव किया गया हो
अब आपके पास अपने Dataproc क्लस्टर पर, Jupyter notebook का पहला वर्शन चालू होना चाहिए. अपनी नोटबुक को कोई नाम दें. यह उस GCS बकेट में अपने-आप सेव हो जाएगी जिसका इस्तेमाल क्लस्टर बनाते समय किया गया था.
क्लाउड शेल में इस gsutil कमांड का इस्तेमाल करके, इसकी जांच की जा सकती है
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
आपको यह आउटपुट दिखेगा
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. ऑप्टिमाइज़ेशन से जुड़ी सलाह - डेटा को मेमोरी में कैश मेमोरी के तौर पर सेव करें
ऐसा हो सकता है कि आपको हर बार BigQuery स्टोरेज से डेटा पढ़ने के बजाय, मेमोरी में मौजूद डेटा का इस्तेमाल करना हो.
यह जॉब, BigQuery से डेटा पढ़ेगी और फ़िल्टर को BigQuery पर पुश करेगी. इसके बाद, एग्रीगेशन की गणना Apache Spark में की जाएगी.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
ऊपर दिए गए जॉब में बदलाव करके, टेबल की कैश मेमोरी को शामिल किया जा सकता है. अब Apache Spark, मेमोरी में मौजूद विकी कॉलम पर फ़िल्टर लागू करेगा.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
इसके बाद, BigQuery स्टोरेज से डेटा को फिर से पढ़ने के बजाय, कैश मेमोरी में सेव किए गए डेटा का इस्तेमाल करके, किसी दूसरी विकी भाषा के लिए फ़िल्टर किया जा सकता है. इसलिए, यह प्रोसेस बहुत तेज़ी से पूरी होगी.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
कैश मेमोरी को हटाने के लिए, यह कमांड चलाएं
df_wiki_all.unpersist()
9. इस्तेमाल के अन्य उदाहरणों के लिए नोटबुक के उदाहरण
Cloud Dataproc GitHub repo में, डेटा लोड करने, डेटा सेव करने, और Google Cloud Platform के अलग-अलग प्रॉडक्ट और ओपन-सोर्स टूल की मदद से डेटा को प्लॉट करने के लिए, Apache Spark के सामान्य पैटर्न वाली Jupyter नोटबुक शामिल हैं:
10. व्यवस्थित करें
क्विकस्टार्ट पूरा होने के बाद, अपने GCP खाते पर बेवजह के शुल्क से बचने के लिए:
- आपने जिस एनवायरमेंट के लिए Cloud Storage बकेट बनाया है उसे मिटाएं
- Dataproc एनवायरमेंट मिटाएं.
अगर आपने यह प्रोजेक्ट सिर्फ़ इस कोडलैब के लिए बनाया है, तो आपके पास इसे मिटाने का विकल्प भी है:
- GCP Console में, प्रोजेक्ट पेज पर जाएं.
- प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
- बॉक्स में प्रोजेक्ट आईडी डालें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.
लाइसेंस
इस काम के लिए, Creative Commons एट्रिब्यूशन 3.0 जेनेरिक लाइसेंस और Apache 2.0 लाइसेंस के तहत लाइसेंस मिला है.