۱. مقدمه

جریان داده ابری گوگل
آخرین بهروزرسانی: ۵ ژوئیه ۲۰۲۳
جریان داده چیست؟
Dataflow یک سرویس مدیریتشده برای اجرای طیف گستردهای از الگوهای پردازش داده است. مستندات موجود در این سایت به شما نشان میدهد که چگونه خطوط لوله پردازش دادههای دستهای و جریانی خود را با استفاده از Dataflow مستقر کنید، از جمله دستورالعملهای استفاده از ویژگیهای سرویس.
کیت توسعه نرمافزار آپاچی بیم (Apache Beam SDK) یک مدل برنامهنویسی متنباز است که شما را قادر میسازد تا هم پایپلاینهای دستهای و هم پایپلاینهای استریمینگ را توسعه دهید. شما پایپلاینهای خود را با یک برنامه آپاچی بیم ایجاد میکنید و سپس آنها را روی سرویس Dataflow اجرا میکنید. مستندات آپاچی بیم اطلاعات مفهومی عمیق و مطالب مرجعی را برای مدل برنامهنویسی آپاچی بیم، SDKها و سایر اجراکنندهها ارائه میدهد.
تجزیه و تحلیل دادهها با سرعت بالا
جریان داده (Dataflow) امکان توسعه سریع و ساده خط لوله داده استریمینگ (Streaming Data Pipeline) را با تأخیر داده کمتر فراهم میکند.
سادهسازی عملیات و مدیریت
به تیمها اجازه دهید به جای مدیریت خوشههای سرور، روی برنامهنویسی تمرکز کنند، زیرا رویکرد بدون سرور Dataflow سربار عملیاتی را از حجم کار مهندسی داده حذف میکند.
کاهش هزینه کل مالکیت
مقیاسبندی خودکار منابع همراه با قابلیتهای پردازش دستهای بهینهشده از نظر هزینه، به این معنی است که Dataflow ظرفیت تقریباً نامحدودی را برای مدیریت حجم کار فصلی و متغیر شما بدون صرف هزینه اضافی ارائه میدهد.
ویژگیهای کلیدی
مدیریت خودکار منابع و متعادلسازی پویای کار
جریان داده، تأمین و مدیریت منابع پردازشی را خودکار میکند تا تأخیر را به حداقل و بهرهبرداری را به حداکثر برساند، به طوری که نیازی به راهاندازی دستی نمونهها یا رزرو آنها نباشد. پارتیشنبندی کار نیز خودکار و بهینه شده است تا به صورت پویا کارهای عقبمانده را متعادل کند. نیازی به دنبال کردن "کلیدهای میانبر" یا پیشپردازش دادههای ورودی شما نیست.
مقیاسبندی خودکار افقی
مقیاسبندی خودکار افقی منابع کارگر برای دستیابی به توان عملیاتی بهینه، منجر به نسبت قیمت به عملکرد کلی بهتر میشود.
قیمتگذاری انعطافپذیر زمانبندی منابع برای پردازش دستهای
برای پردازشهایی که زمانبندی کار در آنها انعطافپذیر است، مانند کارهای شبانه، زمانبندی منابع انعطافپذیر (FlexRS) قیمت پایینتری برای پردازش دستهای ارائه میدهد. این کارهای انعطافپذیر در صف قرار میگیرند و تضمین میشود که ظرف یک بازه زمانی شش ساعته برای اجرا بازیابی شوند.
آنچه به عنوان بخشی از این اجرا خواهید کرد
استفاده از اجراکننده تعاملی Apache Beam با نوتبوکهای JupyterLab به شما امکان میدهد تا به صورت تکراری خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید و PCollections های جداگانه را در یک گردش کار read-eval-print-loop (REPL) تجزیه کنید. این نوتبوکهای Apache Beam از طریق Vertex AI Workbench ، یک سرویس مدیریتشده که میزبان ماشینهای مجازی نوتبوک از پیش نصبشده با جدیدترین چارچوبهای علم داده و یادگیری ماشین است، در دسترس قرار میگیرند.
این آزمایشگاه کد بر قابلیتهای معرفیشده توسط نوتبوکهای آپاچی بیم تمرکز دارد.
آنچه یاد خواهید گرفت
- نحوه ایجاد یک نمونه دفترچه یادداشت
- ایجاد یک خط لوله پایه
- خواندن دادهها از منبع نامحدود
- مصورسازی دادهها
- راهاندازی یک کار جریان داده از طریق نوتبوک
- ذخیره کردن دفترچه یادداشت
آنچه نیاز دارید
- یک پروژه پلتفرم ابری گوگل با قابلیت پرداخت.
- Google Cloud Dataflow و Google Cloud PubSub فعال شدند.
۲. راهاندازی
- در کنسول ابری، در صفحه انتخاب پروژه، یک پروژه ابری را انتخاب یا ایجاد کنید.
مطمئن شوید که API های زیر را فعال کرده اید:
- رابط برنامهنویسی کاربردی جریان داده
- API ابری Pub/Sub
- موتور محاسباتی
- API نوتبوکها
میتوانید با بررسی صفحه API و خدمات، این موضوع را تأیید کنید.
در این راهنما، ما دادهها را از یک اشتراک Pub/Sub میخوانیم، بنابراین مطمئن شوید که حساب سرویس پیشفرض Compute Engine نقش Editor را دارد، یا نقش Pub/Sub Editor را به آن اعطا کنید .
۳. شروع کار با نوتبوکهای آپاچی بیم
راهاندازی یک نمونه از نوتبوکهای آپاچی بیم
- اجرای Dataflow در کنسول:
- با استفاده از منوی سمت چپ، صفحه Workbench را انتخاب کنید.
- مطمئن شوید که در برگه «یادداشتهای مدیریتشده توسط کاربر» هستید.
- در نوار ابزار، روی «دفترچه یادداشت جدید» کلیک کنید.
- آپاچی بیم > بدون پردازندههای گرافیکی (GPU) را انتخاب کنید.
- در صفحه نوتبوک جدید ، یک زیرشبکه برای ماشین مجازی نوتبوک انتخاب کنید و روی «ایجاد» کلیک کنید.
- وقتی لینک فعال شد، روی Open JupyterLab کلیک کنید. Vertex AI Workbench یک نمونه جدید از نوتبوک Apache Beam ایجاد میکند.
۴. ایجاد خط لوله
ایجاد نمونه دفترچه یادداشت
به مسیر File > New > Notebook بروید و کرنلی را انتخاب کنید که Apache Beam 2.47 یا بالاتر باشد.
شروع به اضافه کردن کد به دفترچه یادداشت خود کنید
- کد هر بخش را کپی کرده و در یک سلول جدید در دفترچه یادداشت خود قرار دهید.
- سلول را اجرا کنید

استفاده از اجراکننده تعاملی Apache Beam با نوتبوکهای JupyterLab به شما امکان میدهد تا به صورت تکراری خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید و PCollections های جداگانه را در یک گردش کار read-eval-print-loop (REPL) تجزیه کنید.
آپاچی بیم روی نمونه نوتبوک شما نصب شده است، بنابراین ماژولهای interactive_runner و interactive_beam را در نوتبوک خود وارد کنید.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
اگر نوتبوک شما از سرویسهای دیگر گوگل استفاده میکند، دستورات import زیر را اضافه کنید:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
تنظیم گزینههای تعاملی
دستور زیر مدت زمان ثبت دادهها را روی ۶۰ ثانیه تنظیم میکند. اگر میخواهید سریعتر تکرار شود، مدت زمان کمتری، مثلاً «۱۰ ثانیه» را تنظیم کنید.
ib.options.recording_duration = '60s'
برای گزینههای تعاملی بیشتر، به کلاس interactive_beam.options مراجعه کنید.
خط لوله را با استفاده از یک شیء InteractiveRunner مقداردهی اولیه کنید.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
خواندن و مصورسازی دادهها
مثال زیر یک خط لوله آپاچی بیم را نشان میدهد که یک اشتراک برای موضوع Pub/Sub داده شده ایجاد میکند و از اشتراک میخواند.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
این خط لوله کلمات را بر اساس پنجرهها از منبع میشمارد. این خط لوله پنجرهبندی ثابتی ایجاد میکند که هر پنجره 10 ثانیه طول میکشد.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
پس از پنجرهبندی دادهها، کلمات به صورت پنجرهای شمارش میشوند.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
مصورسازی دادهها
متد show() ، PCollection حاصل را در دفترچه یادداشت به صورت بصری نمایش میدهد.
ib.show(windowed_word_counts, include_window_info=True)

برای نمایش بصری دادههای خود، visualize_data=True را به متد show() ارسال کنید. یک سلول جدید اضافه کنید:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
شما میتوانید چندین فیلتر را به مصورسازیهای خود اعمال کنید. مصورسازی زیر به شما امکان میدهد تا بر اساس برچسب و محور فیلتر کنید:

۵. استفاده از دیتافریم Pandas
یکی دیگر از مصورسازیهای مفید در نوتبوکهای آپاچی بیم، یک قاب داده پانداس است. مثال زیر ابتدا کلمات را به حروف کوچک تبدیل میکند و سپس فراوانی هر کلمه را محاسبه میکند.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
متد collect() خروجی را در یک DataFrame از Pandas ارائه میدهد.
ib.collect(windowed_lower_word_counts, include_window_info=True)

۶. (اختیاری) اجرای کارهای Dataflow از نوتبوک شما
- برای اجرای کارها در Dataflow، به مجوزهای اضافی نیاز دارید. مطمئن شوید که حساب سرویس پیشفرض Compute Engine دارای نقش Editor است، یا نقشهای IAM زیر را به آن اعطا کنید :
- مدیر جریان داده
- کارگر جریان داده
- مدیر ذخیرهسازی، و
- کاربر حساب سرویس (roles/iam.serviceAccountUser)
اطلاعات بیشتر در مورد نقشها را در مستندات ببینید.
- (اختیاری) قبل از استفاده از نوتبوک خود برای اجرای کارهای Dataflow، هسته را مجدداً راهاندازی کنید، تمام سلولها را دوباره اجرا کنید و خروجی را تأیید کنید.
- دستورات import زیر را حذف کنید:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- عبارت import زیر را اضافه کنید:
from apache_beam.runners import DataflowRunner
- گزینه مدت زمان ضبط زیر را حذف کنید:
ib.options.recording_duration = '60s'
- موارد زیر را به گزینههای خط لوله خود اضافه کنید. شما باید مکان ذخیرهسازی ابری را طوری تنظیم کنید که به سطلی که از قبل دارید اشاره کند، یا میتوانید برای این منظور یک سطل جدید ایجاد کنید . همچنین میتوانید مقدار منطقه را از
us-central1تغییر دهید.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- در سازندهی
beam.Pipeline()،InteractiveRunnerرا باDataflowRunnerجایگزین کنید.pشیء خط لولهای است که از ایجاد خط لوله شما ایجاد شده است.
p = beam.Pipeline(DataflowRunner(), options=options)
- فراخوانیهای تعاملی را از کد خود حذف کنید. برای مثال،
show()،collect()،head()،show_graph()وwatch()را از کد خود حذف کنید. - برای اینکه بتوانید نتایج را ببینید، باید یک sink اضافه کنید. در بخش قبلی، نتایج را در دفترچه یادداشت به صورت بصری نمایش میدادیم، اما این بار، کار را خارج از این دفترچه یادداشت - در Dataflow - اجرا میکنیم. بنابراین، به یک مکان خارجی برای نتایج خود نیاز داریم. در این مثال، نتایج را در فایلهای متنی در GCS (Google Cloud Storage) خواهیم نوشت. از آنجایی که این یک خط لوله جریانی با پنجرهبندی دادهها است، میخواهیم برای هر پنجره یک فایل متنی ایجاد کنیم. برای دستیابی به این هدف، مراحل زیر را به خط لوله خود اضافه کنید:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- تابع
p.run()را به انتهای کد pipeline خود اضافه کنید. - حالا کد نوتبوک خود را بررسی کنید تا مطمئن شوید که همه تغییرات را اعمال کردهاید. باید چیزی شبیه به این باشد:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- سلولها را اجرا کنید.
- شما باید خروجی مشابه زیر را ببینید:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- برای تأیید اینکه آیا کار در حال اجرا است، به صفحه مشاغل Dataflow بروید. باید یک کار جدید در لیست مشاهده کنید. شروع پردازش دادهها توسط این کار حدود ۵ تا ۱۰ دقیقه طول خواهد کشید.
- پس از پردازش دادهها، به Cloud Storage بروید و به دایرکتوری که Dataflow نتایج را در آن ذخیره میکند (
output_gcs_locationتعریف شده شما) بروید. باید لیستی از فایلهای متنی را مشاهده کنید که در هر پنجره یک فایل وجود دارد.
- فایل را دانلود کنید و محتوای آن را بررسی کنید. این فایل باید شامل لیستی از کلمات جفت شده به همراه تعداد آنها باشد. روش دیگر، استفاده از رابط خط فرمان برای بررسی فایلها است. میتوانید این کار را با اجرای دستور زیر در یک سلول جدید در دفترچه یادداشت خود انجام دهید:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- خروجی مشابهی با این خواهید دید:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- همین! فراموش نکنید که job ایجاد شده را پاکسازی و متوقف کنید (به مرحله آخر این آزمایشگاه کد مراجعه کنید).
برای مثالی در مورد نحوه انجام این تبدیل در یک دفترچه یادداشت تعاملی، به دفترچه یادداشت Dataflow Word Count در نمونه دفترچه یادداشت خود مراجعه کنید.
از طرف دیگر، میتوانید نوتبوک خود را به عنوان یک اسکریپت اجرایی صادر کنید، فایل .py تولید شده را با استفاده از مراحل قبلی تغییر دهید و سپس خط لوله خود را در سرویس Dataflow مستقر کنید .
۷. ذخیره دفترچه یادداشت
دفترچههایی که ایجاد میکنید به صورت محلی در نمونهی در حال اجرای دفترچه یادداشت شما ذخیره میشوند. اگر در حین توسعه، نمونهی دفترچه یادداشت را ریست یا خاموش کنید، آن دفترچههای جدید تا زمانی که در دایرکتوری /home/jupyter ایجاد شوند، باقی میمانند. با این حال، اگر یک نمونه دفترچه یادداشت حذف شود، آن دفترچهها نیز حذف میشوند.
برای نگه داشتن دفترچههای یادداشت خود برای استفادههای بعدی، آنها را به صورت محلی در محل کار خود دانلود کنید، در GitHub ذخیره کنید یا آنها را به فرمت فایل دیگری صادر کنید.
۸. تمیز کردن
پس از اتمام استفاده از نمونه نوتبوک آپاچی بیم، منابعی را که در گوگل کلود ایجاد کردهاید، با خاموش کردن نمونه نوتبوک و متوقف کردن کار استریمینگ (در صورت اجرا) پاک کنید.
از طرف دیگر، اگر پروژهای را صرفاً برای هدف این آزمایشگاه کد ایجاد کردهاید، میتوانید پروژه را به طور کامل متوقف کنید .