۱. بررسی اجمالی - گوگل دیتاپروک
Dataproc یک سرویس کاملاً مدیریتشده و بسیار مقیاسپذیر برای اجرای Apache Spark، Apache Flink، Presto و بسیاری از ابزارها و چارچوبهای متنباز دیگر است. از Dataproc برای مدرنسازی دریاچه داده، ETL/ELT و علم داده امن در مقیاس جهانی استفاده کنید. Dataproc همچنین به طور کامل با چندین سرویس Google Cloud از جمله BigQuery ، Cloud Storage ، Vertex AI و Dataplex یکپارچه شده است.
Dataproc در سه نسخه موجود است:
- Dataproc Serverless به شما امکان میدهد کارهای PySpark را بدون نیاز به پیکربندی زیرساخت و مقیاسبندی خودکار اجرا کنید. Dataproc Serverless از بارهای کاری دستهای PySpark و جلسات/نوتبوکها پشتیبانی میکند.
- Dataproc در Google Compute Engine به شما امکان میدهد علاوه بر ابزارهای متنباز مانند Flink و Presto، یک خوشه Hadoop YARN را برای بارهای کاری Spark مبتنی بر YARN مدیریت کنید. میتوانید خوشههای مبتنی بر ابر خود را با هر میزان مقیاسبندی عمودی یا افقی که میخواهید، از جمله مقیاسبندی خودکار، تنظیم کنید.
- Dataproc در موتور Google Kubernetes به شما امکان میدهد خوشههای مجازی Dataproc را در زیرساخت GKE خود برای ارسال کارهای Spark، PySpark، SparkR یا Spark SQL پیکربندی کنید.
در این آزمایشگاه کد، روشهای مختلفی را یاد خواهید گرفت که میتوانید از Dataproc Serverless استفاده کنید.
آپاچی اسپارک در ابتدا برای اجرا روی خوشههای هادوپ ساخته شده بود و از YARN به عنوان مدیر منابع خود استفاده میکرد. نگهداری از خوشههای هادوپ نیاز به مجموعهای خاص از تخصصها و اطمینان از پیکربندی صحیح بسیاری از دکمههای مختلف روی خوشهها دارد. این علاوه بر مجموعهای جداگانه از دکمهها است که اسپارک نیز از کاربر میخواهد آنها را تنظیم کند. این امر منجر به سناریوهای بسیاری میشود که در آن توسعهدهندگان به جای کار بر روی کد خود اسپارک، زمان بیشتری را صرف پیکربندی زیرساخت خود میکنند.
Dataproc Serverless نیاز به پیکربندی دستی خوشههای Hadoop یا Spark را از بین میبرد. Dataproc Serverless روی Hadoop اجرا نمیشود و از تخصیص منابع پویای خود برای تعیین نیازهای منابع، از جمله مقیاسبندی خودکار، استفاده میکند. زیرمجموعه کوچکی از ویژگیهای Spark هنوز با Dataproc Serverless قابل تنظیم هستند، اما در بیشتر موارد نیازی به تغییر آنها نخواهید داشت.
۲. راهاندازی
شما با پیکربندی محیط و منابع مورد استفاده در این آزمایشگاه کد شروع خواهید کرد.
یک پروژه Google Cloud ایجاد کنید . میتوانید از یک پروژه موجود استفاده کنید.
با کلیک روی Cloud Shell در نوار ابزار Cloud Console ، آن را باز کنید.

Cloud Shell یک محیط Shell آماده برای استفاده فراهم میکند که میتوانید برای این آزمایشگاه کد از آن استفاده کنید.

Cloud Shell به طور پیشفرض نام پروژه شما را تعیین میکند. با اجرای echo $GOOGLE_CLOUD_PROJECT دوباره بررسی کنید. اگر شناسه پروژه خود را در خروجی مشاهده نمیکنید، آن را تنظیم کنید.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
یک منطقه موتور محاسباتی برای منابع خود تنظیم کنید، مانند us-central1 یا europe-west2 .
export REGION=<your-region>
فعال کردن APIها
Codelab از API های زیر استفاده می کند:
- بیگکوئری
- دیتاپروک
API های لازم را فعال کنید. این کار حدود یک دقیقه طول خواهد کشید و پس از اتمام، پیام موفقیت آمیز بودن آن نمایش داده خواهد شد.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
پیکربندی دسترسی به شبکه
Dataproc Serverless نیاز دارد که دسترسی خصوصی گوگل (Google Private Access) در منطقهای که قرار است کارهای اسپارک خود را اجرا کنید، فعال باشد، زیرا درایورها و اجراکنندههای اسپارک فقط IPهای خصوصی دارند. برای فعال کردن آن در زیرشبکه default دستور زیر را اجرا کنید.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
میتوانید از طریق دستور زیر که خروجی آن True یا False خواهد بود، تأیید کنید که دسترسی خصوصی گوگل فعال شده است.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
یک سطل ذخیرهسازی ایجاد کنید
یک مخزن ذخیرهسازی ایجاد کنید که برای ذخیره فایلهای ایجاد شده در این آزمایشگاه کد استفاده خواهد شد.
یک نام برای سطل خود انتخاب کنید. نام سطلها باید در سطح جهانی برای همه کاربران منحصر به فرد باشد.
export BUCKET=<your-bucket-name>
سطل را در منطقهای که قصد اجرای کارهای اسپارک خود را دارید، ایجاد کنید.
gsutil mb -l ${REGION} gs://${BUCKET}
میتوانید ببینید که سطل شما در کنسول Cloud Storage موجود است. همچنین میتوانید gsutil ls برای دیدن سطل خود اجرا کنید.
ایجاد یک سرور تاریخچه دائمی
رابط کاربری Spark مجموعهای غنی از ابزارهای اشکالزدایی و بینشهایی در مورد کارهای Spark ارائه میدهد. برای مشاهده رابط کاربری Spark برای کارهای تکمیلشده Dataproc Serverless، باید یک خوشه Dataproc تک گرهای ایجاد کنید تا به عنوان یک سرور تاریخچه مداوم استفاده شود.
برای سرور تاریخچهی دائمی خود نامی تعیین کنید.
PHS_CLUSTER_NAME=my-phs
موارد زیر را اجرا کنید.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
رابط کاربری Spark و سرور تاریخچهی پایدار بعداً در آزمایشگاه کد با جزئیات بیشتری بررسی خواهند شد.
۳. اجرای کارهای اسپارک بدون سرور با دستههای Dataproc
در این نمونه، شما با مجموعهای از دادهها از مجموعه داده عمومی سفرهای دوچرخهسواری سیتی (Citi Bike Trips) شهر نیویورک (NYC) کار خواهید کرد. NYC Citi Bikes یک سیستم اشتراک دوچرخه پولی در نیویورک است. شما چند تبدیل ساده انجام خواهید داد و ده شناسه محبوبترین ایستگاههای دوچرخهسواری سیتی را چاپ خواهید کرد. این نمونه همچنین به طور قابل توجهی از spark-bigquery-connector متنباز برای خواندن و نوشتن یکپارچه دادهها بین Spark و BigQuery استفاده میکند.
مخزن گیتهاب زیر را کلون کنید و cd به دایرکتوری حاوی فایل citibike.py بروید.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
کار را با استفاده از Cloud SDK که به طور پیشفرض در Cloud Shell موجود است، به Serverless Spark ارسال کنید. دستور زیر را در shell خود اجرا کنید که از Cloud SDK و Dataproc Batches API برای ارسال کارهای Serverless Spark استفاده میکند.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
برای تجزیه این موضوع:
-
gcloud dataproc batches submitبه API دستههای دادهای Dataproc ارجاع میدهند. -
pysparkنشان میدهد که شما در حال ارسال یک کار PySpark هستید. -
--batchنام کار است. در صورت عدم ارائه، از یک UUID تصادفی تولید شده استفاده خواهد شد. -
--region=${REGION}منطقه جغرافیایی است که کار در آن پردازش خواهد شد. -
--deps-bucket=${BUCKET}جایی است که فایل پایتون محلی شما قبل از اجرا در محیط Serverless در آن آپلود میشود. -
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarشامل فایل jar مربوط به spark-bigquery-connector در محیط زمان اجرای Spark است. -
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}نام کامل سرور تاریخچه دائمی است. این جایی است که دادههای رویداد Spark (جدا از خروجی کنسول) ذخیره شده و از رابط کاربری Spark قابل مشاهده هستند. - علامت
--در انتهای آن نشان میدهد که هر چیزی فراتر از این، آرگومانهای زمان اجرا برای برنامه خواهد بود. در این حالت، شما نام سطل خود را، همانطور که توسط کار مورد نیاز است، ارسال میکنید.
هنگام ارسال دسته، خروجی زیر را مشاهده خواهید کرد.
Batch [citibike-job] submitted.
بعد از چند دقیقه، خروجی زیر را به همراه متادیتای کار مشاهده خواهید کرد.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
در بخش بعدی، نحوهی یافتن لاگهای مربوط به این کار را خواهید آموخت.
ویژگیهای اضافی
با Spark Serverless، گزینههای بیشتری برای اجرای کارهایتان دارید.
- شما میتوانید یک تصویر داکر سفارشی ایجاد کنید که job شما روی آن اجرا شود. این یک روش عالی برای اضافه کردن وابستگیهای اضافی، از جمله کتابخانههای پایتون و R است.
- شما میتوانید یک نمونه Dataproc Metastore را به job خود متصل کنید تا به متادیتای Hive دسترسی داشته باشید.
- برای کنترل بیشتر، Dataproc Serverless از پیکربندی مجموعه کوچکی از ویژگیهای Spark پشتیبانی میکند.
۴. معیارهای Dataproc و قابلیت مشاهده
کنسول Dataproc Batches تمام کارهای Dataproc Serverless شما را فهرست میکند. در کنسول، شناسه دستهای، مکان، وضعیت ، زمان ایجاد، زمان سپری شده و نوع هر کار را مشاهده خواهید کرد. برای مشاهده اطلاعات بیشتر در مورد آن، روی شناسه دستهای کار خود کلیک کنید.
در این صفحه، اطلاعاتی مانند «نظارت» را مشاهده خواهید کرد که نشان میدهد کار شما در طول زمان از چند Batch Spark Executor استفاده کرده است (که نشاندهنده میزان مقیاسبندی خودکار آن است).
در برگه جزئیات، ابردادههای بیشتری درباره کار، از جمله آرگومانها و پارامترهایی که همراه کار ارسال شدهاند، مشاهده خواهید کرد.
همچنین میتوانید از این صفحه به تمام گزارشها دسترسی داشته باشید. وقتی کارهای Dataproc Serverless اجرا میشوند، سه مجموعه گزارش مختلف ایجاد میشود:
- سطح خدمات
- خروجی کنسول
- ثبت وقایع اسپارک
سطح سرویس ، شامل گزارشهایی است که سرویس Dataproc Serverless ایجاد کرده است. این گزارشها شامل مواردی مانند درخواست Dataproc Serverless برای CPUهای اضافی جهت مقیاسبندی خودکار است. میتوانید این گزارشها را با کلیک روی «مشاهده گزارشها» که Cloud Logging را باز میکند، مشاهده کنید.
خروجی کنسول را میتوان در بخش خروجی مشاهده کرد. این خروجی تولید شده توسط کار است، از جمله ابردادههایی که اسپارک هنگام شروع یک کار چاپ میکند یا هر دستور چاپی که در کار گنجانده شده است.
ثبت وقایع Spark از رابط کاربری Spark قابل دسترسی است. از آنجا که شما یک سرور تاریخچه دائمی برای کار Spark خود فراهم کردهاید، میتوانید با کلیک روی «مشاهده سرور تاریخچه Spark» که شامل اطلاعاتی برای کارهای Spark اجرا شده قبلی شماست، به رابط کاربری Spark دسترسی پیدا کنید. میتوانید اطلاعات بیشتر در مورد رابط کاربری Spark را از مستندات رسمی Spark کسب کنید.
۵. قالبهای Dataproc: BQ -> GCS
قالبهای Dataproc ابزارهای متنبازی هستند که به سادهسازی بیشتر وظایف پردازش دادههای درون ابری کمک میکنند. این قالبها به عنوان پوششی برای Dataproc Serverless عمل میکنند و شامل قالبهایی برای بسیاری از وظایف واردات و صادرات دادهها، از جمله موارد زیر، هستند:
-
BigQuerytoGCSوGCStoBigQuery -
GCStoBigTable -
GCStoJDBCوJDBCtoGCS -
HivetoBigQuery -
MongotoGCSوGCStoMongo
لیست کامل در README موجود است.
در این بخش، از Dataproc Templates برای خروجی گرفتن دادهها از BigQuery به GCS استفاده خواهید کرد.
مخزن را کلون کنید
مخزن را کلون کنید و آن را به پوشه python تغییر دهید.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
پیکربندی محیط
اکنون متغیرهای محیطی را تنظیم خواهید کرد. قالبهای Dataproc از متغیر محیطی GCP_PROJECT برای شناسه پروژه شما استفاده میکنند، بنابراین این را برابر با GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
منطقه شما باید از قبل در محیط تنظیم شده باشد. اگر اینطور نیست، آن را اینجا تنظیم کنید.
export REGION=<region>
قالبهای Dataproc از spark-bigquery-conector برای پردازش کارهای BigQuery استفاده میکنند و نیاز دارند که URI در یک متغیر محیطی JARS گنجانده شود. متغیر JARS را تنظیم کنید.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
پیکربندی پارامترهای قالب
نام یک مخزن ذخیرهسازی (staging bucket) را برای استفاده سرویس تنظیم کنید.
export GCS_STAGING_LOCATION=gs://${BUCKET}
در مرحله بعد، برخی متغیرهای خاص شغل را تنظیم خواهید کرد. برای جدول ورودی، دوباره به مجموعه داده BigQuery NYC Citibike ارجاع خواهید داد.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
شما میتوانید csv ، parquet ، avro یا json را انتخاب کنید. برای این codelab، CSV را انتخاب کنید - در بخش بعدی نحوه استفاده از Dataproc Templates برای تبدیل انواع فایلها توضیح داده شده است.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
حالت خروجی را روی overwrite تنظیم کنید. میتوانید بین overwrite ، append ، ignore یا errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
محل خروجی GCS را به عنوان مسیری در سطل خود تنظیم کنید.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
قالب را اجرا کنید
الگوی BIGQUERYTOGCS را با مشخص کردن آن در زیر و ارائه پارامترهای ورودی که تنظیم کردهاید، اجرا کنید.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
خروجی نسبتاً نویزدار خواهد بود، اما پس از حدود یک دقیقه تصویر زیر را خواهید دید.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
با اجرای دستور زیر میتوانید تأیید کنید که فایلها ایجاد شدهاند.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
اسپارک به طور پیشفرض، بسته به مقدار دادهها، در چندین فایل مینویسد. در این حالت، تقریباً 30 فایل تولید شده را مشاهده خواهید کرد. نام فایلهای خروجی اسپارک با part - و به دنبال آن یک عدد پنج رقمی (که شماره قطعه را نشان میدهد) و یک رشته هش قالببندی میشوند. برای مقادیر زیاد دادهها، اسپارک معمولاً در چندین فایل مینویسد. به عنوان مثال، نام فایل part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv است.
۶. قالبهای Dataproc: تبدیل CSV به پارکت
اکنون از قالبهای Dataproc برای تبدیل دادهها در GCS از یک نوع فایل به نوع دیگر با استفاده از GCSTOGCS استفاده خواهید کرد. این قالب از SparkSQL استفاده میکند و امکانی را نیز فراهم میکند تا یک کوئری SparkSQL نیز برای پردازش بیشتر در طول تبدیل ارسال شود.
تأیید متغیرهای محیطی
تأیید کنید که GCP_PROJECT ، REGION و GCS_STAGING_BUCKET از بخش قبل تنظیم شدهاند.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
تنظیم پارامترهای الگو
اکنون پارامترهای پیکربندی GCStoGCS را تنظیم خواهید کرد. با محل فایلهای ورودی شروع کنید. توجه داشته باشید که این یک دایرکتوری است و نه یک فایل خاص، زیرا همه فایلهای موجود در دایرکتوری پردازش خواهند شد. این را روی BIGQUERY_GCS_OUTPUT_LOCATION تنظیم کنید.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
فرمت فایل ورودی را تنظیم کنید.
GCS_TO_GCS_INPUT_FORMAT=csv
قالب خروجی مورد نظر را تنظیم کنید. میتوانید parquet، json، avro یا csv را انتخاب کنید.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
حالت خروجی را روی overwrite تنظیم کنید. میتوانید بین overwrite ، append ، ignore یا errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
محل خروجی را تنظیم کنید.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
قالب را اجرا کنید
الگوی GCStoGCS را اجرا کنید.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
خروجی نسبتاً نویز خواهد داشت، اما پس از حدود یک دقیقه باید پیام موفقیتآمیزی مانند زیر را مشاهده کنید.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
با اجرای دستور زیر میتوانید تأیید کنید که فایلها ایجاد شدهاند.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
با استفاده از این الگو، شما همچنین میتوانید با ارسال gcs.to.gcs.temp.view.name و gcs.to.gcs.sql.query به الگو، کوئریهای SparkSQL را ارائه دهید و قبل از نوشتن در GCS، یک کوئری SparkSQL را روی دادهها اجرا کنید.
۷. منابع را پاکسازی کنید
برای جلوگیری از تحمیل هزینههای غیرضروری به حساب GCP خود پس از تکمیل این آزمایشگاه کد:
gsutil rm -r gs://${BUCKET}
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- کارهای Dataproc Serverless را حذف کنید. به کنسول دستهها بروید، روی کادر کنار هر کاری که میخواهید حذف کنید کلیک کنید و روی DELETE کلیک کنید.
اگر فقط برای این codelab پروژهای ایجاد کردهاید، میتوانید به صورت اختیاری پروژه را حذف کنید:
- در کنسول GCP، به صفحه پروژهها بروید.
- در لیست پروژهها، پروژهای را که میخواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
- در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.
۸. قدم بعدی چیست؟
منابع زیر روشهای بیشتری را برای استفاده از Serverless Spark ارائه میدهند:
- بیاموزید که چگونه گردشهای کاری Dataproc Serverless را با استفاده از Cloud Composer هماهنگ کنید .
- یاد بگیرید که چگونه Dataproc Serverless را با خطوط لوله Kubeflow ادغام کنید.