نوت بوک های Apache Spark و Jupyter در Cloud Dataproc

۱. مرور کلی

این آزمایشگاه نحوه راه‌اندازی و استفاده از نوت‌بوک‌های آپاچی اسپارک و ژوپیتر در Cloud Dataproc را پوشش خواهد داد.

نوت‌بوک‌های ژوپیتر به طور گسترده برای تجزیه و تحلیل داده‌های اکتشافی و ساخت مدل‌های یادگیری ماشین مورد استفاده قرار می‌گیرند، زیرا به شما امکان می‌دهند کد خود را به صورت تعاملی اجرا کنید و بلافاصله نتایج خود را مشاهده کنید.

با این حال، راه‌اندازی و استفاده از Apache Spark و Jupyter Notebooks می‌تواند پیچیده باشد.

b9ed855863c57d6.png

Cloud Dataproc این کار را سریع و آسان می‌کند و به شما امکان می‌دهد در حدود ۹۰ ثانیه یک کلاستر Dataproc با Apache Spark، Jupyter component و Component Gateway ایجاد کنید.

آنچه یاد خواهید گرفت

در این آزمایشگاه کد، شما یاد خواهید گرفت که چگونه:

  • یک سطل ذخیره‌سازی ابری گوگل برای کلاستر خود ایجاد کنید
  • ایجاد یک کلاستر Dataproc با Jupyter و Component Gateway
  • دسترسی به رابط کاربری وب JupyterLab در Dataproc
  • با استفاده از رابط ذخیره‌سازی Spark BigQuery، یک نوت‌بوک ایجاد کنید
  • اجرای یک کار Spark و ترسیم نتایج.

هزینه کل اجرای این آزمایشگاه در Google Cloud حدود ۱ دلار است. جزئیات کامل در مورد قیمت‌گذاری Cloud Dataproc را می‌توانید اینجا بیابید.

۲. ایجاد یک پروژه

وارد کنسول پلتفرم ابری گوگل در console.cloud.google.com شوید و یک پروژه جدید ایجاد کنید:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

در مرحله بعد، برای استفاده از منابع گوگل کلود، باید صورتحساب را در کنسول کلود فعال کنید .

اجرای این آزمایشگاه کد نباید بیش از چند دلار برای شما هزینه داشته باشد، اما اگر تصمیم به استفاده از منابع بیشتر بگیرید یا اگر آنها را در حال اجرا رها کنید، می‌تواند بیشتر هم بشود. بخش آخر این آزمایشگاه کد، شما را در تمیز کردن پروژه‌تان راهنمایی خواهد کرد.

کاربران جدید پلتفرم ابری گوگل واجد شرایط دریافت یک دوره آزمایشی رایگان ۳۰۰ دلاری هستند.

۳. آماده‌سازی محیط

ابتدا، با کلیک روی دکمه‌ای که در گوشه سمت راست بالای کنسول ابری قرار دارد، Cloud Shell را باز کنید:

a10c47ee6ca41c54.png

پس از بارگذاری Cloud Shell، دستور زیر را برای تنظیم شناسه پروژه از مرحله قبل اجرا کنید**:**

gcloud config set project <project_id>

شناسه پروژه را می‌توان با کلیک روی پروژه خود در سمت چپ بالای کنسول ابری نیز پیدا کرد:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

در مرحله بعد، APIهای Dataproc، Compute Engine و BigQuery Storage را فعال کنید.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

روش دیگر این است که این کار را می‌توان در کنسول ابری انجام داد. روی نماد منو در سمت چپ بالای صفحه کلیک کنید.

2bfc27ef9ba2ec7d.png

از منوی کشویی، گزینه API Manager را انتخاب کنید.

408af5f32c4b7c25.png

روی فعال کردن APIها و خدمات کلیک کنید.

a9c0e84296a7ba5b.png

API های زیر را جستجو و فعال کنید:

  • رابط برنامه‌نویسی کاربردی موتور محاسباتی
  • رابط برنامه‌نویسی کاربردی Dataproc
  • رابط برنامه‌نویسی کاربردی بیگ‌کوئری
  • رابط برنامه‌نویسی کاربردی ذخیره‌سازی بیگ‌کوئری

۴. یک سطل GCS ایجاد کنید

یک سطل ذخیره‌سازی ابری گوگل در نزدیک‌ترین منطقه به داده‌های خود ایجاد کنید و یک نام منحصر به فرد به آن بدهید.

این برای کلاستر Dataproc استفاده خواهد شد.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

باید خروجی زیر را مشاهده کنید

Creating gs://<your-bucket-name>/...

۵. خوشه Dataproc خود را با Jupyter و Component Gateway ایجاد کنید

ایجاد خوشه شما

متغیرهای env را برای خوشه خود تنظیم کنید

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

سپس این دستور gcloud را اجرا کنید تا خوشه خود را با تمام اجزای لازم برای کار با Jupyter روی خوشه خود ایجاد کنید.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

هنگام ایجاد خوشه، باید خروجی زیر را مشاهده کنید.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

ایجاد خوشه شما حدود ۹۰ ثانیه طول می‌کشد و پس از آماده شدن، می‌توانید از طریق رابط کاربری کنسول Dataproc Cloud به خوشه خود دسترسی داشته باشید.

در حالی که منتظر هستید، می‌توانید به خواندن ادامه دهید تا درباره پرچم‌های مورد استفاده در دستور gcloud بیشتر بدانید.

پس از ایجاد خوشه، باید خروجی زیر را مشاهده کنید:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

پرچم‌های مورد استفاده در دستور gcloud dataproc create

در اینجا خلاصه‌ای از پرچم‌های مورد استفاده در دستور gcloud dataproc create آورده شده است.

--region=${REGION}

منطقه و ناحیه‌ای را که خوشه در آن ایجاد خواهد شد، مشخص می‌کند. می‌توانید لیست مناطق موجود را اینجا مشاهده کنید.

--image-version=1.4

نسخه ایمیج مورد استفاده در کلاستر شما. می‌توانید لیست نسخه‌های موجود را اینجا مشاهده کنید.

--bucket=${BUCKET_NAME}

سطل ذخیره‌سازی ابری گوگل (Google Cloud Storage Bucket) که قبلاً ایجاد کرده‌اید را برای استفاده در کلاستر مشخص کنید. اگر سطل GCS را ارائه ندهید، این سطل برای شما ایجاد خواهد شد.

اینجا همچنین جایی است که نوت‌بوک‌های شما ذخیره می‌شوند، حتی اگر کلاستر خود را حذف کنید، زیرا باکت GCS حذف نمی‌شود.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

انواع ماشین‌هایی که برای کلاستر Dataproc شما استفاده می‌شوند. می‌توانید لیستی از انواع ماشین‌های موجود را اینجا مشاهده کنید.

به طور پیش‌فرض، اگر پرچم –num-workers را تنظیم نکنید، ۱ گره اصلی و ۲ گره کارگر ایجاد می‌شوند.

--optional-components=ANACONDA,JUPYTER

تنظیم این مقادیر برای اجزای اختیاری، تمام کتابخانه‌های لازم برای Jupyter و Anaconda (که برای Jupyter notebooks مورد نیاز است) را روی کلاستر شما نصب خواهد کرد.

--enable-component-gateway

فعال کردن Component Gateway با استفاده از Apache Knox و Inverting Proxy یک لینک App Engine ایجاد می‌کند که دسترسی آسان، ایمن و احراز هویت شده به رابط‌های وب Jupyter و JupyterLab را فراهم می‌کند، به این معنی که دیگر نیازی به ایجاد تونل‌های SSH ندارید.

همچنین پیوندهایی برای سایر ابزارها در کلاستر از جمله Yarn Resource manager و Spark History Server ایجاد می‌کند که برای مشاهده عملکرد کارها و الگوهای استفاده از کلاستر مفید هستند.

۶. یک دفترچه یادداشت آپاچی اسپارک ایجاد کنید

دسترسی به رابط وب JupyterLab

پس از آماده شدن کلاستر، می‌توانید با رفتن به کنسول Dataproc Clusters - Cloud ، کلیک روی کلاستری که ایجاد کرده‌اید و رفتن به تب Web Interfaces، لینک Component Gateway به رابط وب JupyterLab را پیدا کنید.

afc40202d555de47.png

متوجه خواهید شد که به Jupyter که رابط کاربری کلاسیک نوت‌بوک است یا JupyterLab که به عنوان رابط کاربری نسل بعدی برای پروژه Jupyter توصیف شده است، دسترسی دارید.

ویژگی‌های رابط کاربری جدید و عالی زیادی در JupyterLab وجود دارد، بنابراین اگر در استفاده از نوت‌بوک‌ها تازه‌کار هستید یا به دنبال جدیدترین پیشرفت‌ها هستید، توصیه می‌شود از JupyterLab استفاده کنید زیرا طبق اسناد رسمی، در نهایت جایگزین رابط کاربری کلاسیک Jupyter خواهد شد.

ایجاد یک دفترچه یادداشت با هسته پایتون ۳

a463623f2ebf0518.png

از تب لانچر، روی آیکون دفترچه یادداشت پایتون ۳ کلیک کنید تا یک دفترچه یادداشت با هسته پایتون ۳ (نه هسته PySpark) ایجاد شود که به شما امکان می‌دهد SparkSession را در دفترچه یادداشت پیکربندی کنید و spark-bigquery-connector مورد نیاز برای استفاده از BigQuery Storage API را نیز در آن بگنجانید.

تغییر نام دفترچه یادداشت

۱۹۶a۳۲۷۶ed۰۷e۱f۳.png

روی نام نوت‌بوک در نوار کناری سمت چپ یا بالای صفحه کلیک راست کنید و نام نوت‌بوک را به "BigQuery Storage & Spark DataFrames.ipynb" تغییر دهید.

کد اسپارک خود را در نوت‌بوک اجرا کنید

fbac38062e5bb9cf.png

در این دفترچه یادداشت، شما از spark-bigquery-connector استفاده خواهید کرد که ابزاری برای خواندن و نوشتن داده‌ها بین BigQuery و Spark با استفاده از BigQuery Storage API است.

رابط برنامه‌نویسی کاربردی ذخیره‌سازی BigQuery با استفاده از یک پروتکل مبتنی بر RPC، پیشرفت‌های قابل توجهی در دسترسی به داده‌ها در BigQuery ایجاد می‌کند. این رابط از خواندن و نوشتن داده‌ها به صورت موازی و همچنین فرمت‌های سریال‌سازی مختلف مانند Apache Avro و Apache Arrow پشتیبانی می‌کند. در سطح بالا، این به معنای بهبود قابل توجه عملکرد، به ویژه در مجموعه داده‌های بزرگتر است.

در سلول اول، نسخه اسکالای کلاستر خود را بررسی کنید تا بتوانید نسخه صحیح فایل jar مربوط به spark-bigquery-connector را وارد کنید.

ورودی [1]:

!scala -version

خروجی [1]: f580e442576b8b1f.png یک جلسه Spark ایجاد کنید و بسته spark-bigquery-connector را در آن قرار دهید.

اگر نسخه اسکالا شما ۲.۱۱ است، از بسته زیر استفاده کنید.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

اگر نسخه اسکالا شما ۲.۱۲ است، از بسته زیر استفاده کنید.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

ورودی [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

فعال کردن repl.eagerEval

این کار نتایج DataFrames را در هر مرحله بدون نیاز جدید به نمایش df.show() نمایش می‌دهد و همچنین قالب‌بندی خروجی را بهبود می‌بخشد.

ورودی [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

خواندن جدول BigQuery در Spark DataFrame

با خواندن داده‌ها از یک مجموعه داده عمومی BigQuery، یک Spark DataFrame ایجاد کنید. این کار از spark-bigquery-connector و BigQuery Storage API برای بارگذاری داده‌ها در کلاستر Spark استفاده می‌کند.

یک قاب داده Spark ایجاد کنید و داده‌ها را از مجموعه داده عمومی BigQuery برای بازدید از صفحات ویکی‌پدیا بارگذاری کنید. متوجه خواهید شد که در حال اجرای پرس‌وجو روی داده‌ها نیستید، زیرا از spark-bigquery-connector برای بارگذاری داده‌ها در Spark استفاده می‌کنید، جایی که پردازش داده‌ها انجام خواهد شد. وقتی این کد اجرا می‌شود، در واقع جدول بارگذاری نمی‌شود زیرا یک ارزیابی تنبل در Spark است و اجرا در مرحله بعدی رخ می‌دهد.

ورودی [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

خروجی [4]:

c107a33f6fc30ca.png

ستون‌های مورد نیاز را انتخاب کنید و با استفاده از where() که نام مستعار filter() است، یک فیلتر اعمال کنید.

وقتی این کد اجرا می‌شود، یک عمل Spark را فعال می‌کند و داده‌ها در این مرحله از BigQuery Storage خوانده می‌شوند.

ورودی [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

خروجی [5]:

ad363cbe510d625a.png

برای دیدن صفحات برتر، بر اساس عنوان گروه‌بندی کنید و بر اساس تعداد بازدید صفحات مرتب کنید

ورودی [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

خروجی [6]: f718abd05afc0f4.png

۷. از کتابخانه‌های رسم نمودار پایتون در دفترچه یادداشت استفاده کنید

شما می‌توانید از کتابخانه‌های مختلف رسم نمودار که در پایتون موجود هستند، برای رسم نمودار خروجی کارهای Spark خود استفاده کنید.

تبدیل دیتافریم اسپارک به دیتافریم پانداس

قاب داده Spark را به قاب داده Pandas تبدیل کنید و datehour را به عنوان اندیس تنظیم کنید. این کار در صورتی مفید است که بخواهید مستقیماً در پایتون با داده‌ها کار کنید و داده‌ها را با استفاده از کتابخانه‌های رسم نمودار پایتون که در دسترس هستند، رسم کنید.

ورودی [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

خروجی [7]:

3df2aaa2351f028d.png

ترسیم دیتافریم پانداس

کتابخانه matplotlib را که برای نمایش نمودارها در دفترچه یادداشت لازم است، وارد کنید.

ورودی [8]:

import matplotlib.pyplot as plt

از تابع رسم نمودار پانداس برای ایجاد نمودار خطی از قاب داده پانداس استفاده کنید.

ورودی [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

خروجی [9]: bade7042c3033594.png

بررسی کنید که دفترچه یادداشت در GCS ذخیره شده باشد

اکنون باید اولین نوت‌بوک ژوپیتر خود را روی کلاستر Dataproc خود راه‌اندازی و اجرا کرده باشید. برای نوت‌بوک خود یک نام انتخاب کنید تا به طور خودکار در باکت GCS که هنگام ایجاد کلاستر استفاده شده است، ذخیره شود.

می‌توانید این را با استفاده از دستور gsutil در پوسته ابری بررسی کنید.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

باید خروجی زیر را مشاهده کنید

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

۸. نکته بهینه‌سازی - داده‌ها را در حافظه پنهان کنید

ممکن است سناریوهایی وجود داشته باشد که در آنها بخواهید داده‌ها را به جای خواندن از BigQuery Storage هر بار در حافظه داشته باشید.

این کار داده‌ها را از BigQuery می‌خواند و فیلتر را به BigQuery ارسال می‌کند. سپس تجمیع در Apache Spark محاسبه می‌شود.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

می‌توانید کار بالا را طوری تغییر دهید که شامل یک حافظه پنهان از جدول باشد و اکنون فیلتر روی ستون ویکی توسط آپاچی اسپارک در حافظه اعمال می‌شود.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

سپس می‌توانید به جای خواندن مجدد داده‌ها از حافظه BigQuery، با استفاده از داده‌های ذخیره شده، زبان ویکی دیگری را فیلتر کنید و بنابراین بسیار سریع‌تر اجرا خواهد شد.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

می‌توانید با اجرای دستور زیر، حافظه پنهان (cache) را حذف کنید.

df_wiki_all.unpersist()

۹. دفترچه‌های نمونه برای موارد استفاده بیشتر

مخزن گیت‌هاب Cloud Dataproc شامل نوت‌بوک‌های Jupyter با الگوهای رایج آپاچی اسپارک برای بارگذاری داده‌ها، ذخیره داده‌ها و ترسیم داده‌های شما با محصولات مختلف پلتفرم ابری گوگل و ابزارهای متن‌باز است:

۱۰. تمیز کردن

برای جلوگیری از تحمیل هزینه‌های غیرضروری به حساب GCP خود پس از تکمیل این راهنمای سریع:

  1. سطل ذخیره‌سازی ابری مربوط به محیط و آنچه ایجاد کرده‌اید را حذف کنید.
  2. محیط Dataproc را حذف کنید .

اگر فقط برای این codelab پروژه‌ای ایجاد کرده‌اید، می‌توانید به صورت اختیاری پروژه را حذف کنید:

  1. در کنسول GCP، به صفحه پروژه‌ها بروید.
  2. در لیست پروژه‌ها، پروژه‌ای را که می‌خواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
  3. در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.

مجوز

این اثر تحت مجوز عمومی Creative Commons Attribution 3.0 و مجوز Apache 2.0 منتشر شده است.