Cloud Dataproc पर Apache Spark और Jupyter Notebooks

1. खास जानकारी

इस लैब में, Cloud Dataproc पर Apache Spark और Jupyter Notebooks को सेट अप करने और इस्तेमाल करने का तरीका बताया जाएगा.

Jupyter Notebook का इस्तेमाल, खोजी डेटा विश्लेषण और मशीन लर्निंग मॉडल बनाने के लिए बड़े पैमाने पर किया जाता है. ऐसा इसलिए, क्योंकि ये आपको इंटरैक्टिव तरीके से कोड चलाने और तुरंत नतीजे देखने की सुविधा देते हैं.

हालांकि, Apache Spark और Jupyter Notebooks को सेट अप और इस्तेमाल करना मुश्किल हो सकता है.

b9ed855863c57d6.png

Cloud Dataproc की मदद से, यह काम आसानी से और तेज़ी से किया जा सकता है. इसकी मदद से, Apache Spark, Jupyter कॉम्पोनेंट, और कॉम्पोनेंट गेटवे के साथ Dataproc क्लस्टर को करीब 90 सेकंड में बनाया जा सकता है.

आपको क्या सीखने को मिलेगा

इस कोडलैब में, आपको इनके बारे में जानकारी मिलेगी:

  • अपने क्लस्टर के लिए Google Cloud Storage बकेट बनाना
  • Jupyter और कॉम्पोनेंट गेटवे के साथ Dataproc क्लस्टर बनाएं,
  • Dataproc पर JupyterLab के वेब यूज़र इंटरफ़ेस (यूआई) को ऐक्सेस करना
  • Spark BigQuery Storage connector का इस्तेमाल करके नोटबुक बनाना
  • स्पार्क जॉब चलाना और नतीजों को प्लॉट करना.

Google Cloud पर इस लैब को चलाने की कुल लागत करीब 1 डॉलर है. Cloud Dataproc की कीमत के बारे में पूरी जानकारी यहां देखी जा सकती है.

2. प्रोजेक्ट बनाना

console.cloud.google.com पर जाकर, Google Cloud Platform Console में साइन इन करें और एक नया प्रोजेक्ट बनाएं:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

इसके बाद, Google Cloud संसाधनों का इस्तेमाल करने के लिए, आपको Cloud Console में बिलिंग चालू करनी होगी.

इस कोडलैब को पूरा करने में आपको कुछ डॉलर से ज़्यादा खर्च नहीं करने पड़ेंगे. हालांकि, अगर आपने ज़्यादा संसाधनों का इस्तेमाल करने का फ़ैसला किया है या उन्हें चालू रखा है, तो यह ज़्यादा हो सकता है. इस कोडलैब के आखिरी सेक्शन में, आपको अपने प्रोजेक्ट को क्लीन अप करने के बारे में जानकारी मिलेगी.

Google Cloud Platform के नए उपयोगकर्ताओं को, मुफ़्त में आज़माने के लिए 300 डॉलर का क्रेडिट मिलता है.

3. अपना एनवायरमेंट सेट अप करना

सबसे पहले, क्लाउड कंसोल के सबसे ऊपर दाएं कोने में मौजूद बटन पर क्लिक करके, Cloud Shell खोलें:

a10c47ee6ca41c54.png

Cloud Shell लोड होने के बाद, पिछले चरण से प्रोजेक्ट आईडी सेट करने के लिए, यह निर्देश चलाएं**:**

gcloud config set project <project_id>

प्रोजेक्ट आईडी को Cloud Console में सबसे ऊपर बाईं ओर मौजूद अपने प्रोजेक्ट पर क्लिक करके भी देखा जा सकता है:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

इसके बाद, Dataproc, Compute Engine, और BigQuery Storage API चालू करें.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

इसके अलावा, यह काम Cloud Console में भी किया जा सकता है. स्क्रीन पर सबसे ऊपर बाईं ओर मौजूद, मेन्यू आइकॉन पर क्लिक करें.

2bfc27ef9ba2ec7d.png

ड्रॉप-डाउन से API Manager चुनें.

408af5f32c4b7c25.png

एपीआई और सेवाएं चालू करें पर क्लिक करें.

a9c0e84296a7ba5b.png

इन एपीआई को खोजें और चालू करें:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery स्टोरेज एपीआई

4. GCS बकेट बनाना

अपने डेटा के सबसे नज़दीकी इलाके में Google Cloud Storage बकेट बनाएं और उसे कोई यूनीक नाम दें.

इसका इस्तेमाल Dataproc क्लस्टर के लिए किया जाएगा.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

आपको यह आउटपुट दिखेगा

Creating gs://<your-bucket-name>/...

5. Jupyter और कॉम्पोनेंट गेटवे की मदद से, Dataproc क्लस्टर बनाना

आपका क्लस्टर बनाया जा रहा है

अपने क्लस्टर के लिए एनवायरमेंट वैरिएबल सेट करना

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

इसके बाद, अपने क्लस्टर पर Jupyter का इस्तेमाल करने के लिए, सभी ज़रूरी कॉम्पोनेंट के साथ क्लस्टर बनाने के लिए, यह gcloud कमांड चलाएं.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

क्लस्टर बनाते समय, आपको यह आउटपुट दिखेगा

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

आपका क्लस्टर बनने में करीब 90 सेकंड लगेंगे. इसके बाद, Dataproc Cloud Console के यूज़र इंटरफ़ेस (यूआई) से अपने क्लस्टर को ऐक्सेस किया जा सकेगा.

जब तक यह प्रोसेस पूरी नहीं हो जाती, तब तक gcloud कमांड में इस्तेमाल किए गए फ़्लैग के बारे में ज़्यादा जानने के लिए, यहां दी गई जानकारी पढ़ें.

क्लस्टर बन जाने के बाद, आपको यह आउटपुट दिखेगा:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create कमांड में इस्तेमाल किए गए फ़्लैग

gcloud dataproc create कमांड में इस्तेमाल किए गए फ़्लैग की जानकारी यहां दी गई है

--region=${REGION}

इससे उस क्षेत्र और ज़ोन के बारे में पता चलता है जहां क्लस्टर बनाया जाएगा. यहां उन देशों/इलाकों की सूची दी गई है जहां यह सुविधा उपलब्ध है.

--image-version=1.4

आपके क्लस्टर में इस्तेमाल करने के लिए इमेज का वर्शन. यहां उपलब्ध वर्शन की सूची देखी जा सकती है.

--bucket=${BUCKET_NAME}

उस Google Cloud Storage बकेट के बारे में बताएं जिसे आपने क्लस्टर के लिए इस्तेमाल करने के लिए पहले बनाया था. अगर आपने GCS बकेट नहीं दिया है, तो आपके लिए एक बकेट बनाया जाएगा.

आपकी नोटबुक भी यहीं सेव की जाएंगी. भले ही, आपने अपने क्लस्टर को मिटा दिया हो, क्योंकि GCS बकेट नहीं मिटती है.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc क्लस्टर के लिए इस्तेमाल किए जाने वाले मशीन टाइप. उपलब्ध मशीन टाइप की सूची यहां देखी जा सकती है.

अगर आपने –num-workers फ़्लैग सेट नहीं किया है, तो डिफ़ॉल्ट रूप से एक मास्टर नोड और दो वर्कर नोड बनाए जाते हैं

--optional-components=ANACONDA,JUPYTER

ज़रूरी नहीं कॉम्पोनेंट के लिए ये वैल्यू सेट करने से, आपके क्लस्टर पर Jupyter और Anaconda (जो Jupyter नोटबुक के लिए ज़रूरी है) के लिए सभी ज़रूरी लाइब्रेरी इंस्टॉल हो जाएंगी.

--enable-component-gateway

कॉम्पोनेंट गेटवे को चालू करने से, Apache Knox और Inverting Proxy का इस्तेमाल करके App Engine लिंक बनता है. इससे Jupyter और JupyterLab के वेब इंटरफ़ेस को आसानी से, सुरक्षित तरीके से, और पुष्टि किए गए तरीके से ऐक्सेस किया जा सकता है. इसका मतलब है कि अब आपको एसएसएच टनल बनाने की ज़रूरत नहीं है.

यह क्लस्टर पर मौजूद अन्य टूल के लिए भी लिंक बनाएगा. इनमें Yarn Resource Manager और Spark History Server शामिल हैं. ये टूल, आपके जॉब की परफ़ॉर्मेंस और क्लस्टर के इस्तेमाल के पैटर्न देखने के लिए काम के होते हैं.

6. Apache Spark नोटबुक बनाना

JupyterLab के वेब इंटरफ़ेस को ऐक्सेस करना

क्लस्टर तैयार होने के बाद, JupyterLab के वेब इंटरफ़ेस पर जाने के लिए कॉम्पोनेंट गेटवे का लिंक ढूंढें. इसके लिए, Dataproc क्लस्टर - Cloud Console पर जाएं. इसके बाद, बनाए गए क्लस्टर पर क्लिक करें और वेब इंटरफ़ेस टैब पर जाएं.

afc40202d555de47.png

आपको दिखेगा कि आपके पास Jupyter का ऐक्सेस है. यह क्लासिक नोटबुक इंटरफ़ेस है. इसके अलावा, आपके पास JupyterLab का ऐक्सेस भी है. इसे Project Jupyter के लिए अगली जनरेशन का यूज़र इंटरफ़ेस (यूआई) बताया गया है.

JupyterLab में यूज़र इंटरफ़ेस (यूआई) की कई नई सुविधाएं उपलब्ध हैं. इसलिए, अगर आपको नोटबुक का इस्तेमाल करना नहीं आता या आपको नए सुधारों के बारे में जानना है, तो हमारा सुझाव है कि JupyterLab का इस्तेमाल करें. आधिकारिक दस्तावेज़ों के मुताबिक, यह क्लासिक Jupyter इंटरफ़ेस की जगह ले लेगा.

Python 3 कर्नेल वाली नोटबुक बनाना

a463623f2ebf0518.png

लॉन्चर टैब में, Python 3 नोटबुक आइकॉन पर क्लिक करें. इससे, Python 3 कर्नल (PySpark कर्नल नहीं) वाली नोटबुक बनाई जा सकती है. इसकी मदद से, नोटबुक में SparkSession को कॉन्फ़िगर किया जा सकता है. साथ ही, BigQuery Storage API का इस्तेमाल करने के लिए ज़रूरी spark-bigquery-connector को शामिल किया जा सकता है.

नोटबुक का नाम बदलना

196a3276ed07e1f3.png

बाईं ओर मौजूद साइडबार या सबसे ऊपर मौजूद नेविगेशन में, नोटबुक के नाम पर राइट क्लिक करें. इसके बाद, नोटबुक का नाम बदलकर "BigQuery Storage & Spark DataFrames.ipynb" करें

नोटबुक में Spark कोड चलाना

fbac38062e5bb9cf.png

इस नोटबुक में, spark-bigquery-connector का इस्तेमाल किया जाएगा. यह एक ऐसा टूल है जो BigQuery Storage API का इस्तेमाल करके, BigQuery और Spark के बीच डेटा को पढ़ने और लिखने की सुविधा देता है.

BigQuery Storage API, आरपीसी पर आधारित प्रोटोकॉल का इस्तेमाल करके, BigQuery में डेटा ऐक्सेस करने की प्रोसेस को बेहतर बनाता है. यह एक साथ डेटा को पढ़ने और लिखने के साथ-साथ, अलग-अलग सीरियलाइज़ेशन फ़ॉर्मैट के साथ काम करता है. जैसे, Apache Avro और Apache Arrow. इससे, परफ़ॉर्मेंस में काफ़ी सुधार होता है. खास तौर पर, बड़े डेटा सेट पर काम करते समय.

पहली सेल में, अपने क्लस्टर का Scala वर्शन देखें, ताकि spark-bigquery-connector jar का सही वर्शन शामिल किया जा सके.

इनपुट [1]:

!scala -version

आउटपुट [1]:f580e442576b8b1f.png Spark सेशन बनाएं और उसमें spark-bigquery-connector पैकेज शामिल करें.

अगर Scala का वर्शन 2.11 है, तो इस पैकेज का इस्तेमाल करें.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

अगर Scala का वर्शन 2.12 है, तो इस पैकेज का इस्तेमाल करें.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

इनपुट [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval को चालू करें

इससे हर चरण में डेटाफ़्रेम के नतीजे दिखेंगे. इसके लिए, df.show() दिखाने की ज़रूरत नहीं होगी. साथ ही, इससे आउटपुट का फ़ॉर्मैट भी बेहतर होगा.

इनपुट [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery टेबल को Spark DataFrame में पढ़ना

BigQuery के सार्वजनिक डेटासेट से डेटा पढ़कर, Spark DataFrame बनाएं. यह Spark क्लस्टर में डेटा लोड करने के लिए, spark-bigquery-connector और BigQuery Storage API का इस्तेमाल करता है.

Spark DataFrame बनाएं और Wikipedia पेजव्यू के लिए BigQuery के सार्वजनिक डेटासेट से डेटा लोड करें. आपको दिखेगा कि डेटा पर कोई क्वेरी नहीं चल रही है, क्योंकि डेटा को Spark में लोड करने के लिए spark-bigquery-connector का इस्तेमाल किया जा रहा है. डेटा की प्रोसेसिंग Spark में होगी. इस कोड को चलाने पर, टेबल लोड नहीं होगी. ऐसा इसलिए, क्योंकि Spark में लेज़ी इवैलुएशन होता है. इसलिए, टेबल लोड करने की प्रोसेस अगले चरण में होगी.

इनपुट [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

आउटपुट [4]:

c107a33f6fc30ca.png

ज़रूरी कॉलम चुनें और where() का इस्तेमाल करके फ़िल्टर लागू करें. यह filter() का एलियास है.

इस कोड को चलाने पर, Spark ऐक्शन ट्रिगर होता है. साथ ही, इस समय BigQuery Storage से डेटा पढ़ा जाता है.

इनपुट [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

आउटपुट [5]:

ad363cbe510d625a.png

सबसे ज़्यादा व्यू वाले पेज देखने के लिए, टाइटल के हिसाब से ग्रुप करें और पेज व्यू के हिसाब से क्रम में लगाएं

इनपुट [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

आउटपुट [6]:f718abd05afc0f4.png

7. नोटबुक में Python की प्लॉटिंग लाइब्रेरी का इस्तेमाल करना

अपने Spark जॉब के आउटपुट को प्लॉट करने के लिए, Python में उपलब्ध अलग-अलग प्लॉटिंग लाइब्रेरी का इस्तेमाल किया जा सकता है.

Spark DataFrame को Pandas DataFrame में बदलना

Spark DataFrame को Pandas DataFrame में बदलें और datehour को इंडेक्स के तौर पर सेट करें. अगर आपको Python में सीधे तौर पर डेटा के साथ काम करना है और Python की कई उपलब्ध प्लॉटर लाइब्रेरी का इस्तेमाल करके डेटा को प्लॉट करना है, तो यह तरीका आपके लिए फ़ायदेमंद है.

इनपुट [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

आउटपुट [7]:

3df2aaa2351f028d.png

Pandas Dataframe को प्लॉट करना

matplotlib लाइब्रेरी इंपोर्ट करें. इसकी मदद से, नोटबुक में प्लॉट दिखाए जा सकते हैं

इनपुट [8]:

import matplotlib.pyplot as plt

Pandas DataFrame से लाइन चार्ट बनाने के लिए, Pandas plot फ़ंक्शन का इस्तेमाल करें.

इनपुट [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

आउटपुट [9]:bade7042c3033594.png

देखें कि नोटबुक को GCS में सेव किया गया हो

अब आपके पास अपने Dataproc क्लस्टर पर, Jupyter notebook का पहला वर्शन चालू होना चाहिए. अपनी नोटबुक को कोई नाम दें. यह उस GCS बकेट में अपने-आप सेव हो जाएगी जिसका इस्तेमाल क्लस्टर बनाते समय किया गया था.

क्लाउड शेल में इस gsutil कमांड का इस्तेमाल करके, इसकी जांच की जा सकती है

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

आपको यह आउटपुट दिखेगा

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. ऑप्टिमाइज़ेशन से जुड़ी सलाह - डेटा को मेमोरी में कैश मेमोरी के तौर पर सेव करें

ऐसा हो सकता है कि आपको हर बार BigQuery स्टोरेज से डेटा पढ़ने के बजाय, मेमोरी में मौजूद डेटा का इस्तेमाल करना हो.

यह जॉब, BigQuery से डेटा पढ़ेगी और फ़िल्टर को BigQuery पर पुश करेगी. इसके बाद, एग्रीगेशन की गणना Apache Spark में की जाएगी.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

ऊपर दिए गए जॉब में बदलाव करके, टेबल की कैश मेमोरी को शामिल किया जा सकता है. अब Apache Spark, मेमोरी में मौजूद विकी कॉलम पर फ़िल्टर लागू करेगा.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

इसके बाद, BigQuery स्टोरेज से डेटा को फिर से पढ़ने के बजाय, कैश मेमोरी में सेव किए गए डेटा का इस्तेमाल करके, किसी दूसरी विकी भाषा के लिए फ़िल्टर किया जा सकता है. इसलिए, यह प्रोसेस बहुत तेज़ी से पूरी होगी.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

कैश मेमोरी को हटाने के लिए, यह कमांड चलाएं

df_wiki_all.unpersist()

9. इस्तेमाल के अन्य उदाहरणों के लिए नोटबुक के उदाहरण

Cloud Dataproc GitHub repo में, डेटा लोड करने, डेटा सेव करने, और Google Cloud Platform के अलग-अलग प्रॉडक्ट और ओपन-सोर्स टूल की मदद से डेटा को प्लॉट करने के लिए, Apache Spark के सामान्य पैटर्न वाली Jupyter नोटबुक शामिल हैं:

10. व्यवस्थित करें

क्विकस्टार्ट पूरा होने के बाद, अपने GCP खाते पर बेवजह के शुल्क से बचने के लिए:

  1. आपने जिस एनवायरमेंट के लिए Cloud Storage बकेट बनाया है उसे मिटाएं
  2. Dataproc एनवायरमेंट मिटाएं.

अगर आपने यह प्रोजेक्ट सिर्फ़ इस कोडलैब के लिए बनाया है, तो आपके पास इसे मिटाने का विकल्प भी है:

  1. GCP Console में, प्रोजेक्ट पेज पर जाएं.
  2. प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
  3. बॉक्स में प्रोजेक्ट आईडी डालें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.

लाइसेंस

इस काम के लिए, Creative Commons एट्रिब्यूशन 3.0 जेनेरिक लाइसेंस और Apache 2.0 लाइसेंस के तहत लाइसेंस मिला है.