استفاده از نوت بوک با Google Cloud Dataflow

1. مقدمه

Cloud-Dataflow.png

Google Cloud Dataflow

آخرین به روز رسانی: 2023-Jul-5

Dataflow چیست؟

Dataflow یک سرویس مدیریت شده برای اجرای طیف گسترده ای از الگوهای پردازش داده است. اسناد موجود در این سایت به شما نشان می‌دهد که چگونه خطوط لوله پردازش داده‌های دسته‌ای و جریانی خود را با استفاده از Dataflow اجرا کنید، از جمله دستورالعمل‌هایی برای استفاده از ویژگی‌های سرویس.

Apache Beam SDK یک مدل برنامه نویسی متن باز است که به شما امکان می دهد خطوط لوله دسته ای و جریانی را توسعه دهید. شما خطوط لوله خود را با یک برنامه Apache Beam ایجاد می کنید و سپس آنها را در سرویس Dataflow اجرا می کنید. مستندات پرتو آپاچی اطلاعات مفهومی و مطالب مرجع عمیقی را برای مدل برنامه‌نویسی پرتو آپاچی، SDKها و سایر اجراکننده‌ها فراهم می‌کند.

جریان تجزیه و تحلیل داده ها با سرعت

جریان داده توسعه سریع و ساده خط لوله داده را با تأخیر داده کمتر امکان پذیر می کند.

عملیات و مدیریت را ساده کنید

به تیم ها اجازه دهید به جای مدیریت خوشه های سرور بر برنامه نویسی تمرکز کنند زیرا رویکرد بدون سرور Dataflow سربار عملیات را از بارهای کاری مهندسی داده حذف می کند.

هزینه کل مالکیت را کاهش دهید

مقیاس خودکار منابع همراه با قابلیت‌های پردازش دسته‌ای بهینه‌سازی‌شده هزینه به این معنی است که Dataflow ظرفیت تقریباً نامحدودی را برای مدیریت بارهای کاری فصلی و پراکنده بدون هزینه‌های بیش از حد ارائه می‌دهد.

ویژگی های کلیدی

مدیریت خودکار منابع و تعادل مجدد کار پویا

Dataflow تدارک و مدیریت منابع پردازش را خودکار می کند تا تأخیر را به حداقل برساند و استفاده را به حداکثر برساند، بنابراین نیازی به چرخش نمونه ها یا رزرو آنها به صورت دستی نیست. پارتیشن بندی کار نیز به صورت خودکار و بهینه سازی شده است تا به صورت پویا در کار عقب مانده تعادل ایجاد کند. نیازی به تعقیب "کلیدهای داغ" یا پیش پردازش داده های ورودی نیست.

مقیاس خودکار افقی

مقیاس خودکار افقی منابع کارگر برای توان عملیاتی بهینه منجر به عملکرد کلی قیمت به عملکرد بهتر می شود.

قیمت‌گذاری زمان‌بندی منابع انعطاف‌پذیر برای پردازش دسته‌ای

برای پردازش با انعطاف‌پذیری در زمان‌بندی کار، مانند کارهای شبانه، زمان‌بندی منابع انعطاف‌پذیر (FlexRS) قیمت پایین‌تری را برای پردازش دسته‌ای ارائه می‌دهد. این مشاغل انعطاف پذیر با تضمین بازیابی آنها برای اجرا در یک پنجره شش ساعته در یک صف قرار می گیرند.

آنچه شما به عنوان بخشی از این اجرا خواهید کرد

استفاده از رانر تعاملی Apache Beam با نوت‌بوک‌های JupyterLab به شما امکان می‌دهد به طور مکرر خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید، و PCCollection‌های فردی را در یک گردش کار حلقه خواندن-ارزیابی-چاپ (REPL) تجزیه کنید. این نوت‌بوک‌های Apache Beam از طریق Vertex AI Workbench در دسترس هستند، یک سرویس مدیریت‌شده که میزبان ماشین‌های مجازی نوت‌بوک از پیش نصب‌شده با جدیدترین چارچوب‌های علم داده و یادگیری ماشین است.

این کد لبه روی عملکرد معرفی شده توسط نوت بوک های Apache Beam تمرکز دارد.

چیزی که یاد خواهید گرفت

  • نحوه ایجاد یک نمونه نوت بوک
  • ایجاد یک خط لوله اساسی
  • خواندن داده ها از منبع نامحدود
  • تجسم داده ها
  • راه اندازی یک کار جریان داده از نوت بوک
  • ذخیره یک نوت بوک

آنچه شما نیاز دارید

  • یک پروژه Google Cloud Platform با فعال کردن صورت‌حساب.
  • Google Cloud Dataflow و Google Cloud PubSub فعال شدند.

2. راه اندازی

  1. در Cloud Console، در صفحه انتخاب پروژه، یک پروژه Cloud را انتخاب یا ایجاد کنید.

اطمینان حاصل کنید که API های زیر را فعال کرده اید:

  • Dataflow API
  • Cloud Pub/Sub API
  • موتور محاسباتی
  • Notebooks API

می‌توانید این مورد را با بررسی صفحه API و خدمات تأیید کنید.

در این راهنما، ما داده‌های یک اشتراک Pub/Sub را می‌خوانیم، بنابراین مطمئن شوید که حساب سرویس پیش‌فرض Compute Engine نقش ویرایشگر را دارد یا نقش ویرایشگر Pub/Sub را به آن اعطا کنید .

3. شروع به کار با نوت بوک های Apache Beam

راه اندازی یک نمونه نوت بوک Apache Beam

  1. جریان داده را در کنسول راه اندازی کنید:

  1. صفحه Workbench را با استفاده از منوی سمت چپ انتخاب کنید.
  2. مطمئن شوید که در برگه نوت‌بوک‌های مدیریت‌شده توسط کاربر هستید.
  3. در نوار ابزار، روی New Notebook کلیک کنید.
  4. Apache Beam > Without GPUs را انتخاب کنید.
  5. در صفحه نوت بوک جدید ، یک زیرشبکه برای VM نوت بوک انتخاب کنید و روی ایجاد کلیک کنید.
  6. وقتی پیوند فعال شد، روی Open JupyterLab کلیک کنید. Vertex AI Workbench یک نمونه نوت بوک جدید Apache Beam ایجاد می کند.

4. ایجاد خط لوله

ایجاد یک نمونه نوت بوک

به File > New > Notebook بروید و هسته ای را انتخاب کنید که Apache Beam 2.47 یا بالاتر است.

شروع به اضافه کردن کد به نوت بوک خود کنید

  • کد را از هر بخش در یک سلول جدید در دفترچه یادداشت خود کپی و جایگذاری کنید
  • سلول را اجرا کنید

6bd3dd86cc7cf802.png

استفاده از رانر تعاملی Apache Beam با نوت‌بوک‌های JupyterLab به شما امکان می‌دهد به طور مکرر خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید، و PCCollection‌های فردی را در یک گردش کار حلقه خواندن-ارزیابی-چاپ (REPL) تجزیه کنید.

Apache Beam روی نمونه نوت بوک شما نصب شده است، بنابراین ماژول های interactive_runner و interactive_beam را در نوت بوک خود قرار دهید.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

اگر نوت بوک شما از سایر خدمات Google استفاده می کند، عبارت های import زیر را اضافه کنید:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

تنظیم گزینه های تعامل

موارد زیر مدت زمان ضبط داده را روی 60 ثانیه تنظیم می کند. اگر می‌خواهید سریع‌تر تکرار کنید، آن را روی مدت زمان کمتری تنظیم کنید، برای مثال '10s'.

ib.options.recording_duration = '60s'

برای گزینه‌های تعاملی بیشتر، کلاس interactive_beam.options را ببینید.

خط لوله را با استفاده از یک شی InteractiveRunner راه اندازی کنید.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

خواندن و تجسم داده ها

مثال زیر یک خط لوله Apache Beam را نشان می دهد که اشتراکی برای موضوع Pub/Sub داده شده ایجاد می کند و از اشتراک می خواند.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

خط لوله کلمات را بر اساس پنجره ها از منبع شمارش می کند. این پنجره بندی ثابت با مدت زمان هر پنجره 10 ثانیه ایجاد می کند.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

پس از پنجره شدن داده ها، کلمات بر اساس پنجره شمارش می شوند.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

تجسم داده ها

متد show() PCCollection حاصل را در نوت بوک تجسم می کند.

ib.show(windowed_word_counts, include_window_info=True)

روش نمایش، تجسم یک مجموعه PC به شکل جدولی.

برای نمایش تجسم داده های خود، visualize_data=True به متد show() منتقل کنید. یک سلول جدید اضافه کنید:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

می توانید چندین فیلتر را برای تجسم های خود اعمال کنید. تجسم زیر به شما امکان می دهد بر اساس برچسب و محور فیلتر کنید:

روش نمایش یک PCCollection را به عنوان مجموعه ای غنی از عناصر رابط کاربری قابل فیلتر تجسم می کند.

5. استفاده از پانداس دیتا فریم

یکی دیگر از تجسم های مفید در نوت بوک های Apache Beam یک Pandas DataFrame است. مثال زیر ابتدا کلمات را به حروف کوچک تبدیل می کند و سپس بسامد هر کلمه را محاسبه می کند.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

متد collect() خروجی را در Pandas DataFrame فراهم می کند.

ib.collect(windowed_lower_word_counts, include_window_info=True)

متد جمع آوری که یک PCCollection را در یک Pandas DataFrame نشان می دهد.

6. (اختیاری) راه اندازی کارهای Dataflow از نوت بوک

  1. برای اجرای کارها در Dataflow، به مجوزهای اضافی نیاز دارید. مطمئن شوید که حساب سرویس پیش‌فرض Compute Engine دارای نقش ویرایشگر است یا نقش‌های IAM زیر را به آن اعطا کنید :
  • مدیر جریان داده
  • کارگر جریان داده
  • Storage Admin و
  • کاربر حساب سرویس (roles/iam.serviceAccountUser)

درباره نقش ها در مستندات بیشتر ببینید.

  1. (اختیاری) قبل از استفاده از نوت بوک خود برای اجرای کارهای Dataflow، هسته را مجددا راه اندازی کنید، همه سلول ها را مجددا اجرا کنید و خروجی را تأیید کنید.
  2. عبارات واردات زیر را حذف کنید:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. عبارت import زیر را اضافه کنید:
from apache_beam.runners import DataflowRunner
  1. گزینه مدت زمان ضبط زیر را حذف کنید:
ib.options.recording_duration = '60s'
  1. موارد زیر را به گزینه های خط لوله خود اضافه کنید. باید مکان ذخیره سازی ابری را طوری تنظیم کنید که به سطلی که قبلاً دارید اشاره کند، یا می توانید یک سطل جدید برای این منظور ایجاد کنید . همچنین می توانید مقدار منطقه را از us-central1 تغییر دهید.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. در سازنده beam.Pipeline() InteractiveRunner با DataflowRunner جایگزین کنید. p شیء خط لوله از ایجاد خط لوله شما است.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. تماس های تعاملی را از کد خود حذف کنید. برای مثال، show() ، collect() ، head() ، show_graph() و watch() را از کد خود حذف کنید.
  2. برای اینکه بتوانید نتایج را مشاهده کنید، باید یک سینک اضافه کنید. در بخش قبلی، ما نتایج را در نوت بوک تجسم می‌کردیم، اما این بار، کار را خارج از این نوت بوک - در Dataflow - اجرا می‌کنیم. بنابراین، ما به یک مکان خارجی برای نتایج خود نیاز داریم. در این مثال، نتایج را در فایل‌های متنی در GCS (Google Cloud Storage) می‌نویسیم. از آنجایی که این یک خط لوله جریان است، با پنجره داده ها، ما می خواهیم یک فایل متنی در هر پنجره ایجاد کنیم. برای رسیدن به این هدف، مراحل زیر را به خط لوله خود اضافه کنید:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. p.run() را در انتهای کد خط لوله خود اضافه کنید.
  2. اکنون کد نوت بوک خود را بررسی کنید تا تأیید کنید که همه تغییرات را اعمال کرده اید. باید شبیه این به نظر برسد:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. سلول ها را اجرا کنید.
  2. شما باید خروجی مشابه زیر را ببینید:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. برای تأیید اینکه آیا کار در حال اجرا است، به صفحه Jobs برای Dataflow بروید. شما باید یک شغل جدید را در لیست ببینید. کار حدود 5-10 دقیقه طول می کشد تا پردازش داده ها شروع شود.
  2. پس از پردازش داده ها، به Cloud Storage بروید و به دایرکتوری که Dataflow نتایج را در آن ذخیره می کند ( output_gcs_location تعریف شده شما) بروید. شما باید لیستی از فایل های متنی را با یک فایل در هر پنجره ببینید. bfcc5ce9e46a8b14.png
  3. فایل را دانلود کنید و محتوا را بررسی کنید. باید حاوی لیستی از کلمات جفت شده با تعداد آنها باشد. همچنین، از رابط خط فرمان برای بازرسی فایل ها استفاده کنید. می توانید با اجرای موارد زیر در یک سلول جدید در نوت بوک خود این کار را انجام دهید:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. خروجی مشابه این را خواهید دید:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. همین! فراموش نکنید که کاری را که ایجاد کرده اید تمیز کرده و متوقف کنید (مرحله آخر این کد لبه را ببینید).

برای مثالی در مورد نحوه انجام این تبدیل در یک نوت بوک تعاملی، به نوت بوک Dataflow Word Count در نمونه نوت بوک خود مراجعه کنید.

یا می توانید نوت بوک خود را به عنوان یک اسکریپت اجرایی صادر کنید، فایل .py تولید شده را با استفاده از مراحل قبلی تغییر دهید و سپس خط لوله خود را به سرویس Dataflow مستقر کنید .

7. ذخیره نوت بوک

نوت بوک هایی که ایجاد می کنید به صورت محلی در نمونه نوت بوک در حال اجرا شما ذخیره می شوند. اگر در حین توسعه، نمونه نوت بوک را بازنشانی یا خاموش کنید، آن نوت بوک های جدید تا زمانی که در دایرکتوری /home/jupyter ایجاد شده باشند، باقی خواهند ماند. با این حال، اگر یک نمونه نوت بوک حذف شود، آن نوت بوک نیز حذف می شود.

برای نگهداری نوت بوک های خود برای استفاده در آینده، آنها را به صورت محلی در ایستگاه کاری خود دانلود کنید، آنها را در GitHub ذخیره کنید ، یا آنها را به فرمت فایل دیگری صادر کنید.

8. تمیز کردن

پس از پایان استفاده از نمونه نوت‌بوک Apache Beam، منابعی را که در Google Cloud ایجاد کرده‌اید، با خاموش کردن نمونه نوت‌بوک و توقف کار پخش ، پاکسازی کنید، اگر یکی را اجرا کرده‌اید.

از طرف دیگر، اگر پروژه‌ای را تنها با هدف این نرم‌افزار ایجاد کرده‌اید، می‌توانید پروژه را کاملاً خاموش کنید .