Dataproc بدون سرور

1. نمای کلی - Google Dataproc

Dataproc یک سرویس کاملاً مدیریت شده و بسیار مقیاس پذیر برای اجرای Apache Spark، Apache Flink، Presto و بسیاری دیگر از ابزارها و فریم ورک های منبع باز است. از Dataproc برای نوسازی دریاچه داده، ETL / ELT و علم داده ایمن در مقیاس سیاره استفاده کنید. Dataproc همچنین به طور کامل با چندین سرویس Google Cloud از جمله BigQuery ، Cloud Storage ، Vertex AI و Dataplex یکپارچه شده است.

Dataproc در سه طعم موجود است:

  • Dataproc Serverless به شما اجازه می دهد تا کارهای PySpark را بدون نیاز به پیکربندی زیرساخت و مقیاس خودکار اجرا کنید. Dataproc Serverless از بارهای کاری دسته ای PySpark و جلسات / نوت بوک ها پشتیبانی می کند.
  • Dataproc در موتور محاسباتی Google به شما امکان می دهد تا علاوه بر ابزارهای منبع باز مانند Flink و Presto، یک کلاستر Hadoop YARN را برای بارهای کاری Spark مبتنی بر YARN مدیریت کنید. می‌توانید خوشه‌های مبتنی بر ابر خود را با مقیاس‌بندی عمودی یا افقی که می‌خواهید، از جمله مقیاس‌بندی خودکار، تنظیم کنید.
  • Dataproc در Google Kubernetes Engine به شما اجازه می دهد تا خوشه های مجازی Dataproc را در زیرساخت GKE خود برای ارسال کارهای Spark، PySpark، SparkR یا Spark SQL پیکربندی کنید.

در این کد لبه، چندین روش مختلف را یاد خواهید گرفت که می توانید بدون سرور Dataproc مصرف کنید.

آپاچی اسپارک در ابتدا برای اجرا بر روی خوشه های Hadoop ساخته شد و از YARN به عنوان مدیر منابع خود استفاده کرد. نگهداری از خوشه های Hadoop مستلزم مجموعه ای خاص از تخصص و اطمینان از پیکربندی صحیح بسیاری از دستگیره های مختلف روی خوشه ها است. این علاوه بر مجموعه جداگانه ای از دستگیره ها است که Spark همچنین از کاربر می خواهد که آنها را تنظیم کند. این منجر به سناریوهای بسیاری می شود که در آن توسعه دهندگان به جای کار بر روی کد Spark، زمان بیشتری را صرف پیکربندی زیرساخت های خود می کنند.

Dataproc Serverless نیاز به پیکربندی دستی خوشه های Hadoop یا Spark را برطرف می کند. Dataproc Serverless روی Hadoop اجرا نمی‌شود و از تخصیص دینامیک منبع خود برای تعیین منابع مورد نیاز خود از جمله مقیاس‌بندی خودکار استفاده می‌کند. زیرمجموعه کوچکی از ویژگی‌های Spark هنوز با Dataproc Serverless قابل تنظیم هستند، اما در بیشتر موارد نیازی به تغییر این ویژگی‌ها ندارید.

2. راه اندازی کنید

شما با پیکربندی محیط و منابع مورد استفاده در این کد لبه شروع خواهید کرد.

یک پروژه Google Cloud ایجاد کنید . می توانید از یک موجود استفاده کنید.

Cloud Shell را با کلیک کردن روی آن در نوار ابزار Cloud Console باز کنید.

ba0bb17945a73543.png

Cloud Shell یک محیط شل آماده برای استفاده را فراهم می کند که می توانید برای این کد لبه استفاده کنید.

68c4ebd2a8539764.png

Cloud Shell نام پروژه شما را به طور پیش فرض تنظیم می کند. با اجرای echo $GOOGLE_CLOUD_PROJECT دوباره بررسی کنید. اگر ID پروژه خود را در خروجی نمی بینید، آن را تنظیم کنید.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

یک منطقه Compute Engine برای منابع خود، مانند us-central1 یا europe-west2 تنظیم کنید.

export REGION=<your-region>

API ها را فعال کنید

نرم افزار Codelab از API های زیر استفاده می کند:

  • BigQuery
  • Dataproc

API های لازم را فعال کنید. این کار حدود یک دقیقه طول می کشد و پس از تکمیل پیام موفقیت آمیز ظاهر می شود.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

پیکربندی دسترسی به شبکه

Dataproc Serverless نیاز دارد که Google Private Access در منطقه ای که کارهای Spark خود را اجرا می کنید فعال شود، زیرا درایورها و مجریان Spark فقط IP خصوصی دارند. برای فعال کردن آن در زیر شبکه default ، موارد زیر را اجرا کنید.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

می‌توانید تأیید کنید که Google Private Access از طریق موارد زیر فعال است که خروجی True یا False را نشان می‌دهد.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

یک سطل ذخیره سازی ایجاد کنید

یک سطل ذخیره سازی ایجاد کنید که برای ذخیره دارایی های ایجاد شده در این Codelab استفاده می شود.

یک نام برای سطل خود انتخاب کنید. نام‌های سطل باید در سطح جهانی برای همه کاربران منحصربه‌فرد باشند.

export BUCKET=<your-bucket-name>

سطل را در منطقه ای که می خواهید کارهای Spark خود را اجرا کنید ایجاد کنید.

gsutil mb -l ${REGION} gs://${BUCKET}

می توانید ببینید که سطل شما در کنسول Cloud Storage موجود است. همچنین می توانید gsutil ls برای دیدن سطل خود اجرا کنید.

یک سرور تاریخچه پایدار ایجاد کنید

Spark UI مجموعه ای غنی از ابزارهای اشکال زدایی و بینش در مورد مشاغل Spark را ارائه می دهد. برای مشاهده Spark UI برای کارهای تکمیل شده بدون سرور Dataproc، باید یک کلاستر Dataproc گره ایجاد کنید تا از آن به عنوان یک سرور تاریخچه پایدار استفاده کنید.

یک نام برای سرور تاریخچه دائمی خود تنظیم کنید.

PHS_CLUSTER_NAME=my-phs

زیر را اجرا کنید.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

رابط کاربری Spark و سرور تاریخچه پایدار بعداً در لابلای کد با جزئیات بیشتر مورد بررسی قرار خواهند گرفت.

3. کارهای Spark بدون سرور را با دسته های Dataproc اجرا کنید

در این نمونه، شما با مجموعه ای از داده ها از مجموعه داده عمومی سیتی دوچرخه سفرهای نیویورک (NYC) کار خواهید کرد. NYC Citi Bikes یک سیستم اشتراک دوچرخه پولی در نیویورک است. شما تغییرات ساده ای را انجام خواهید داد و ده شناسه محبوب ایستگاه دوچرخه سیتی را چاپ خواهید کرد. این نمونه همچنین از رابط منبع باز spark-bigquery-connector برای خواندن و نوشتن یکپارچه داده ها بین Spark و BigQuery استفاده می کند.

مخزن و cd دی Github زیر را در دایرکتوری حاوی فایل citibike.py کلون کنید.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

با استفاده از Cloud SDK که به طور پیش‌فرض در Cloud Shell موجود است، کار را به Spark بدون سرور ارسال کنید. دستور زیر را در پوسته خود اجرا کنید که از Cloud SDK و Dataproc Batches API برای ارسال کارهای Spark بدون سرور استفاده می کند.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

برای تجزیه این:

  • gcloud dataproc batches submit ارجاعاتی را به Dataproc Batches API ارسال می کنند.
  • pyspark نشان می دهد که شما در حال ارسال یک کار PySpark هستید.
  • --batch نام کار است. اگر ارائه نشود، یک UUID تصادفی تولید شده استفاده خواهد شد.
  • --region=${REGION} منطقه جغرافیایی است که کار در آن پردازش خواهد شد.
  • --deps-bucket=${BUCKET} جایی است که فایل پایتون محلی شما قبل از اجرا در محیط بدون سرور در آن آپلود می شود.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar شامل jar برای spark-bigquery-connector در محیط اجرای Spark است.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} نام کاملاً واجد شرایط سرور تاریخچه پایدار است. این جایی است که داده های رویداد Spark (جدا از خروجی کنسول) ذخیره می شود و از Spark UI قابل مشاهده است.
  • دنباله -- نشان می دهد که هر چیزی فراتر از این آرگ زمان اجرا برای برنامه خواهد بود. در این صورت، شما در حال ارسال نام سطل خود، مطابق با شغل هستید.

هنگام ارسال دسته، خروجی زیر را مشاهده خواهید کرد.

Batch [citibike-job] submitted.

بعد از چند دقیقه خروجی زیر را به همراه متادیتا از کار مشاهده خواهید کرد.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

در بخش بعدی نحوه یافتن لاگ های مربوط به این کار را خواهید آموخت.

ویژگی های اضافی

با Spark Serverless، گزینه های بیشتری برای اجرای مشاغل خود دارید.

  • می توانید یک تصویر داکر سفارشی ایجاد کنید که کار شما روی آن اجرا می شود. این یک راه عالی برای اضافه کردن وابستگی‌های اضافی، از جمله کتابخانه‌های پایتون و R است.
  • می توانید یک نمونه Dataproc Metastore را به شغل خود متصل کنید تا به ابرداده Hive دسترسی پیدا کنید.
  • برای کنترل بیشتر، Dataproc Serverless از پیکربندی مجموعه کوچکی از ویژگی های Spark پشتیبانی می کند.

4. Dataproc Metrics and Observability

کنسول دسته ای Dataproc تمام کارهای بدون سرور Dataproc شما را فهرست می کند. در کنسول، Batch ID، موقعیت مکانی، وضعیت ، زمان ایجاد، زمان سپری شده و نوع هر کار را خواهید دید. برای مشاهده اطلاعات بیشتر در مورد آن، روی Batch ID شغل خود کلیک کنید.

در این صفحه، اطلاعاتی مانند مانیتورینگ را مشاهده خواهید کرد که نشان می دهد کار شما در طول زمان از چند Batch Spark Executors استفاده کرده است (که نشان می دهد چقدر مقیاس خودکار شده است).

در برگه جزئیات ، ابرداده‌های بیشتری در مورد شغل از جمله هر آرگومان و پارامتری که همراه کار ارسال شده است را مشاهده خواهید کرد.

همچنین می توانید از این صفحه به همه گزارش ها دسترسی داشته باشید. هنگامی که کارهای بدون سرور Dataproc اجرا می شوند، سه مجموعه گزارش مختلف تولید می شوند:

  • سطح سرویس
  • خروجی کنسول
  • ثبت رویداد جرقه

سطح سرویس ، شامل گزارش‌هایی است که سرویس بدون سرور Dataproc ایجاد کرده است. این موارد شامل مواردی مانند درخواست سرور بدون سرور Dataproc برای CPUهای اضافی برای مقیاس خودکار است. می‌توانید این موارد را با کلیک کردن روی View logs مشاهده کنید که Cloud Logging باز می‌شود.

خروجی کنسول در زیر خروجی قابل مشاهده است. این خروجی تولید شده توسط کار است، از جمله ابرداده هایی که Spark هنگام شروع یک کار چاپ می کند یا هر عبارت چاپی که در آن کار گنجانده شده است.

ثبت رویداد Spark از رابط کاربری Spark قابل دسترسی است. از آنجایی که کار Spark خود را با یک سرور تاریخچه دائمی ارائه کرده‌اید، می‌توانید با کلیک کردن روی View Spark History Server که حاوی اطلاعاتی برای کارهای Spark قبلا اجرا شده‌تان است، به Spark UI دسترسی پیدا کنید. می‌توانید درباره Spark UI از اسناد رسمی Spark اطلاعات بیشتری کسب کنید.

5. الگوهای Dataproc: BQ -> GCS

الگوهای Dataproc ابزارهای منبع باز هستند که به ساده‌تر کردن وظایف پردازش داده‌های درون ابری کمک می‌کنند. اینها به عنوان پوششی برای Dataproc Serverless عمل می کنند و شامل الگوهایی برای بسیاری از وظایف واردات و صادرات داده می شوند، از جمله:

  • BigQuerytoGCS و GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC و JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS و GCStoMongo

لیست کامل README در دسترس است.

در این بخش، از الگوهای Dataproc برای صادرات داده ها از BigQuery به GCS استفاده خواهید کرد.

مخزن را شبیه سازی کنید

مخزن را کلون کنید و به پوشه python تغییر دهید.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

محیط را پیکربندی کنید

اکنون متغیرهای محیطی را تنظیم خواهید کرد. الگوهای Dataproc از متغیر محیطی GCP_PROJECT برای شناسه پروژه شما استفاده می کنند، بنابراین آن را برابر با GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

منطقه شما باید از قبل در محیط تنظیم شود. اگر نه، آن را در اینجا تنظیم کنید.

export REGION=<region>

الگوهای Dataproc از spark-bigquery-conector برای پردازش کارهای BigQuery استفاده می‌کنند و نیاز دارند که URI در یک متغیر محیطی JARS گنجانده شود. متغیر JARS را تنظیم کنید.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

پیکربندی پارامترهای قالب

نام یک سطل مرحله‌بندی را برای استفاده از سرویس تنظیم کنید.

export GCS_STAGING_LOCATION=gs://${BUCKET}

در مرحله بعد، چند متغیر مخصوص کار را تنظیم خواهید کرد. برای جدول ورودی، دوباره به مجموعه داده BigQuery NYC Citibike ارجاع خواهید داد.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

می توانید csv ، parquet ، avro یا json را انتخاب کنید. برای این کد لبه، CSV را انتخاب کنید - در بخش بعدی نحوه استفاده از الگوهای Dataproc برای تبدیل انواع فایل.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

حالت خروجی را برای overwrite تنظیم کنید. شما می توانید بین overwrite ، append ، ignore یا errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

محل خروجی GCS را به عنوان مسیری در سطل خود تنظیم کنید.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

قالب را اجرا کنید

الگوی BIGQUERYTOGCS را با مشخص کردن آن در زیر و ارائه پارامترهای ورودی که تنظیم کرده‌اید، اجرا کنید.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

خروجی نسبتاً نویز خواهد بود اما پس از حدود یک دقیقه موارد زیر را مشاهده خواهید کرد.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

با اجرای موارد زیر می توانید تأیید کنید که فایل ها تولید شده اند.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark به طور پیش فرض بسته به مقدار داده، روی چندین فایل می نویسد. در این حالت تقریباً 30 فایل تولید شده را مشاهده خواهید کرد. نام فایل های خروجی Spark با part - به دنبال آن یک عدد پنج رقمی (که شماره قطعه را نشان می دهد) و یک رشته هش فرمت می شود. برای مقادیر زیاد داده، Spark معمولاً روی چندین فایل می نویسد. نام فایل نمونه part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv است.

6. الگوهای Dataproc: CSV به پارکت

اکنون از الگوهای Dataproc برای تبدیل داده ها در GCS از یک نوع فایل به نوع دیگر با استفاده از GCSTOGCS استفاده خواهید کرد. این الگو از SparkSQL استفاده می کند و گزینه ای را برای ارسال یک پرس و جو SparkSQL فراهم می کند تا در طول تبدیل برای پردازش اضافی پردازش شود.

متغیرهای محیط را تأیید کنید

تأیید کنید که GCP_PROJECT ، REGION ، و GCS_STAGING_BUCKET از بخش قبلی تنظیم شده‌اند.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

تنظیم پارامترهای قالب

اکنون پارامترهای پیکربندی را برای GCStoGCS تنظیم خواهید کرد. با محل فایل های ورودی شروع کنید. توجه داشته باشید که این یک دایرکتوری است و نه یک فایل خاص، زیرا تمام فایل های موجود در دایرکتوری پردازش می شوند. این را روی BIGQUERY_GCS_OUTPUT_LOCATION تنظیم کنید.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

فرمت فایل ورودی را تنظیم کنید.

GCS_TO_GCS_INPUT_FORMAT=csv

فرمت خروجی مورد نظر را تنظیم کنید. می توانید پارکت، json، avro یا csv را انتخاب کنید.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

حالت خروجی را برای overwrite تنظیم کنید. شما می توانید بین overwrite ، append ، ignore یا errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

محل خروجی را تنظیم کنید

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

قالب را اجرا کنید

قالب GCStoGCS را اجرا کنید.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

خروجی نسبتاً پر سر و صدا خواهد بود، اما پس از حدود یک دقیقه باید یک پیام موفقیت آمیز مانند زیر مشاهده کنید.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

با اجرای موارد زیر می توانید تأیید کنید که فایل ها تولید شده اند.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

با استفاده از این الگو، شما همچنین می‌توانید با ارسال gcs.to.gcs.temp.view.name و gcs.to.gcs.sql.query به الگو، Query SparkSQL را تامین کنید، و این امکان را فراهم می‌کند که یک Query SparkSQL روی داده‌ها قبل از آن اجرا شود. نوشتن به GCS

7. منابع را پاکسازی کنید

برای جلوگیری از تحمیل هزینه‌های غیر ضروری به حساب GCP خود پس از تکمیل این آزمایشگاه کد:

  1. سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید .
gsutil rm -r gs://${BUCKET}
  1. خوشه Dataproc مورد استفاده برای سرور تاریخچه دائمی خود را حذف کنید .
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. کارهای بدون سرور Dataproc را حذف کنید. به Batches Console بروید، روی کادر کنار هر کاری که می‌خواهید حذف کنید کلیک کنید و روی DELETE کلیک کنید.

اگر پروژه ای را فقط برای این کد لبه ایجاد کرده اید، می توانید به صورت اختیاری پروژه را نیز حذف کنید:

  1. در کنسول GCP، به صفحه پروژه ها بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
  3. در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.

8. بعدی چه خواهد شد

منابع زیر راه های دیگری را برای استفاده از Spark بدون سرور ارائه می دهند: