1. نمای کلی - Google Dataproc
Dataproc یک سرویس کاملاً مدیریت شده و بسیار مقیاس پذیر برای اجرای Apache Spark، Apache Flink، Presto و بسیاری دیگر از ابزارها و فریم ورک های منبع باز است. از Dataproc برای نوسازی دریاچه داده، ETL / ELT و علم داده ایمن در مقیاس سیاره استفاده کنید. Dataproc همچنین به طور کامل با چندین سرویس Google Cloud از جمله BigQuery ، Cloud Storage ، Vertex AI و Dataplex یکپارچه شده است.
Dataproc در سه طعم موجود است:
- Dataproc Serverless به شما اجازه می دهد تا کارهای PySpark را بدون نیاز به پیکربندی زیرساخت و مقیاس خودکار اجرا کنید. Dataproc Serverless از بارهای کاری دسته ای PySpark و جلسات / نوت بوک ها پشتیبانی می کند.
- Dataproc در موتور محاسباتی Google به شما امکان می دهد تا علاوه بر ابزارهای منبع باز مانند Flink و Presto، یک کلاستر Hadoop YARN را برای بارهای کاری Spark مبتنی بر YARN مدیریت کنید. میتوانید خوشههای مبتنی بر ابر خود را با مقیاسبندی عمودی یا افقی که میخواهید، از جمله مقیاسبندی خودکار، تنظیم کنید.
- Dataproc در Google Kubernetes Engine به شما اجازه می دهد تا خوشه های مجازی Dataproc را در زیرساخت GKE خود برای ارسال کارهای Spark، PySpark، SparkR یا Spark SQL پیکربندی کنید.
در این کد لبه، چندین روش مختلف را یاد خواهید گرفت که می توانید بدون سرور Dataproc مصرف کنید.
آپاچی اسپارک در ابتدا برای اجرا بر روی خوشه های Hadoop ساخته شد و از YARN به عنوان مدیر منابع خود استفاده کرد. نگهداری از خوشه های Hadoop مستلزم مجموعه ای خاص از تخصص و اطمینان از پیکربندی صحیح بسیاری از دستگیره های مختلف روی خوشه ها است. این علاوه بر مجموعه جداگانه ای از دستگیره ها است که Spark همچنین از کاربر می خواهد که آنها را تنظیم کند. این منجر به سناریوهای بسیاری می شود که در آن توسعه دهندگان به جای کار بر روی کد Spark، زمان بیشتری را صرف پیکربندی زیرساخت های خود می کنند.
Dataproc Serverless نیاز به پیکربندی دستی خوشه های Hadoop یا Spark را برطرف می کند. Dataproc Serverless روی Hadoop اجرا نمیشود و از تخصیص دینامیک منبع خود برای تعیین منابع مورد نیاز خود از جمله مقیاسبندی خودکار استفاده میکند. زیرمجموعه کوچکی از ویژگیهای Spark هنوز با Dataproc Serverless قابل تنظیم هستند، اما در بیشتر موارد نیازی به تغییر این ویژگیها ندارید.
2. راه اندازی کنید
شما با پیکربندی محیط و منابع مورد استفاده در این کد لبه شروع خواهید کرد.
یک پروژه Google Cloud ایجاد کنید . می توانید از یک موجود استفاده کنید.
Cloud Shell را با کلیک کردن روی آن در نوار ابزار Cloud Console باز کنید.
Cloud Shell یک محیط شل آماده برای استفاده را فراهم می کند که می توانید برای این کد لبه استفاده کنید.
Cloud Shell نام پروژه شما را به طور پیش فرض تنظیم می کند. با اجرای echo $GOOGLE_CLOUD_PROJECT
دوباره بررسی کنید. اگر ID پروژه خود را در خروجی نمی بینید، آن را تنظیم کنید.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
یک منطقه Compute Engine برای منابع خود، مانند us-central1
یا europe-west2
تنظیم کنید.
export REGION=<your-region>
API ها را فعال کنید
نرم افزار Codelab از API های زیر استفاده می کند:
- BigQuery
- Dataproc
API های لازم را فعال کنید. این کار حدود یک دقیقه طول می کشد و پس از تکمیل پیام موفقیت آمیز ظاهر می شود.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
پیکربندی دسترسی به شبکه
Dataproc Serverless نیاز دارد که Google Private Access در منطقه ای که کارهای Spark خود را اجرا می کنید فعال شود، زیرا درایورها و مجریان Spark فقط IP خصوصی دارند. برای فعال کردن آن در زیر شبکه default
، موارد زیر را اجرا کنید.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
میتوانید تأیید کنید که Google Private Access از طریق موارد زیر فعال است که خروجی True
یا False
را نشان میدهد.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
یک سطل ذخیره سازی ایجاد کنید
یک سطل ذخیره سازی ایجاد کنید که برای ذخیره دارایی های ایجاد شده در این Codelab استفاده می شود.
یک نام برای سطل خود انتخاب کنید. نامهای سطل باید در سطح جهانی برای همه کاربران منحصربهفرد باشند.
export BUCKET=<your-bucket-name>
سطل را در منطقه ای که می خواهید کارهای Spark خود را اجرا کنید ایجاد کنید.
gsutil mb -l ${REGION} gs://${BUCKET}
می توانید ببینید که سطل شما در کنسول Cloud Storage موجود است. همچنین می توانید gsutil ls
برای دیدن سطل خود اجرا کنید.
یک سرور تاریخچه پایدار ایجاد کنید
Spark UI مجموعه ای غنی از ابزارهای اشکال زدایی و بینش در مورد مشاغل Spark را ارائه می دهد. برای مشاهده Spark UI برای کارهای تکمیل شده بدون سرور Dataproc، باید یک کلاستر Dataproc گره ایجاد کنید تا از آن به عنوان یک سرور تاریخچه پایدار استفاده کنید.
یک نام برای سرور تاریخچه دائمی خود تنظیم کنید.
PHS_CLUSTER_NAME=my-phs
زیر را اجرا کنید.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
رابط کاربری Spark و سرور تاریخچه پایدار بعداً در لابلای کد با جزئیات بیشتر مورد بررسی قرار خواهند گرفت.
3. کارهای Spark بدون سرور را با دسته های Dataproc اجرا کنید
در این نمونه، شما با مجموعه ای از داده ها از مجموعه داده عمومی سیتی دوچرخه سفرهای نیویورک (NYC) کار خواهید کرد. NYC Citi Bikes یک سیستم اشتراک دوچرخه پولی در نیویورک است. شما تغییرات ساده ای را انجام خواهید داد و ده شناسه محبوب ایستگاه دوچرخه سیتی را چاپ خواهید کرد. این نمونه همچنین از رابط منبع باز spark-bigquery-connector برای خواندن و نوشتن یکپارچه داده ها بین Spark و BigQuery استفاده می کند.
مخزن و cd
دی Github زیر را در دایرکتوری حاوی فایل citibike.py
کلون کنید.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
با استفاده از Cloud SDK که به طور پیشفرض در Cloud Shell موجود است، کار را به Spark بدون سرور ارسال کنید. دستور زیر را در پوسته خود اجرا کنید که از Cloud SDK و Dataproc Batches API برای ارسال کارهای Spark بدون سرور استفاده می کند.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
برای تجزیه این:
-
gcloud dataproc batches submit
ارجاعاتی را به Dataproc Batches API ارسال می کنند. -
pyspark
نشان می دهد که شما در حال ارسال یک کار PySpark هستید. -
--batch
نام کار است. اگر ارائه نشود، یک UUID تصادفی تولید شده استفاده خواهد شد. -
--region=${REGION}
منطقه جغرافیایی است که کار در آن پردازش خواهد شد. -
--deps-bucket=${BUCKET}
جایی است که فایل پایتون محلی شما قبل از اجرا در محیط بدون سرور در آن آپلود می شود. -
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
شامل jar برای spark-bigquery-connector در محیط اجرای Spark است. -
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
نام کاملاً واجد شرایط سرور تاریخچه پایدار است. این جایی است که داده های رویداد Spark (جدا از خروجی کنسول) ذخیره می شود و از Spark UI قابل مشاهده است. - دنباله
--
نشان می دهد که هر چیزی فراتر از این آرگ زمان اجرا برای برنامه خواهد بود. در این صورت، شما در حال ارسال نام سطل خود، مطابق با شغل هستید.
هنگام ارسال دسته، خروجی زیر را مشاهده خواهید کرد.
Batch [citibike-job] submitted.
بعد از چند دقیقه خروجی زیر را به همراه متادیتا از کار مشاهده خواهید کرد.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
در بخش بعدی نحوه یافتن لاگ های مربوط به این کار را خواهید آموخت.
ویژگی های اضافی
با Spark Serverless، گزینه های بیشتری برای اجرای مشاغل خود دارید.
- می توانید یک تصویر داکر سفارشی ایجاد کنید که کار شما روی آن اجرا می شود. این یک راه عالی برای اضافه کردن وابستگیهای اضافی، از جمله کتابخانههای پایتون و R است.
- می توانید یک نمونه Dataproc Metastore را به شغل خود متصل کنید تا به ابرداده Hive دسترسی پیدا کنید.
- برای کنترل بیشتر، Dataproc Serverless از پیکربندی مجموعه کوچکی از ویژگی های Spark پشتیبانی می کند.
4. Dataproc Metrics and Observability
کنسول دسته ای Dataproc تمام کارهای بدون سرور Dataproc شما را فهرست می کند. در کنسول، Batch ID، موقعیت مکانی، وضعیت ، زمان ایجاد، زمان سپری شده و نوع هر کار را خواهید دید. برای مشاهده اطلاعات بیشتر در مورد آن، روی Batch ID شغل خود کلیک کنید.
در این صفحه، اطلاعاتی مانند مانیتورینگ را مشاهده خواهید کرد که نشان می دهد کار شما در طول زمان از چند Batch Spark Executors استفاده کرده است (که نشان می دهد چقدر مقیاس خودکار شده است).
در برگه جزئیات ، ابردادههای بیشتری در مورد شغل از جمله هر آرگومان و پارامتری که همراه کار ارسال شده است را مشاهده خواهید کرد.
همچنین می توانید از این صفحه به همه گزارش ها دسترسی داشته باشید. هنگامی که کارهای بدون سرور Dataproc اجرا می شوند، سه مجموعه گزارش مختلف تولید می شوند:
- در سطح خدمات
- خروجی کنسول
- ثبت رویداد جرقه
سطح سرویس ، شامل گزارشهایی است که سرویس بدون سرور Dataproc ایجاد کرده است. این موارد شامل مواردی مانند درخواست سرور بدون سرور Dataproc برای CPUهای اضافی برای مقیاس خودکار است. میتوانید این موارد را با کلیک کردن روی View logs مشاهده کنید که Cloud Logging باز میشود.
خروجی کنسول در زیر خروجی قابل مشاهده است. این خروجی تولید شده توسط کار است، از جمله ابرداده هایی که Spark هنگام شروع یک کار چاپ می کند یا هر عبارت چاپی که در آن کار گنجانده شده است.
ثبت رویداد Spark از رابط کاربری Spark قابل دسترسی است. از آنجایی که کار Spark خود را با یک سرور تاریخچه دائمی ارائه کردهاید، میتوانید با کلیک کردن روی View Spark History Server که حاوی اطلاعاتی برای کارهای Spark قبلا اجرا شدهتان است، به Spark UI دسترسی پیدا کنید. میتوانید درباره Spark UI از اسناد رسمی Spark اطلاعات بیشتری کسب کنید.
5. الگوهای Dataproc: BQ -> GCS
الگوهای Dataproc ابزارهای منبع باز هستند که به سادهتر کردن وظایف پردازش دادههای درون ابری کمک میکنند. اینها به عنوان پوششی برای Dataproc Serverless عمل می کنند و شامل الگوهایی برای بسیاری از وظایف واردات و صادرات داده می شوند، از جمله:
-
BigQuerytoGCS
وGCStoBigQuery
-
GCStoBigTable
-
GCStoJDBC
وJDBCtoGCS
-
HivetoBigQuery
-
MongotoGCS
وGCStoMongo
لیست کامل README در دسترس است.
در این بخش، از الگوهای Dataproc برای صادرات داده ها از BigQuery به GCS استفاده خواهید کرد.
مخزن را شبیه سازی کنید
مخزن را کلون کنید و به پوشه python
تغییر دهید.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
محیط را پیکربندی کنید
اکنون متغیرهای محیطی را تنظیم خواهید کرد. الگوهای Dataproc از متغیر محیطی GCP_PROJECT
برای شناسه پروژه شما استفاده می کنند، بنابراین آن را برابر با GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
منطقه شما باید از قبل در محیط تنظیم شود. اگر نه، آن را در اینجا تنظیم کنید.
export REGION=<region>
الگوهای Dataproc از spark-bigquery-conector برای پردازش کارهای BigQuery استفاده میکنند و نیاز دارند که URI در یک متغیر محیطی JARS
گنجانده شود. متغیر JARS
را تنظیم کنید.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
پیکربندی پارامترهای قالب
نام یک سطل مرحلهبندی را برای استفاده از سرویس تنظیم کنید.
export GCS_STAGING_LOCATION=gs://${BUCKET}
در مرحله بعد، چند متغیر مخصوص کار را تنظیم خواهید کرد. برای جدول ورودی، دوباره به مجموعه داده BigQuery NYC Citibike ارجاع خواهید داد.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
می توانید csv
، parquet
، avro
یا json
را انتخاب کنید. برای این کد لبه، CSV را انتخاب کنید - در بخش بعدی نحوه استفاده از الگوهای Dataproc برای تبدیل انواع فایل.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
حالت خروجی را برای overwrite
تنظیم کنید. شما می توانید بین overwrite
، append
، ignore
یا errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
محل خروجی GCS را به عنوان مسیری در سطل خود تنظیم کنید.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
قالب را اجرا کنید
الگوی BIGQUERYTOGCS
را با مشخص کردن آن در زیر و ارائه پارامترهای ورودی که تنظیم کردهاید، اجرا کنید.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
خروجی نسبتاً نویز خواهد بود اما پس از حدود یک دقیقه موارد زیر را مشاهده خواهید کرد.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
با اجرای موارد زیر می توانید تأیید کنید که فایل ها تولید شده اند.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark به طور پیش فرض بسته به مقدار داده، روی چندین فایل می نویسد. در این حالت تقریباً 30 فایل تولید شده را مشاهده خواهید کرد. نام فایل های خروجی Spark با part
- به دنبال آن یک عدد پنج رقمی (که شماره قطعه را نشان می دهد) و یک رشته هش فرمت می شود. برای مقادیر زیاد داده، Spark معمولاً روی چندین فایل می نویسد. نام فایل نمونه part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
است.
6. الگوهای Dataproc: CSV به پارکت
اکنون از الگوهای Dataproc برای تبدیل داده ها در GCS از یک نوع فایل به نوع دیگر با استفاده از GCSTOGCS استفاده خواهید کرد. این الگو از SparkSQL استفاده می کند و گزینه ای را برای ارسال یک پرس و جو SparkSQL فراهم می کند تا در طول تبدیل برای پردازش اضافی پردازش شود.
متغیرهای محیط را تأیید کنید
تأیید کنید که GCP_PROJECT
، REGION
، و GCS_STAGING_BUCKET
از بخش قبلی تنظیم شدهاند.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
تنظیم پارامترهای قالب
اکنون پارامترهای پیکربندی را برای GCStoGCS
تنظیم خواهید کرد. با محل فایل های ورودی شروع کنید. توجه داشته باشید که این یک دایرکتوری است و نه یک فایل خاص، زیرا تمام فایل های موجود در دایرکتوری پردازش می شوند. این را روی BIGQUERY_GCS_OUTPUT_LOCATION
تنظیم کنید.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
فرمت فایل ورودی را تنظیم کنید.
GCS_TO_GCS_INPUT_FORMAT=csv
فرمت خروجی مورد نظر را تنظیم کنید. می توانید پارکت، json، avro یا csv را انتخاب کنید.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
حالت خروجی را برای overwrite
تنظیم کنید. شما می توانید بین overwrite
، append
، ignore
یا errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
محل خروجی را تنظیم کنید.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
قالب را اجرا کنید
قالب GCStoGCS
را اجرا کنید.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
خروجی نسبتاً پر سر و صدا خواهد بود، اما پس از حدود یک دقیقه باید یک پیام موفقیت آمیز مانند زیر مشاهده کنید.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
با اجرای موارد زیر می توانید تأیید کنید که فایل ها تولید شده اند.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
با استفاده از این الگو، شما همچنین میتوانید با ارسال gcs.to.gcs.temp.view.name
و gcs.to.gcs.sql.query
به الگو، Query SparkSQL را تامین کنید، و این امکان را فراهم میکند که یک Query SparkSQL روی دادهها قبل از آن اجرا شود. نوشتن به GCS
7. منابع را پاکسازی کنید
برای جلوگیری از تحمیل هزینههای غیر ضروری به حساب GCP خود پس از تکمیل این آزمایشگاه کد:
gsutil rm -r gs://${BUCKET}
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- کارهای بدون سرور Dataproc را حذف کنید. به Batches Console بروید، روی کادر کنار هر کاری که میخواهید حذف کنید کلیک کنید و روی DELETE کلیک کنید.
اگر پروژه ای را فقط برای این کد لبه ایجاد کرده اید، می توانید به صورت اختیاری پروژه را نیز حذف کنید:
- در کنسول GCP، به صفحه پروژه ها بروید.
- در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
- در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.
8. بعدی چه خواهد شد
منابع زیر راه های دیگری را برای استفاده از Spark بدون سرور ارائه می دهند:
- بیاموزید که چگونه گردشهای کاری بدون سرور Dataproc را با استفاده از Cloud Composer هماهنگ کنید .
- نحوه ادغام Dataproc Serverless با خطوط لوله Kubeflow را بیاموزید.