استفاده از نوت بوک با Google Cloud Dataflow

۱. مقدمه

جریان-داده-ابری.png

جریان داده ابری گوگل

آخرین به‌روزرسانی: ۵ ژوئیه ۲۰۲۳

جریان داده چیست؟

Dataflow یک سرویس مدیریت‌شده برای اجرای طیف گسترده‌ای از الگوهای پردازش داده است. مستندات موجود در این سایت به شما نشان می‌دهد که چگونه خطوط لوله پردازش داده‌های دسته‌ای و جریانی خود را با استفاده از Dataflow مستقر کنید، از جمله دستورالعمل‌های استفاده از ویژگی‌های سرویس.

کیت توسعه نرم‌افزار آپاچی بیم (Apache Beam SDK) یک مدل برنامه‌نویسی متن‌باز است که شما را قادر می‌سازد تا هم پایپ‌لاین‌های دسته‌ای و هم پایپ‌لاین‌های استریمینگ را توسعه دهید. شما پایپ‌لاین‌های خود را با یک برنامه آپاچی بیم ایجاد می‌کنید و سپس آنها را روی سرویس Dataflow اجرا می‌کنید. مستندات آپاچی بیم اطلاعات مفهومی عمیق و مطالب مرجعی را برای مدل برنامه‌نویسی آپاچی بیم، SDKها و سایر اجراکننده‌ها ارائه می‌دهد.

تجزیه و تحلیل داده‌ها با سرعت بالا

جریان داده (Dataflow) امکان توسعه سریع و ساده خط لوله داده استریمینگ (Streaming Data Pipeline) را با تأخیر داده کمتر فراهم می‌کند.

ساده‌سازی عملیات و مدیریت

به تیم‌ها اجازه دهید به جای مدیریت خوشه‌های سرور، روی برنامه‌نویسی تمرکز کنند، زیرا رویکرد بدون سرور Dataflow سربار عملیاتی را از حجم کار مهندسی داده حذف می‌کند.

کاهش هزینه کل مالکیت

مقیاس‌بندی خودکار منابع همراه با قابلیت‌های پردازش دسته‌ای بهینه‌شده از نظر هزینه، به این معنی است که Dataflow ظرفیت تقریباً نامحدودی را برای مدیریت حجم کار فصلی و متغیر شما بدون صرف هزینه اضافی ارائه می‌دهد.

ویژگی‌های کلیدی

مدیریت خودکار منابع و متعادل‌سازی پویای کار

جریان داده، تأمین و مدیریت منابع پردازشی را خودکار می‌کند تا تأخیر را به حداقل و بهره‌برداری را به حداکثر برساند، به طوری که نیازی به راه‌اندازی دستی نمونه‌ها یا رزرو آنها نباشد. پارتیشن‌بندی کار نیز خودکار و بهینه شده است تا به صورت پویا کارهای عقب‌مانده را متعادل کند. نیازی به دنبال کردن "کلیدهای میانبر" یا پیش‌پردازش داده‌های ورودی شما نیست.

مقیاس‌بندی خودکار افقی

مقیاس‌بندی خودکار افقی منابع کارگر برای دستیابی به توان عملیاتی بهینه، منجر به نسبت قیمت به عملکرد کلی بهتر می‌شود.

قیمت‌گذاری انعطاف‌پذیر زمان‌بندی منابع برای پردازش دسته‌ای

برای پردازش‌هایی که زمان‌بندی کار در آن‌ها انعطاف‌پذیر است، مانند کارهای شبانه، زمان‌بندی منابع انعطاف‌پذیر (FlexRS) قیمت پایین‌تری برای پردازش دسته‌ای ارائه می‌دهد. این کارهای انعطاف‌پذیر در صف قرار می‌گیرند و تضمین می‌شود که ظرف یک بازه زمانی شش ساعته برای اجرا بازیابی شوند.

آنچه به عنوان بخشی از این اجرا خواهید کرد

استفاده از اجراکننده تعاملی Apache Beam با نوت‌بوک‌های JupyterLab به شما امکان می‌دهد تا به صورت تکراری خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید و PCollections های جداگانه را در یک گردش کار read-eval-print-loop (REPL) تجزیه کنید. این نوت‌بوک‌های Apache Beam از طریق Vertex AI Workbench ، یک سرویس مدیریت‌شده که میزبان ماشین‌های مجازی نوت‌بوک از پیش نصب‌شده با جدیدترین چارچوب‌های علم داده و یادگیری ماشین است، در دسترس قرار می‌گیرند.

این آزمایشگاه کد بر قابلیت‌های معرفی‌شده توسط نوت‌بوک‌های آپاچی بیم تمرکز دارد.

آنچه یاد خواهید گرفت

  • نحوه ایجاد یک نمونه دفترچه یادداشت
  • ایجاد یک خط لوله پایه
  • خواندن داده‌ها از منبع نامحدود
  • مصورسازی داده‌ها
  • راه‌اندازی یک کار جریان داده از طریق نوت‌بوک
  • ذخیره کردن دفترچه یادداشت

آنچه نیاز دارید

  • یک پروژه پلتفرم ابری گوگل با قابلیت پرداخت.
  • Google Cloud Dataflow و Google Cloud PubSub فعال شدند.

۲. راه‌اندازی

  1. در کنسول ابری، در صفحه انتخاب پروژه، یک پروژه ابری را انتخاب یا ایجاد کنید.

مطمئن شوید که API های زیر را فعال کرده اید:

  • رابط برنامه‌نویسی کاربردی جریان داده
  • API ابری Pub/Sub
  • موتور محاسباتی
  • API نوت‌بوک‌ها

می‌توانید با بررسی صفحه API و خدمات، این موضوع را تأیید کنید.

در این راهنما، ما داده‌ها را از یک اشتراک Pub/Sub می‌خوانیم، بنابراین مطمئن شوید که حساب سرویس پیش‌فرض Compute Engine نقش Editor را دارد، یا نقش Pub/Sub Editor را به آن اعطا کنید .

۳. شروع کار با نوت‌بوک‌های آپاچی بیم

راه‌اندازی یک نمونه از نوت‌بوک‌های آپاچی بیم

  1. اجرای Dataflow در کنسول:

  1. با استفاده از منوی سمت چپ، صفحه Workbench را انتخاب کنید.
  2. مطمئن شوید که در برگه «یادداشت‌های مدیریت‌شده توسط کاربر» هستید.
  3. در نوار ابزار، روی «دفترچه یادداشت جدید» کلیک کنید.
  4. آپاچی بیم > بدون پردازنده‌های گرافیکی (GPU) را انتخاب کنید.
  5. در صفحه نوت‌بوک جدید ، یک زیرشبکه برای ماشین مجازی نوت‌بوک انتخاب کنید و روی «ایجاد» کلیک کنید.
  6. وقتی لینک فعال شد، روی Open JupyterLab کلیک کنید. Vertex AI Workbench یک نمونه جدید از نوت‌بوک Apache Beam ایجاد می‌کند.

۴. ایجاد خط لوله

ایجاد نمونه دفترچه یادداشت

به مسیر File > New > Notebook بروید و کرنلی را انتخاب کنید که Apache Beam 2.47 یا بالاتر باشد.

شروع به اضافه کردن کد به دفترچه یادداشت خود کنید

  • کد هر بخش را کپی کرده و در یک سلول جدید در دفترچه یادداشت خود قرار دهید.
  • سلول را اجرا کنید

6bd3dd86cc7cf802.png

استفاده از اجراکننده تعاملی Apache Beam با نوت‌بوک‌های JupyterLab به شما امکان می‌دهد تا به صورت تکراری خطوط لوله را توسعه دهید، نمودار خط لوله خود را بررسی کنید و PCollections های جداگانه را در یک گردش کار read-eval-print-loop (REPL) تجزیه کنید.

آپاچی بیم روی نمونه نوت‌بوک شما نصب شده است، بنابراین ماژول‌های interactive_runner و interactive_beam را در نوت‌بوک خود وارد کنید.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

اگر نوت‌بوک شما از سرویس‌های دیگر گوگل استفاده می‌کند، دستورات import زیر را اضافه کنید:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

تنظیم گزینه‌های تعاملی

دستور زیر مدت زمان ثبت داده‌ها را روی ۶۰ ثانیه تنظیم می‌کند. اگر می‌خواهید سریع‌تر تکرار شود، مدت زمان کمتری، مثلاً «۱۰ ثانیه» را تنظیم کنید.

ib.options.recording_duration = '60s'

برای گزینه‌های تعاملی بیشتر، به کلاس interactive_beam.options مراجعه کنید.

خط لوله را با استفاده از یک شیء InteractiveRunner مقداردهی اولیه کنید.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

خواندن و مصورسازی داده‌ها

مثال زیر یک خط لوله آپاچی بیم را نشان می‌دهد که یک اشتراک برای موضوع Pub/Sub داده شده ایجاد می‌کند و از اشتراک می‌خواند.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

این خط لوله کلمات را بر اساس پنجره‌ها از منبع می‌شمارد. این خط لوله پنجره‌بندی ثابتی ایجاد می‌کند که هر پنجره 10 ثانیه طول می‌کشد.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

پس از پنجره‌بندی داده‌ها، کلمات به صورت پنجره‌ای شمارش می‌شوند.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

مصورسازی داده‌ها

متد show() ‎، PCollection حاصل را در دفترچه یادداشت به صورت بصری نمایش می‌دهد.

ib.show(windowed_word_counts, include_window_info=True)

متد show، یک PCollection را به صورت جدولی نمایش می‌دهد.

برای نمایش بصری داده‌های خود، visualize_data=True را به متد show() ارسال کنید. یک سلول جدید اضافه کنید:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

شما می‌توانید چندین فیلتر را به مصورسازی‌های خود اعمال کنید. مصورسازی زیر به شما امکان می‌دهد تا بر اساس برچسب و محور فیلتر کنید:

متد show، یک PCollection را به صورت مجموعه‌ای غنی از عناصر رابط کاربری قابل فیلتر، نمایش می‌دهد.

۵. استفاده از دیتافریم Pandas

یکی دیگر از مصورسازی‌های مفید در نوت‌بوک‌های آپاچی بیم، یک قاب داده پانداس است. مثال زیر ابتدا کلمات را به حروف کوچک تبدیل می‌کند و سپس فراوانی هر کلمه را محاسبه می‌کند.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

متد collect() خروجی را در یک DataFrame از Pandas ارائه می‌دهد.

ib.collect(windowed_lower_word_counts, include_window_info=True)

متد collect که نشان‌دهنده‌ی یک PCollection در یک DataFrame در Pandas است.

۶. (اختیاری) اجرای کارهای Dataflow از نوت‌بوک شما

  1. برای اجرای کارها در Dataflow، به مجوزهای اضافی نیاز دارید. مطمئن شوید که حساب سرویس پیش‌فرض Compute Engine دارای نقش Editor است، یا نقش‌های IAM زیر را به آن اعطا کنید :
  • مدیر جریان داده
  • کارگر جریان داده
  • مدیر ذخیره‌سازی، و
  • کاربر حساب سرویس (roles/iam.serviceAccountUser)

اطلاعات بیشتر در مورد نقش‌ها را در مستندات ببینید.

  1. (اختیاری) قبل از استفاده از نوت‌بوک خود برای اجرای کارهای Dataflow، هسته را مجدداً راه‌اندازی کنید، تمام سلول‌ها را دوباره اجرا کنید و خروجی را تأیید کنید.
  2. دستورات import زیر را حذف کنید:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. عبارت import زیر را اضافه کنید:
from apache_beam.runners import DataflowRunner
  1. گزینه مدت زمان ضبط زیر را حذف کنید:
ib.options.recording_duration = '60s'
  1. موارد زیر را به گزینه‌های خط لوله خود اضافه کنید. شما باید مکان ذخیره‌سازی ابری را طوری تنظیم کنید که به سطلی که از قبل دارید اشاره کند، یا می‌توانید برای این منظور یک سطل جدید ایجاد کنید . همچنین می‌توانید مقدار منطقه را از us-central1 تغییر دهید.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. در سازنده‌ی beam.Pipeline() ، InteractiveRunner را با DataflowRunner جایگزین کنید. p شیء خط لوله‌ای است که از ایجاد خط لوله شما ایجاد شده است.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. فراخوانی‌های تعاملی را از کد خود حذف کنید. برای مثال، show() ، collect() ، head() ، show_graph() و watch() را از کد خود حذف کنید.
  2. برای اینکه بتوانید نتایج را ببینید، باید یک sink اضافه کنید. در بخش قبلی، نتایج را در دفترچه یادداشت به صورت بصری نمایش می‌دادیم، اما این بار، کار را خارج از این دفترچه یادداشت - در Dataflow - اجرا می‌کنیم. بنابراین، به یک مکان خارجی برای نتایج خود نیاز داریم. در این مثال، نتایج را در فایل‌های متنی در GCS (Google Cloud Storage) خواهیم نوشت. از آنجایی که این یک خط لوله جریانی با پنجره‌بندی داده‌ها است، می‌خواهیم برای هر پنجره یک فایل متنی ایجاد کنیم. برای دستیابی به این هدف، مراحل زیر را به خط لوله خود اضافه کنید:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. تابع p.run() را به انتهای کد pipeline خود اضافه کنید.
  2. حالا کد نوت‌بوک خود را بررسی کنید تا مطمئن شوید که همه تغییرات را اعمال کرده‌اید. باید چیزی شبیه به این باشد:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. سلول‌ها را اجرا کنید.
  2. شما باید خروجی مشابه زیر را ببینید:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. برای تأیید اینکه آیا کار در حال اجرا است، به صفحه مشاغل Dataflow بروید. باید یک کار جدید در لیست مشاهده کنید. شروع پردازش داده‌ها توسط این کار حدود ۵ تا ۱۰ دقیقه طول خواهد کشید.
  2. پس از پردازش داده‌ها، به Cloud Storage بروید و به دایرکتوری که Dataflow نتایج را در آن ذخیره می‌کند ( output_gcs_location تعریف شده شما) بروید. باید لیستی از فایل‌های متنی را مشاهده کنید که در هر پنجره یک فایل وجود دارد. bfcc5ce9e46a8b14.png
  3. فایل را دانلود کنید و محتوای آن را بررسی کنید. این فایل باید شامل لیستی از کلمات جفت شده به همراه تعداد آنها باشد. روش دیگر، استفاده از رابط خط فرمان برای بررسی فایل‌ها است. می‌توانید این کار را با اجرای دستور زیر در یک سلول جدید در دفترچه یادداشت خود انجام دهید:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. خروجی مشابهی با این خواهید دید:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. همین! فراموش نکنید که job ایجاد شده را پاک‌سازی و متوقف کنید (به مرحله آخر این آزمایشگاه کد مراجعه کنید).

برای مثالی در مورد نحوه انجام این تبدیل در یک دفترچه یادداشت تعاملی، به دفترچه یادداشت Dataflow Word Count در نمونه دفترچه یادداشت خود مراجعه کنید.

از طرف دیگر، می‌توانید نوت‌بوک خود را به عنوان یک اسکریپت اجرایی صادر کنید، فایل .py تولید شده را با استفاده از مراحل قبلی تغییر دهید و سپس خط لوله خود را در سرویس Dataflow مستقر کنید .

۷. ذخیره دفترچه یادداشت

دفترچه‌هایی که ایجاد می‌کنید به صورت محلی در نمونه‌ی در حال اجرای دفترچه یادداشت شما ذخیره می‌شوند. اگر در حین توسعه، نمونه‌ی دفترچه یادداشت را ریست یا خاموش کنید، آن دفترچه‌های جدید تا زمانی که در دایرکتوری /home/jupyter ایجاد شوند، باقی می‌مانند. با این حال، اگر یک نمونه دفترچه یادداشت حذف شود، آن دفترچه‌ها نیز حذف می‌شوند.

برای نگه داشتن دفترچه‌های یادداشت خود برای استفاده‌های بعدی، آن‌ها را به صورت محلی در محل کار خود دانلود کنید، در GitHub ذخیره کنید یا آن‌ها را به فرمت فایل دیگری صادر کنید.

۸. تمیز کردن

پس از اتمام استفاده از نمونه نوت‌بوک آپاچی بیم، منابعی را که در گوگل کلود ایجاد کرده‌اید، با خاموش کردن نمونه نوت‌بوک و متوقف کردن کار استریمینگ (در صورت اجرا) پاک کنید.

از طرف دیگر، اگر پروژه‌ای را صرفاً برای هدف این آزمایشگاه کد ایجاد کرده‌اید، می‌توانید پروژه را به طور کامل متوقف کنید .