Cloud Dataproc पर Apache Spark और Jupyter Notebooks

1. खास जानकारी

इस लैब में, Cloud Dataproc पर Apache Spark और Jupyter notebook को सेट अप करने और इस्तेमाल करने का तरीका बताया जाएगा.

Jupyter notebook का इस्तेमाल बड़े पैमाने पर डेटा का विश्लेषण करने और मशीन लर्निंग मॉडल बनाने के लिए किया जाता है. इसकी मदद से, इंटरैक्टिव तरीके से कोड चलाया जा सकता है और तुरंत नतीजे देखे जा सकते हैं.

हालांकि, Apache Spark और Jupyter Notebooks को सेट अप और इस्तेमाल करना मुश्किल हो सकता है.

b9ed855863c57d6.png

Cloud Dataproc, आपको करीब 90 सेकंड में Apache Spark, Jupyter कॉम्पोनेंट, और कॉम्पोनेंट गेटवे के साथ Dataproc क्लस्टर बनाने की अनुमति देता है. इससे, यह प्रोसेस तेज़ और आसान हो जाती है.

आप इन चीज़ों के बारे में जानेंगे

इस कोडलैब में, आपको यह पता चलेगा कि कैसे:

  • अपने क्लस्टर के लिए Google Cloud Storage बकेट बनाएं
  • Jupyter और कॉम्पोनेंट गेटवे के साथ डेटाप्रॉक क्लस्टर बनाएं,
  • Dataproc पर JupyterLab वेब यूज़र इंटरफ़ेस (यूआई) को ऐक्सेस करना
  • Spark BigQuery Storage कनेक्टर का इस्तेमाल करने वाली नोटबुक बनाएं
  • स्पार्क जॉब चलाना और नतीजे प्लॉट करना.

Google Cloud पर इस लैब को चलाने की कुल लागत करीब 1 डॉलर है. Cloud Dataproc की कीमत के बारे में पूरी जानकारी यहां देखी जा सकती है.

2. प्रोजेक्ट बनाना

console.cloud.google.com पर जाकर, Google Cloud Platform के कंसोल में साइन इन करें और एक नया प्रोजेक्ट बनाएं:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

इसके बाद, आपको Google Cloud के संसाधनों का इस्तेमाल करने के लिए, Cloud Console में बिलिंग की सुविधा चालू करनी होगी.

इस कोडलैब का इस्तेमाल करने के लिए आपको कुछ डॉलर से ज़्यादा खर्च नहीं करना चाहिए. हालांकि, अगर आप ज़्यादा संसाधनों का इस्तेमाल करने का फ़ैसला करते हैं या उन्हें चलाना बंद कर देते हैं, तो यह ज़्यादा हो सकता है. इस कोडलैब के आखिरी सेक्शन में, आपको अपने प्रोजेक्ट को हटाने का तरीका बताया जाएगा.

Google Cloud Platform के नए उपयोगकर्ता $300 के मुफ़्त परीक्षण के लिए योग्य हैं.

3. आपका एनवायरमेंट सेट अप किया जा रहा है

सबसे पहले, Cloud Console के सबसे ऊपर दाएं कोने में मौजूद बटन पर क्लिक करके Cloud Shell को खोलें:

a10c47ee6ca41c54.png

Cloud Shell के लोड होने के बाद, पिछले चरण से प्रोजेक्ट आईडी को सेट करने के लिए नीचे दिया गया कमांड चलाएं**:**

gcloud config set project <project_id>

Cloud Console में सबसे ऊपर बाईं ओर, अपने प्रोजेक्ट पर क्लिक करके भी प्रोजेक्ट आईडी मिल सकता है:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

इसके बाद, Dataproc, Compute Engine, और BigQuery Storage के एपीआई चालू करें.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

इसके अलावा, Cloud Console में जाकर ऐसा किया जा सकता है. स्क्रीन पर सबसे ऊपर बाईं ओर, मेन्यू आइकॉन पर क्लिक करें.

2bfc27ef9ba2ec7d.png

ड्रॉप-डाउन से एपीआई मैनेजर चुनें.

408af5f32c4b7c25.png

एपीआई और सेवाएं चालू करें पर क्लिक करें.

a9c0e84296a7ba5b.png

इन एपीआई को खोजें और चालू करें:

  • Compute Engine एपीआई
  • डेटाप्रॉक एपीआई
  • BigQuery API
  • BigQuery स्टोरेज एपीआई

4. GCS बकेट बनाना

अपने डेटा के सबसे नज़दीक वाले इलाके में Google Cloud Storage बकेट बनाएं और उसे एक यूनीक नाम दें.

इसका इस्तेमाल Dataproc क्लस्टर के लिए किया जाएगा.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

आपको यह आउटपुट दिखेगा

Creating gs://<your-bucket-name>/...

5. Jupyter &के साथ अपना Dataproc क्लस्टर बनाएं कॉम्पोनेंट गेटवे

अपना क्लस्टर बनाया जा रहा है

अपने क्लस्टर के लिए env वैरिएबल सेट करें

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

इसके बाद, सभी ज़रूरी कॉम्पोनेंट के साथ अपना क्लस्टर बनाने के लिए, इस gcloud कमांड को चलाएं, ताकि आप अपने क्लस्टर पर Jupyter के साथ काम कर सकें.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

क्लस्टर बनाए जाने के दौरान, आपको यह आउटपुट दिखेगा

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

आपका क्लस्टर बनाने में करीब 90 सेकंड लगेंगे. इसके तैयार होने पर, आपको Dataproc Cloud Console के यूज़र इंटरफ़ेस (यूआई) से अपना क्लस्टर ऐक्सेस करने की सुविधा मिलेगी.

इंतज़ार करने के दौरान, gcloud कमांड में इस्तेमाल किए गए फ़्लैग के बारे में ज़्यादा जानने के लिए, नीचे दी गई जानकारी पढ़ें.

क्लस्टर बनाने के बाद, आपको नीचे दिया गया आउटपुट मिलेगा:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc बनाने के लिए इस्तेमाल किए गए फ़्लैग

यहां gcloud dataproc Create कमांड में इस्तेमाल किए गए फ़्लैग की जानकारी दी गई है

--region=${REGION}

उस इलाके और ज़ोन के बारे में बताता है जहां क्लस्टर बनाया जाएगा. यहां उन इलाकों की सूची देखी जा सकती है जहां यह सुविधा उपलब्ध है.

--image-version=1.4

आपके क्लस्टर में इस्तेमाल करने के लिए इमेज का वर्शन. आप यहां उपलब्ध वर्शन की सूची देख सकते हैं.

--bucket=${BUCKET_NAME}

क्लस्टर के लिए इस्तेमाल करने के लिए, पहले बनाया गया Google Cloud Storage बकेट तय करें. अगर आपने GCS की बकेट नहीं दी है, तो यह आपके लिए बना दी जाएगी.

आपके क्लस्टर को मिटाने के बाद भी, आपकी नोटबुक यहां सेव होंगी. ऐसा इसलिए, क्योंकि GCS (जीसीएस) बकेट को नहीं मिटाया जाता.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

आपके Dataproc क्लस्टर के लिए, इस्तेमाल किए जाने वाले मशीन टाइप. आप उपलब्ध मशीन टाइप की सूची यहां देख सकते हैं.

अगर आप फ़्लैग–num- सहमति को सेट नहीं करते है, तो डिफ़ॉल्ट रूप से 1 मास्टर नोड और 2 वर्कर नोड बन जाते हैं

--optional-components=ANACONDA,JUPYTER

वैकल्पिक कॉम्पोनेंट के लिए ये वैल्यू सेट करने से, आपके क्लस्टर पर Jupyter और Anaconda के लिए सभी ज़रूरी लाइब्रेरी इंस्टॉल हो जाएंगी. यह Jupyter notebook के लिए ज़रूरी है.

--enable-component-gateway

कॉम्पोनेंट गेटवे को चालू करने से, Apache Knox और इन्वर्टिंग प्रॉक्सी का इस्तेमाल करके, App Engine लिंक बनाया जाता है. यह Jupyter और JupyterLab वेब इंटरफ़ेस का आसान, सुरक्षित, और पुष्टि किया हुआ ऐक्सेस देता है. इसका मतलब है कि अब आपको एसएसएच टनल बनाने की ज़रूरत नहीं पड़ेगी.

यह क्लस्टर पर अन्य टूल के लिए लिंक भी बनाएगा. इनमें यार्न रिसॉर्स मैनेजर और स्पार्क हिस्ट्री सर्वर शामिल हैं जो आपके टास्क की परफ़ॉर्मेंस और क्लस्टर के इस्तेमाल के पैटर्न को देखने में मदद करेंगे.

6. Apache Spark नोटबुक बनाएं

JupyterLab के वेब इंटरफ़ेस को ऐक्सेस करना

क्लस्टर तैयार होने के बाद, Dataproc Clusters - Cloud console पर जाकर, अपने बनाए गए क्लस्टर पर क्लिक करके, और Web Interfaces टैब पर जाकर, JupyterLab वेब इंटरफ़ेस के लिए कॉम्पोनेंट गेटवे का लिंक ढूंढा जा सकता है.

afc40202d555de47.png

आपको दिखेगा कि आपके पास Jupyter का ऐक्सेस है. यह क्लासिक नोटबुक इंटरफ़ेस या JupyterLab है. इसे Project Jupyter के लिए अगली-पीढ़ी की टेक्नोलॉजी के यूज़र इंटरफ़ेस (यूआई) के तौर पर बताया गया है.

JupyterLab में कई बेहतरीन यूज़र इंटरफ़ेस (यूआई) सुविधाएं हैं. इसलिए, अगर आपने नोटबुक का इस्तेमाल पहले कभी नहीं किया है या नए सुधारों की जानकारी हासिल की है, तो हमारा सुझाव है कि JupyterLab का इस्तेमाल करें. ऐसा इसलिए, क्योंकि आधिकारिक दस्तावेज़ के हिसाब से यह क्लासिक Jupyter इंटरफ़ेस की जगह ले लेगा.

Python 3 कर्नेल के साथ नोटबुक बनाना

a463623f2ebf0518.png

Python 3 कर्नेल (न कि PySpark कर्नेल) के साथ नोटबुक बनाने के लिए, लॉन्चर टैब से Python 3 नोटबुक आइकॉन पर क्लिक करें. इसकी मदद से, notebook में Sparksession कॉन्फ़िगर किया जा सकता है. साथ ही, BigQuery Storage API का इस्तेमाल करने के लिए ज़रूरी spark-bigquery-कनेक्टर को शामिल किया जा सकता है.

नोटबुक का नाम बदलें

196a3276ed07e1f3.png

बाईं ओर मौजूद साइडबार में या सबसे ऊपर मौजूद नेविगेशन में, नोटबुक के नाम पर राइट क्लिक करें और नोटबुक का नाम बदलकर "BigQuery Storage & Spark DataFrames.ipynb"

Notebook में अपना स्पार्क कोड चलाएं

fbac38062e5bb9cf.png

इस notebook में, spark-bigquery-connector का इस्तेमाल किया जाएगा. यह एक ऐसा टूल है जिसकी मदद से, BigQuery और Spark के बीच डेटा को देखा और लिखा जा सकता है. साथ ही, BigQuery Storage API का इस्तेमाल किया जा सकता है.

BigQuery Storage API की मदद से, RPC पर आधारित प्रोटोकॉल का इस्तेमाल करके, BigQuery में डेटा को ऐक्सेस करने की प्रोसेस में काफ़ी सुधार किए जाते हैं. यह डेटा को एक साथ पढ़ने और लिखने के साथ-साथ, क्रम से लगाने के अलग-अलग फ़ॉर्मैट के साथ काम करता है. जैसे, Apache Avro और Apache Arrow. इससे, खास तौर पर बड़े डेटा सेट की परफ़ॉर्मेंस काफ़ी बेहतर होती है.

पहली सेल में, अपने क्लस्टर के स्काला वर्शन की जांच करें, ताकि आप spark-bigquery-कनेक्टर जार का सही वर्शन शामिल कर सकें.

इनपुट [1]:

!scala -version

आउटपुट [1]:f580e442576b8b1f.png एक स्पार्क सेशन बनाएं और spark-bigquery-connector पैकेज को शामिल करें.

अगर आपका Scala वर्शन 2.11 है, तो यहां दिया गया पैकेज इस्तेमाल करें.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

अगर आपका स्काला वर्शन 2.12 है, तो नीचे दिया गया पैकेज इस्तेमाल करें.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

इनपुट [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval चालू करें

इससे हर चरण में DataFrames के नतीजे मिलेंगे. इसके लिए, df.show() दिखाने की नई ज़रूरत नहीं पड़ेगी. साथ ही, आउटपुट की फ़ॉर्मैटिंग भी बेहतर होगी.

इनपुट [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Spark DataFrame में BigQuery टेबल को पढ़ना

सार्वजनिक BigQuery डेटासेट के डेटा को पढ़कर, Spark DataFrame बनाएं. यह स्पार्क क्लस्टर में डेटा लोड करने के लिए, spark-bigquery-connector और BigQuery Storage API का इस्तेमाल करता है.

Spark DataFrame बनाएं और Wikipedia पेज व्यू के लिए BigQuery सार्वजनिक डेटासेट से डेटा लोड करें. आपको पता चलेगा कि डेटा को क्वेरी नहीं चलाया जा रहा, क्योंकि डेटा को स्पार्क में लोड करने के लिए spark-bigquery-कनेक्टर का इस्तेमाल किया जा रहा है, जहां डेटा को प्रोसेस किया जाएगा. इस कोड को चलाने पर, यह टेबल को लोड नहीं करेगा, क्योंकि यह स्पार्क में लेज़ी इवैलुएशन है और अगले चरण में इसे एक्ज़ीक्यूट किया जाएगा.

इनपुट [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

आउटपुट [4]:

c107a33f6fc30ca.png

ज़रूरी कॉलम चुनें और where() का इस्तेमाल करके फ़िल्टर लागू करें. यह filter() के लिए उपनाम है.

इस कोड को चलाने पर, यह स्पार्क ऐक्शन ट्रिगर करता है और इस समय BigQuery Storage से डेटा पढ़ा जाता है.

इनपुट [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

आउटपुट [5]:

ad363cbe510d625a.png

सबसे अच्छे पेज देखने के लिए, टाइटल के हिसाब से ग्रुप बनाएं और पेज व्यू के हिसाब से उन्हें क्रम में लगाएं

इनपुट [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

आउटपुट [6]:f718abd05afc0f4.png

7. Notebook में Python प्लॉटिंग लाइब्रेरी का इस्तेमाल करना

अपने Spark जॉब के आउटपुट को प्लॉट करने के लिए, Python में मौजूद अलग-अलग प्लॉटिंग लाइब्रेरी का इस्तेमाल किया जा सकता है.

Spark DataFrame को Pandas DataFrame में बदलना

Spark DataFrame को Pandas DataFrame में बदलें और समय की तारीख को इंडेक्स के रूप में सेट करें. यह तरीका तब काम आता है, जब आपको सीधे Python में डेटा के साथ काम करना हो और कई उपलब्ध Python प्लॉटिंग लाइब्रेरी का इस्तेमाल करके डेटा प्लॉट करना हो.

इनपुट [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

आउटपुट [7]:

3df2aaa2351f028d.png

Pands DataFrame को प्लॉट करना

matplotlib लाइब्रेरी को इंपोर्ट करें, जो notebook में प्लॉट दिखाने के लिए ज़रूरी है

इनपुट [8]:

import matplotlib.pyplot as plt

Pandas DataFrame से लाइन चार्ट बनाने के लिए, Pandas प्लॉट फ़ंक्शन का इस्तेमाल करना.

इनपुट [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

आउटपुट [9]:bade7042c3033594.png

देखें कि नोटबुक को GCS (जीसीएस) में सेव किया गया था

अब आपकी पहली Jupyter notebook बन चुकी है और यह आपके Dataproc क्लस्टर पर चल रही है. अपनी नोटबुक को कोई नाम दें. इसके बाद, यह क्लस्टर बनाते समय इस्तेमाल किए गए GCS बकेट में अपने-आप सेव हो जाएगी.

क्लाउड शेल में इस gsutil कमांड का इस्तेमाल करके, इसकी जांच की जा सकती है

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

आपको यह आउटपुट दिखेगा

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. ऑप्टिमाइज़ेशन के लिए सलाह - मेमोरी में कैश डेटा

ऐसा भी हो सकता है कि आपको हर बार BigQuery स्टोरेज से डेटा पढ़ने के बजाय, मेमोरी में डेटा चाहिए हो.

यह जॉब, BigQuery से डेटा पढ़ेगा और फ़िल्टर को BigQuery में भेजेगा. इसके बाद, Apache Spark में एग्रीगेशन की गिनती की जाएगी.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

आप टेबल के कैश को शामिल करने के लिए ऊपर दिए गए जॉब में बदलाव कर सकते हैं. अब विकी कॉलम में मौजूद फ़िल्टर, Apache Spark की मेमोरी में लागू किया जाएगा.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

इसके बाद, BigQuery स्टोरेज से डेटा को दोबारा पढ़ने के बजाय, कैश मेमोरी में सेव किए गए डेटा का इस्तेमाल करके, अन्य विकी भाषा को फ़िल्टर किया जा सकता है. इससे डेटा बहुत तेज़ी से चलेगा.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

आप चलाकर कैश मेमोरी हटा सकते हैं

df_wiki_all.unpersist()

9. इस्तेमाल के ज़्यादा उदाहरणों के लिए नोटबुक के उदाहरण

Cloud Dataproc GitHub रेपो में, Jupyter नोटबुक की सुविधाएं दी गई हैं. इनमें Apache Spark पैटर्न वाले सामान्य पैटर्न मौजूद हैं. इनकी मदद से, डेटा लोड किया जा सकता है, डेटा सेव किया जा सकता है, और Google Cloud Platform के अलग-अलग प्रॉडक्ट और ओपन-सोर्स टूल की मदद से डेटा प्लॉट किया जा सकता है:

10. व्यवस्थित करें

इस क्विकस्टार्ट के पूरा होने के बाद, आपके GCP खाते पर गैर-ज़रूरी शुल्क लगने से बचने के लिए:

  1. एनवायरमेंट और अपने बनाए गए एनवायरमेंट के लिए, Cloud Storage बकेट को मिटाना
  2. Dataproc एनवायरमेंट को मिटाएं.

अगर आपने सिर्फ़ इस कोडलैब के लिए कोई प्रोजेक्ट बनाया है, तो आपके पास उसे मिटाने का विकल्प भी होता है:

  1. GCP कंसोल में, प्रोजेक्ट पेज पर जाएं.
  2. प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे आपको मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
  3. बॉक्स में, प्रोजेक्ट आईडी लिखें और फिर प्रोजेक्ट मिटाने के लिए शट डाउन करें पर क्लिक करें.

लाइसेंस

इस काम को Creative Commons एट्रिब्यूशन 3.0 सामान्य लाइसेंस और Apache 2.0 लाइसेंस के तहत लाइसेंस मिला है.