نوت بوک های Apache Spark و Jupyter در Cloud Dataproc

1. بررسی اجمالی

این آزمایشگاه نحوه راه اندازی و استفاده از نوت بوک های Apache Spark و Jupyter در Cloud Dataproc را پوشش می دهد.

نوت‌بوک‌های Jupyter به طور گسترده برای تجزیه و تحلیل داده‌های اکتشافی و ساخت مدل‌های یادگیری ماشین استفاده می‌شوند، زیرا به شما امکان می‌دهند کد خود را به صورت تعاملی اجرا کنید و فوراً نتایج خود را مشاهده کنید.

با این حال راه اندازی و استفاده از نوت بوک های Apache Spark و Jupyter می تواند پیچیده باشد.

b9ed855863c57d6.png

Cloud Dataproc این کار را سریع و آسان می کند و به شما امکان می دهد یک کلاستر Dataproc با Apache Spark، جزء Jupyter و Component Gateway در حدود 90 ثانیه ایجاد کنید.

چیزی که یاد خواهید گرفت

در این کد لبه، شما یاد خواهید گرفت که چگونه:

  • یک سطل Google Cloud Storage برای خوشه خود ایجاد کنید
  • ایجاد یک خوشه Dataproc با Jupyter و Component Gateway،
  • دسترسی به رابط کاربری وب JupyterLab در Dataproc
  • با استفاده از کانکتور Spark BigQuery Storage یک نوت بوک ایجاد کنید
  • اجرای یک کار اسپارک و ترسیم نتایج.

کل هزینه اجرای این آزمایشگاه در Google Cloud حدود 1 دلار است. جزئیات کامل در مورد قیمت گذاری Cloud Dataproc را می توانید در اینجا پیدا کنید.

2. ایجاد یک پروژه

به کنسول Google Cloud Platform در console.cloud.google.com وارد شوید و یک پروژه جدید ایجاد کنید:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

در مرحله بعد، برای استفاده از منابع Google Cloud، باید صورتحساب را در کنسول Cloud فعال کنید .

گذراندن این کد نباید بیش از چند دلار برای شما هزینه داشته باشد، اما اگر تصمیم به استفاده از منابع بیشتری داشته باشید یا آنها را در حال اجرا رها کنید ممکن است بیشتر باشد. آخرین بخش از این نرم افزار کد شما را در تمیز کردن پروژه راهنمایی می کند.

کاربران جدید Google Cloud Platform واجد شرایط استفاده آزمایشی رایگان 300 دلاری هستند.

3. تنظیم محیط خود

ابتدا، Cloud Shell را با کلیک بر روی دکمه در گوشه سمت راست بالای کنسول ابری باز کنید:

a10c47ee6ca41c54.png

پس از بارگیری Cloud Shell، دستور زیر را برای تنظیم ID پروژه از مرحله قبل اجرا کنید**:**

gcloud config set project <project_id>

شناسه پروژه همچنین با کلیک بر روی پروژه خود در سمت چپ بالای کنسول ابری قابل مشاهده است:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

سپس، APIهای Dataproc، Compute Engine و BigQuery Storage را فعال کنید.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

یا این را می توان در Cloud Console انجام داد. روی نماد منو در سمت چپ بالای صفحه کلیک کنید.

2bfc27ef9ba2ec7d.png

از منوی کشویی API Manager را انتخاب کنید.

408af5f32c4b7c25.png

روی Enable APIs and Services کلیک کنید.

a9c0e84296a7ba5b.png

API های زیر را جستجو و فعال کنید:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. یک سطل GCS ایجاد کنید

یک سطل Google Cloud Storage در نزدیک ترین منطقه به داده های خود ایجاد کنید و نامی منحصر به فرد به آن بدهید.

این برای خوشه Dataproc استفاده خواهد شد.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

شما باید خروجی زیر را ببینید

Creating gs://<your-bucket-name>/...

5. خوشه Dataproc خود را با Jupyter & Component Gateway ایجاد کنید

ایجاد خوشه شما

متغیرهای env را برای خوشه خود تنظیم کنید

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

سپس این دستور gcloud را اجرا کنید تا خوشه خود را با تمام اجزای لازم برای کار با Jupyter در خوشه خود ایجاد کنید.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

هنگام ایجاد خوشه شما باید خروجی زیر را ببینید

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

حدود 90 ثانیه طول می کشد تا خوشه خود را ایجاد کنید و پس از آماده شدن می توانید از رابط کاربری کنسول Dataproc Cloud به کلاستر خود دسترسی پیدا کنید.

در حالی که منتظر هستید، می توانید به خواندن زیر ادامه دهید تا در مورد پرچم های استفاده شده در دستور gcloud بیشتر بدانید.

پس از ایجاد خوشه باید خروجی زیر را داشته باشید:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

پرچم های استفاده شده در دستور ایجاد gcloud dataproc

در اینجا یک تفکیک از پرچم های استفاده شده در دستور ایجاد gcloud dataproc آورده شده است

--region=${REGION}

منطقه و منطقه ای را که خوشه در آن ایجاد می شود را مشخص می کند. لیست مناطق موجود را می توانید اینجا ببینید.

--image-version=1.4

نسخه تصویر برای استفاده در خوشه. لیست نسخه های موجود را می توانید اینجا ببینید.

--bucket=${BUCKET_NAME}

سطل Google Cloud Storage را که قبلاً ایجاد کرده اید برای استفاده برای خوشه مشخص کنید. اگر یک سطل GCS تهیه نکنید، برای شما ایجاد خواهد شد.

این همان جایی است که نوت‌بوک‌های شما ذخیره می‌شوند، حتی اگر خوشه خود را حذف کنید زیرا سطل GCS حذف نشده است.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

انواع ماشین برای استفاده برای خوشه Dataproc شما. می توانید لیستی از انواع ماشین های موجود را در اینجا ببینید.

به طور پیش فرض، اگر flag –num-workers را تنظیم نکنید، 1 گره اصلی و 2 گره کارگر ایجاد می شود.

--optional-components=ANACONDA,JUPYTER

تنظیم این مقادیر برای مؤلفه‌های اختیاری ، تمام کتابخانه‌های لازم برای Jupyter و Anaconda (که برای نوت‌بوک‌های Jupyter مورد نیاز است) روی خوشه شما نصب می‌کند.

--enable-component-gateway

فعال کردن Component Gateway یک پیوند App Engine را با استفاده از Apache Knox و Inverting Proxy ایجاد می کند که دسترسی آسان، ایمن و تأیید شده را به رابط های وب Jupyter و JupyterLab می دهد به این معنی که دیگر نیازی به ایجاد تونل های SSH ندارید.

همچنین پیوندهایی را برای ابزارهای دیگر در کلاستر از جمله مدیریت منابع نخ و سرور Spark History ایجاد می کند که برای مشاهده عملکرد مشاغل و الگوهای استفاده از خوشه مفید هستند.

6. یک نوت بوک آپاچی اسپارک بسازید

دسترسی به رابط وب JupyterLab

پس از آماده شدن خوشه، می‌توانید با رفتن به Dataproc Clusters - Cloud console ، روی خوشه‌ای که ایجاد کرده‌اید کلیک کنید و به تب Web Interfaces بروید، پیوند دروازه Component به رابط وب JupyterLab را پیدا کنید.

afc40202d555de47.png

متوجه خواهید شد که به Jupyter که رابط نوت بوک کلاسیک است یا JupyterLab که به عنوان رابط کاربری نسل بعدی پروژه Jupyter توصیف می شود، دسترسی دارید.

بسیاری از ویژگی‌های رابط کاربری جدید و فوق‌العاده در JupyterLab وجود دارد، بنابراین اگر در استفاده از نوت‌بوک‌ها تازه کار هستید یا به دنبال آخرین پیشرفت‌ها هستید، توصیه می‌شود از JupyterLab استفاده کنید زیرا در نهایت طبق اسناد رسمی جایگزین رابط کلاسیک Jupyter خواهد شد.

یک نوت بوک با هسته Python 3 ایجاد کنید

a463623f2ebf0518.png

از تب لانچر، روی نماد نوت بوک Python 3 کلیک کنید تا یک نوت بوک با هسته Python 3 (نه هسته PySpark) ایجاد کنید که به شما امکان می دهد SparkSession را در نوت بوک پیکربندی کنید و کانکتور spark-bigquery- مورد نیاز برای استفاده از BigQuery Storage را اضافه کنید. API .

نام نوت بوک را تغییر دهید

196a3276ed07e1f3.png

روی نام نوت بوک در نوار کناری سمت چپ یا پیمایش بالا کلیک راست کنید و نام نوت بوک را به "BigQuery Storage & Spark DataFrames.ipynb" تغییر دهید.

کد Spark خود را در نوت بوک اجرا کنید

fbac38062e5bb9cf.png

در این نوت بوک، شما از کانکتور spark-bigquery استفاده خواهید کرد که ابزاری برای خواندن و نوشتن داده ها بین BigQuery و Spark با استفاده از BigQuery Storage API است.

BigQuery Storage API با استفاده از یک پروتکل مبتنی بر RPC، پیشرفت های قابل توجهی را برای دسترسی به داده ها در BigQuery به ارمغان می آورد. از خواندن و نوشتن داده ها به صورت موازی و همچنین فرمت های سریال سازی مختلف مانند Apache Avro و Apache Arrow پشتیبانی می کند. در سطح بالا، این به بهبود عملکرد قابل توجهی، به ویژه در مجموعه داده‌های بزرگتر منجر می‌شود.

در اولین سلول، نسخه Scala خوشه خود را بررسی کنید تا بتوانید نسخه صحیح spark-bigquery-connector jar را وارد کنید.

ورودی [1]:

!scala -version

خروجی [1]: f580e442576b8b1f.png یک جلسه Spark ایجاد کنید و بسته spark-bigquery-connector را در آن قرار دهید.

اگر نسخه Scala شما 2.11 است از بسته زیر استفاده کنید.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

اگر نسخه Scala شما 2.12 است از بسته زیر استفاده کنید.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

ورودی [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval را فعال کنید

این کار نتایج DataFrames را در هر مرحله بدون نیاز به نمایش df.show() خروجی می دهد و همچنین قالب بندی خروجی را بهبود می بخشد.

ورودی [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

جدول BigQuery را در Spark DataFrame بخوانید

با خواندن داده ها از مجموعه داده عمومی BigQuery یک Spark DataFrame ایجاد کنید. این باعث می شود از spark-bigquery-connector و BigQuery Storage API برای بارگذاری داده ها در Spark cluster استفاده شود.

یک Spark DataFrame ایجاد کنید و داده ها را از مجموعه داده عمومی BigQuery برای بازدید از صفحه ویکی پدیا بارگیری کنید. متوجه خواهید شد که در حال اجرای پرس و جو روی داده ها نیستید زیرا از کانکتور spark-bigquery-connector برای بارگذاری داده ها در Spark استفاده می کنید، جایی که پردازش داده ها انجام می شود. هنگامی که این کد اجرا می شود، در واقع جدول را بارگذاری نمی کند زیرا این یک ارزیابی تنبل در Spark است و اجرا در مرحله بعدی رخ می دهد.

ورودی [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

خروجی [4]:

c107a33f6fc30ca.png

ستون های مورد نیاز را انتخاب کنید و با استفاده از where() که نام مستعار filter() است، یک فیلتر اعمال کنید.

وقتی این کد اجرا می‌شود، یک عمل Spark را راه‌اندازی می‌کند و در این مرحله داده‌ها از BigQuery Storage خوانده می‌شوند.

ورودی [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

خروجی [5]:

ad363cbe510d625a.png

گروه بندی بر اساس عنوان و ترتیب بر اساس بازدید از صفحه برای دیدن صفحات برتر

ورودی [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

خروجی [6]: f718abd05afc0f4.png

7. از کتابخانه های ترسیم پایتون در نوت بوک استفاده کنید

می‌توانید از کتابخانه‌های ترسیم مختلف موجود در پایتون برای رسم خروجی کارهای Spark خود استفاده کنید.

Spark DataFrame را به Pandas DataFrame تبدیل کنید

Spark DataFrame را به Pandas DataFrame تبدیل کنید و تاریخ ساعت را به عنوان شاخص تنظیم کنید. اگر می‌خواهید مستقیماً با داده‌ها در پایتون کار کنید و داده‌ها را با استفاده از بسیاری از کتابخانه‌های موجود در پایتون رسم کنید، مفید است.

ورودی [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

خروجی [7]:

3df2aaa2351f028d.png

ترسیم چارچوب داده پانداها

کتابخانه matplotlib را که برای نمایش نمودارها در نوت بوک لازم است، وارد کنید

ورودی [8]:

import matplotlib.pyplot as plt

از تابع طرح Pandas برای ایجاد نمودار خطی از Pandas DataFrame استفاده کنید.

ورودی [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

خروجی [9]: bade7042c3033594.png

بررسی کنید که نوت بوک در GCS ذخیره شده است

اکنون باید اولین نوت بوک Jupyter خود را روی خوشه Dataproc خود راه اندازی کنید. نوت بوک خود را نامی بگذارید و به طور خودکار در سطل GCS که هنگام ایجاد خوشه استفاده می شود ذخیره می شود.

شما می توانید این را با استفاده از این دستور gsutil در پوسته ابری بررسی کنید

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

شما باید خروجی زیر را ببینید

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. نکته بهینه سازی - کش داده ها در حافظه

ممکن است سناریوهایی وجود داشته باشد که بخواهید داده ها را به جای خواندن هر بار از فضای ذخیره سازی BigQuery در حافظه نگه دارید.

این کار داده ها را از BigQuery می خواند و فیلتر را به BigQuery فشار می دهد. سپس تجمع در آپاچی اسپارک محاسبه خواهد شد.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

می‌توانید کار بالا را طوری تغییر دهید که یک کش جدول را شامل شود و اکنون فیلتر روی ستون ویکی توسط آپاچی اسپارک در حافظه اعمال می‌شود.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

سپس می‌توانید به جای خواندن دوباره داده‌ها از فضای ذخیره‌سازی BigQuery، زبان ویکی دیگری را با استفاده از داده‌های حافظه پنهان فیلتر کنید و بنابراین بسیار سریع‌تر اجرا می‌شود.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

می توانید کش را با اجرا حذف کنید

df_wiki_all.unpersist()

9. نمونه نوت بوک برای موارد استفاده بیشتر

مخزن Cloud Dataproc GitHub دارای نوت‌بوک‌های Jupyter با الگوهای رایج Apache Spark برای بارگیری داده‌ها، ذخیره داده‌ها و ترسیم داده‌های شما با محصولات مختلف Google Cloud Platform و ابزارهای منبع باز است:

10. پاکسازی کنید

برای جلوگیری از تحمیل هزینه‌های غیرضروری به حساب GCP خود پس از تکمیل این شروع سریع:

  1. سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید
  2. محیط Dataproc را حذف کنید .

اگر پروژه ای را فقط برای این کد لبه ایجاد کرده اید، می توانید به صورت اختیاری پروژه را نیز حذف کنید:

  1. در کنسول GCP، به صفحه پروژه ها بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
  3. در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.

مجوز

این اثر تحت مجوز Creative Commons Attribution 3.0 Generic و مجوز Apache 2.0 مجوز دارد.