1. مقدمه
Google Cloud Dataflow
آخرین به روز رسانی: 2023-Jul-5
Dataflow چیست؟
Dataflow یک سرویس مدیریت شده برای اجرای طیف گسترده ای از الگوهای پردازش داده است. اسناد موجود در این سایت به شما نشان میدهد که چگونه خطوط لوله پردازش دادههای دستهای و جریانی خود را با استفاده از Dataflow اجرا کنید، از جمله دستورالعملهایی برای استفاده از ویژگیهای سرویس.
Apache Beam SDK یک مدل برنامه نویسی متن باز است که به شما امکان می دهد خطوط لوله دسته ای و جریانی را توسعه دهید. شما خطوط لوله خود را با یک برنامه Apache Beam ایجاد می کنید و سپس آنها را در سرویس Dataflow اجرا می کنید. مستندات پرتو آپاچی اطلاعات مفهومی و مطالب مرجع عمیقی را برای مدل برنامهنویسی پرتو آپاچی، SDKها و سایر اجراکنندهها فراهم میکند.
جریان تجزیه و تحلیل داده ها با سرعت
جریان داده توسعه سریع و ساده خط لوله داده را با تأخیر داده کمتر امکان پذیر می کند.
عملیات و مدیریت را ساده کنید
به تیم ها اجازه دهید به جای مدیریت خوشه های سرور بر برنامه نویسی تمرکز کنند زیرا رویکرد بدون سرور Dataflow سربار عملیات را از بارهای کاری مهندسی داده حذف می کند.
هزینه کل مالکیت را کاهش دهید
مقیاس خودکار منابع همراه با قابلیتهای پردازش دستهای بهینهسازیشده هزینه به این معنی است که Dataflow ظرفیت تقریباً نامحدودی را برای مدیریت بارهای کاری فصلی و پراکنده بدون هزینههای بیش از حد ارائه میدهد.
ویژگی های کلیدی
مدیریت خودکار منابع و تعادل مجدد کار پویا
Dataflow تدارک و مدیریت منابع پردازش را خودکار می کند تا تأخیر را به حداقل برساند و استفاده را به حداکثر برساند، بنابراین نیازی به چرخش نمونه ها یا رزرو آنها به صورت دستی نیست. پارتیشن بندی کار نیز به صورت خودکار و بهینه سازی شده است تا به صورت پویا در کار عقب مانده تعادل ایجاد کند. نیازی به تعقیب "کلیدهای داغ" یا پیش پردازش داده های ورودی نیست.
مقیاس خودکار افقی
مقیاس خودکار افقی منابع کارگر برای توان عملیاتی بهینه منجر به عملکرد کلی قیمت به عملکرد بهتر می شود.
قیمتگذاری زمانبندی منابع انعطافپذیر برای پردازش دستهای
برای پردازش با انعطافپذیری در زمانبندی کار، مانند کارهای شبانه، زمانبندی منابع انعطافپذیر (FlexRS) قیمت پایینتری را برای پردازش دستهای ارائه میدهد. این مشاغل انعطاف پذیر با تضمین بازیابی آنها برای اجرا در یک پنجره شش ساعته در یک صف قرار می گیرند.
آنچه شما به عنوان بخشی از این اجرا خواهید کرد
استفاده از رانر تعاملی Apache Beam با نوتبوکهای JupyterLab به شما امکان میدهد به طور مکرر خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید، و PCCollectionهای فردی را در یک گردش کار حلقه خواندن-ارزیابی-چاپ (REPL) تجزیه کنید. این نوتبوکهای Apache Beam از طریق Vertex AI Workbench در دسترس هستند، یک سرویس مدیریتشده که میزبان ماشینهای مجازی نوتبوک از پیش نصبشده با جدیدترین چارچوبهای علم داده و یادگیری ماشین است.
این کد لبه روی عملکرد معرفی شده توسط نوت بوک های Apache Beam تمرکز دارد.
چیزی که یاد خواهید گرفت
- نحوه ایجاد یک نمونه نوت بوک
- ایجاد یک خط لوله اساسی
- خواندن داده ها از منبع نامحدود
- تجسم داده ها
- راه اندازی یک کار جریان داده از نوت بوک
- ذخیره یک نوت بوک
آنچه شما نیاز دارید
- یک پروژه Google Cloud Platform با فعال کردن صورتحساب.
- Google Cloud Dataflow و Google Cloud PubSub فعال شدند.
2. راه اندازی
- در Cloud Console، در صفحه انتخاب پروژه، یک پروژه Cloud را انتخاب یا ایجاد کنید.
اطمینان حاصل کنید که API های زیر را فعال کرده اید:
- Dataflow API
- Cloud Pub/Sub API
- موتور محاسباتی
- Notebooks API
میتوانید این مورد را با بررسی صفحه API و خدمات تأیید کنید.
در این راهنما، ما دادههای یک اشتراک Pub/Sub را میخوانیم، بنابراین مطمئن شوید که حساب سرویس پیشفرض Compute Engine نقش ویرایشگر را دارد یا نقش ویرایشگر Pub/Sub را به آن اعطا کنید .
3. شروع به کار با نوت بوک های Apache Beam
راه اندازی یک نمونه نوت بوک Apache Beam
- جریان داده را در کنسول راه اندازی کنید:
- صفحه Workbench را با استفاده از منوی سمت چپ انتخاب کنید.
- مطمئن شوید که در برگه نوتبوکهای مدیریتشده توسط کاربر هستید.
- در نوار ابزار، روی New Notebook کلیک کنید.
- Apache Beam > Without GPUs را انتخاب کنید.
- در صفحه نوت بوک جدید ، یک زیرشبکه برای VM نوت بوک انتخاب کنید و روی ایجاد کلیک کنید.
- وقتی پیوند فعال شد، روی Open JupyterLab کلیک کنید. Vertex AI Workbench یک نمونه نوت بوک جدید Apache Beam ایجاد می کند.
4. ایجاد خط لوله
ایجاد یک نمونه نوت بوک
به File > New > Notebook بروید و هسته ای را انتخاب کنید که Apache Beam 2.47 یا بالاتر است.
شروع به اضافه کردن کد به نوت بوک خود کنید
- کد را از هر بخش در یک سلول جدید در دفترچه یادداشت خود کپی و جایگذاری کنید
- سلول را اجرا کنید
استفاده از رانر تعاملی Apache Beam با نوتبوکهای JupyterLab به شما امکان میدهد به طور مکرر خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید، و PCCollectionهای فردی را در یک گردش کار حلقه خواندن-ارزیابی-چاپ (REPL) تجزیه کنید.
Apache Beam روی نمونه نوت بوک شما نصب شده است، بنابراین ماژول های interactive_runner
و interactive_beam
را در نوت بوک خود قرار دهید.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
اگر نوت بوک شما از سایر خدمات Google استفاده می کند، عبارت های import زیر را اضافه کنید:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
تنظیم گزینه های تعامل
موارد زیر مدت زمان ضبط داده را روی 60 ثانیه تنظیم می کند. اگر میخواهید سریعتر تکرار کنید، آن را روی مدت زمان کمتری تنظیم کنید، برای مثال '10s'.
ib.options.recording_duration = '60s'
برای گزینههای تعاملی بیشتر، کلاس interactive_beam.options را ببینید.
خط لوله را با استفاده از یک شی InteractiveRunner
راه اندازی کنید.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
خواندن و تجسم داده ها
مثال زیر یک خط لوله Apache Beam را نشان می دهد که اشتراکی برای موضوع Pub/Sub داده شده ایجاد می کند و از اشتراک می خواند.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
خط لوله کلمات را بر اساس پنجره ها از منبع شمارش می کند. این پنجره بندی ثابت با مدت زمان هر پنجره 10 ثانیه ایجاد می کند.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
پس از پنجره شدن داده ها، کلمات بر اساس پنجره شمارش می شوند.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
تجسم داده ها
متد show()
PCCollection حاصل را در نوت بوک تجسم می کند.
ib.show(windowed_word_counts, include_window_info=True)
برای نمایش تجسم داده های خود، visualize_data=True
به متد show()
منتقل کنید. یک سلول جدید اضافه کنید:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
می توانید چندین فیلتر را برای تجسم های خود اعمال کنید. تجسم زیر به شما امکان می دهد بر اساس برچسب و محور فیلتر کنید:
5. استفاده از پانداس دیتا فریم
یکی دیگر از تجسم های مفید در نوت بوک های Apache Beam یک Pandas DataFrame است. مثال زیر ابتدا کلمات را به حروف کوچک تبدیل می کند و سپس بسامد هر کلمه را محاسبه می کند.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
متد collect()
خروجی را در Pandas DataFrame فراهم می کند.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (اختیاری) راه اندازی کارهای Dataflow از نوت بوک
- برای اجرای کارها در Dataflow، به مجوزهای اضافی نیاز دارید. مطمئن شوید که حساب سرویس پیشفرض Compute Engine دارای نقش ویرایشگر است یا نقشهای IAM زیر را به آن اعطا کنید :
- مدیر جریان داده
- کارگر جریان داده
- Storage Admin و
- کاربر حساب سرویس (roles/iam.serviceAccountUser)
درباره نقش ها در مستندات بیشتر ببینید.
- (اختیاری) قبل از استفاده از نوت بوک خود برای اجرای کارهای Dataflow، هسته را مجددا راه اندازی کنید، همه سلول ها را مجددا اجرا کنید و خروجی را تأیید کنید.
- عبارات واردات زیر را حذف کنید:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- عبارت import زیر را اضافه کنید:
from apache_beam.runners import DataflowRunner
- گزینه مدت زمان ضبط زیر را حذف کنید:
ib.options.recording_duration = '60s'
- موارد زیر را به گزینه های خط لوله خود اضافه کنید. باید مکان ذخیره سازی ابری را طوری تنظیم کنید که به سطلی که قبلاً دارید اشاره کند، یا می توانید یک سطل جدید برای این منظور ایجاد کنید . همچنین می توانید مقدار منطقه را از
us-central1
تغییر دهید.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- در سازنده
beam.Pipeline()
InteractiveRunner
باDataflowRunner
جایگزین کنید.p
شیء خط لوله از ایجاد خط لوله شما است.
p = beam.Pipeline(DataflowRunner(), options=options)
- تماس های تعاملی را از کد خود حذف کنید. برای مثال،
show()
،collect()
،head()
،show_graph()
وwatch()
را از کد خود حذف کنید. - برای اینکه بتوانید نتایج را مشاهده کنید، باید یک سینک اضافه کنید. در بخش قبلی، ما نتایج را در نوت بوک تجسم میکردیم، اما این بار، کار را خارج از این نوت بوک - در Dataflow - اجرا میکنیم. بنابراین، ما به یک مکان خارجی برای نتایج خود نیاز داریم. در این مثال، نتایج را در فایلهای متنی در GCS (Google Cloud Storage) مینویسیم. از آنجایی که این یک خط لوله جریان است، با پنجره داده ها، ما می خواهیم یک فایل متنی در هر پنجره ایجاد کنیم. برای رسیدن به این هدف، مراحل زیر را به خط لوله خود اضافه کنید:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
را در انتهای کد خط لوله خود اضافه کنید.- اکنون کد نوت بوک خود را بررسی کنید تا تأیید کنید که همه تغییرات را اعمال کرده اید. باید شبیه این به نظر برسد:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- سلول ها را اجرا کنید.
- شما باید خروجی مشابه زیر را ببینید:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- برای تأیید اینکه آیا کار در حال اجرا است، به صفحه Jobs برای Dataflow بروید. شما باید یک شغل جدید را در لیست ببینید. کار حدود 5-10 دقیقه طول می کشد تا پردازش داده ها شروع شود.
- پس از پردازش داده ها، به Cloud Storage بروید و به دایرکتوری که Dataflow نتایج را در آن ذخیره می کند (
output_gcs_location
تعریف شده شما) بروید. شما باید لیستی از فایل های متنی را با یک فایل در هر پنجره ببینید. - فایل را دانلود کنید و محتوا را بررسی کنید. باید حاوی لیستی از کلمات جفت شده با تعداد آنها باشد. همچنین، از رابط خط فرمان برای بازرسی فایل ها استفاده کنید. می توانید با اجرای موارد زیر در یک سلول جدید در نوت بوک خود این کار را انجام دهید:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- خروجی مشابه این را خواهید دید:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- همین! فراموش نکنید که کاری را که ایجاد کرده اید تمیز کرده و متوقف کنید (مرحله آخر این کد لبه را ببینید).
برای مثالی در مورد نحوه انجام این تبدیل در یک نوت بوک تعاملی، به نوت بوک Dataflow Word Count در نمونه نوت بوک خود مراجعه کنید.
یا می توانید نوت بوک خود را به عنوان یک اسکریپت اجرایی صادر کنید، فایل .py تولید شده را با استفاده از مراحل قبلی تغییر دهید و سپس خط لوله خود را به سرویس Dataflow مستقر کنید .
7. ذخیره نوت بوک
نوت بوک هایی که ایجاد می کنید به صورت محلی در نمونه نوت بوک در حال اجرا شما ذخیره می شوند. اگر در حین توسعه، نمونه نوت بوک را بازنشانی یا خاموش کنید، آن نوت بوک های جدید تا زمانی که در دایرکتوری /home/jupyter
ایجاد شده باشند، باقی خواهند ماند. با این حال، اگر یک نمونه نوت بوک حذف شود، آن نوت بوک نیز حذف می شود.
برای نگهداری نوت بوک های خود برای استفاده در آینده، آنها را به صورت محلی در ایستگاه کاری خود دانلود کنید، آنها را در GitHub ذخیره کنید ، یا آنها را به فرمت فایل دیگری صادر کنید.
8. تمیز کردن
پس از پایان استفاده از نمونه نوتبوک Apache Beam، منابعی را که در Google Cloud ایجاد کردهاید، با خاموش کردن نمونه نوتبوک و توقف کار پخش ، پاکسازی کنید، اگر یکی را اجرا کردهاید.
از طرف دیگر، اگر پروژهای را تنها با هدف این نرمافزار ایجاد کردهاید، میتوانید پروژه را کاملاً خاموش کنید .