۱. مرور کلی

جریان داده چیست؟
Dataflow یک سرویس مدیریتشده برای اجرای طیف گستردهای از الگوهای پردازش داده است. مستندات موجود در این سایت به شما نشان میدهد که چگونه خطوط لوله پردازش دادههای دستهای و جریانی خود را با استفاده از Dataflow مستقر کنید، از جمله دستورالعملهای استفاده از ویژگیهای سرویس.
کیت توسعه نرمافزار آپاچی بیم (Apache Beam SDK) یک مدل برنامهنویسی متنباز است که شما را قادر میسازد تا هم پایپلاینهای دستهای و هم پایپلاینهای استریمینگ را توسعه دهید. شما پایپلاینهای خود را با یک برنامه آپاچی بیم ایجاد میکنید و سپس آنها را روی سرویس Dataflow اجرا میکنید. مستندات آپاچی بیم اطلاعات مفهومی عمیق و مطالب مرجعی را برای مدل برنامهنویسی آپاچی بیم، SDKها و سایر اجراکنندهها ارائه میدهد.
تجزیه و تحلیل دادهها با سرعت بالا
جریان داده (Dataflow) امکان توسعه سریع و ساده خط لوله داده استریمینگ (Streaming Data Pipeline) را با تأخیر داده کمتر فراهم میکند.
سادهسازی عملیات و مدیریت
به تیمها اجازه دهید به جای مدیریت خوشههای سرور، روی برنامهنویسی تمرکز کنند، زیرا رویکرد بدون سرور Dataflow سربار عملیاتی را از حجم کار مهندسی داده حذف میکند.
کاهش هزینه کل مالکیت
مقیاسبندی خودکار منابع همراه با قابلیتهای پردازش دستهای بهینهشده از نظر هزینه، به این معنی است که Dataflow ظرفیت تقریباً نامحدودی را برای مدیریت حجم کار فصلی و متغیر شما بدون صرف هزینه اضافی ارائه میدهد.
ویژگیهای کلیدی
مدیریت خودکار منابع و متعادلسازی پویای کار
جریان داده، تأمین و مدیریت منابع پردازشی را خودکار میکند تا تأخیر را به حداقل و بهرهبرداری را به حداکثر برساند، به طوری که نیازی به راهاندازی دستی نمونهها یا رزرو آنها نباشد. پارتیشنبندی کار نیز خودکار و بهینه شده است تا به صورت پویا کارهای عقبمانده را متعادل کند. نیازی به دنبال کردن "کلیدهای میانبر" یا پیشپردازش دادههای ورودی شما نیست.
مقیاسبندی خودکار افقی
مقیاسبندی خودکار افقی منابع کارگر برای دستیابی به توان عملیاتی بهینه، منجر به نسبت قیمت به عملکرد کلی بهتر میشود.
قیمتگذاری انعطافپذیر زمانبندی منابع برای پردازش دستهای
برای پردازشهایی که زمانبندی کار در آنها انعطافپذیر است، مانند کارهای شبانه، زمانبندی منابع انعطافپذیر (FlexRS) قیمت پایینتری برای پردازش دستهای ارائه میدهد. این کارهای انعطافپذیر در صف قرار میگیرند و تضمین میشود که ظرف یک بازه زمانی شش ساعته برای اجرا بازیابی شوند.
این آموزش از https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven اقتباس شده است.
آنچه یاد خواهید گرفت
- نحوه ایجاد یک پروژه Maven با Apache Beam، با استفاده از Java SDK
- اجرای یک نمونه خط لوله با استفاده از کنسول پلتفرم گوگل کلود
- نحوه حذف سطل ذخیرهسازی ابری مرتبط و محتویات آن
آنچه نیاز دارید
چگونه از این آموزش استفاده خواهید کرد؟
تجربه خود را در استفاده از خدمات پلتفرم ابری گوگل چگونه ارزیابی میکنید؟
۲. تنظیمات و الزامات
تنظیم محیط خودتنظیم
- وارد Cloud Console شوید و یک پروژه جدید ایجاد کنید یا از یک پروژه موجود دوباره استفاده کنید. (اگر از قبل حساب Gmail یا G Suite ندارید، باید یکی ایجاد کنید .)
شناسه پروژه را به خاطر بسپارید، یک نام منحصر به فرد در تمام پروژههای Google Cloud (نام بالا قبلاً گرفته شده و برای شما کار نخواهد کرد، متاسفیم!). بعداً در این آزمایشگاه کد به آن PROJECT_ID گفته خواهد شد.
- در مرحله بعد، برای استفاده از منابع گوگل کلود، باید پرداخت را در Cloud Console فعال کنید .
اجرای این آزمایشگاه کد، اگر اصلاً هزینهای نداشته باشد، نباید هزینه زیادی داشته باشد. حتماً دستورالعملهای بخش «پاکسازی» را که به شما نحوه خاموش کردن منابع را آموزش میدهد، دنبال کنید تا پس از این آموزش، متحمل هزینه نشوید. کاربران جدید Google Cloud واجد شرایط برنامه آزمایشی رایگان ۳۰۰ دلاری هستند.
فعال کردن APIها
روی آیکون منو در سمت چپ بالای صفحه کلیک کنید.

از منوی کشویی ، APIها و خدمات > داشبورد را انتخاب کنید.

گزینه + فعال کردن APIها و سرویسها را انتخاب کنید.

در کادر جستجو عبارت "Compute Engine" را جستجو کنید. در لیست نتایج ظاهر شده، روی "Compute Engine API" کلیک کنید.

در صفحه Google Compute Engine روی Enable کلیک کنید.

پس از فعال شدن، برای بازگشت، روی فلش کلیک کنید.
حالا API های زیر را جستجو کنید و آنها را نیز فعال کنید:
- جریان داده ابری
- Stackdriver
- فضای ذخیرهسازی ابری
- ذخیرهسازی ابری با فرمت JSON
- بیگکوئری
- میخانه/زیرشبکه ابری
- فروشگاه داده ابری
- رابطهای برنامهنویسی کاربردی (API) مدیریت منابع ابری
۳. یک فضای ذخیرهسازی ابری جدید ایجاد کنید
در کنسول پلتفرم گوگل کلود ، روی آیکون منو در سمت چپ بالای صفحه کلیک کنید:

به پایین اسکرول کنید و در زیربخش Storage، Cloud Storage > Browser را انتخاب کنید:

اکنون باید مرورگر فضای ذخیرهسازی ابری را ببینید، و با فرض اینکه از پروژهای استفاده میکنید که در حال حاضر هیچ سطل فضای ذخیرهسازی ابری ندارد، دعوتی برای ایجاد یک سطل جدید خواهید دید. برای ایجاد آن، دکمه ایجاد سطل را فشار دهید:

یک نام برای سطل خود وارد کنید. همانطور که در کادر محاورهای ذکر شده است، نام سطلها باید در کل فضای ذخیرهسازی ابری منحصر به فرد باشد. بنابراین اگر نامی واضح مانند "test" انتخاب کنید، احتمالاً متوجه خواهید شد که شخص دیگری قبلاً سطلی با آن نام ایجاد کرده است و با خطا مواجه خواهید شد.
همچنین قوانینی در مورد اینکه چه کاراکترهایی در نام سطلها مجاز هستند وجود دارد. اگر نام سطل خود را با یک حرف یا عدد شروع و پایان دهید و فقط از خط تیره در وسط استفاده کنید، مشکلی نخواهید داشت. اگر سعی کنید از کاراکترهای ویژه استفاده کنید، یا سعی کنید نام سطل خود را با چیزی غیر از حرف یا عدد شروع یا پایان دهید، کادر محاورهای قوانین را به شما یادآوری میکند.

یک نام منحصر به فرد برای سطل خود وارد کنید و دکمه ایجاد را فشار دهید. اگر چیزی را انتخاب کنید که از قبل در حال استفاده است، پیام خطایی که در بالا نشان داده شده است را مشاهده خواهید کرد. هنگامی که با موفقیت یک سطل ایجاد کردید، به سطل جدید و خالی خود در مرورگر منتقل خواهید شد:

نام سطلی که میبینید، البته، متفاوت خواهد بود، زیرا آنها باید در تمام پروژهها منحصر به فرد باشند.
۴. شروع Cloud Shell
فعال کردن پوسته ابری
- از کنسول ابری، روی فعال کردن پوسته ابری کلیک کنید
.
اگر قبلاً Cloud Shell را شروع نکردهاید، یک صفحه میانی (در زیر صفحه) به شما نمایش داده میشود که توضیح میدهد چیست. در این صورت، روی ادامه کلیک کنید (و دیگر هرگز آن را نخواهید دید). آن صفحه یکبار مصرف به این شکل است:
آمادهسازی و اتصال به Cloud Shell فقط چند لحظه طول میکشد.
این ماشین مجازی با تمام ابزارهای توسعهای که نیاز دارید، مجهز شده است. این ماشین یک دایرکتوری خانگی ۵ گیگابایتی پایدار ارائه میدهد و در فضای ابری گوگل اجرا میشود که عملکرد شبکه و احراز هویت را تا حد زیادی بهبود میبخشد. بخش عمدهای از کار شما در این آزمایشگاه کد، اگر نگوییم همه، را میتوان به سادگی با یک مرورگر یا کرومبوک انجام داد.
پس از اتصال به Cloud Shell، باید ببینید که از قبل احراز هویت شدهاید و پروژه از قبل روی شناسه پروژه شما تنظیم شده است.
- برای تأیید احراز هویت، دستور زیر را در Cloud Shell اجرا کنید:
gcloud auth list
خروجی دستور
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
خروجی دستور
[core] project = <PROJECT_ID>
اگر اینطور نیست، میتوانید با این دستور آن را تنظیم کنید:
gcloud config set project <PROJECT_ID>
خروجی دستور
Updated property [core/project].
۵. ایجاد یک پروژه Maven
پس از راهاندازی Cloud Shell، بیایید با ایجاد یک پروژه Maven با استفاده از Java SDK برای Apache Beam شروع کنیم.
آپاچی بیم یک مدل برنامهنویسی متنباز برای خطوط لوله داده است. شما این خطوط لوله را با یک برنامه آپاچی بیم تعریف میکنید و میتوانید یک اجراکننده، مانند دیتافلو، را برای اجرای خط لوله خود انتخاب کنید.
دستور mvn archetype:generate را در پوسته خود به صورت زیر اجرا کنید:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
پس از اجرای دستور، باید یک دایرکتوری جدید به نام first-dataflow در دایرکتوری فعلی خود مشاهده کنید. first-dataflow شامل یک پروژه Maven است که شامل Cloud Dataflow SDK برای جاوا و نمونههایی از pipelineها میشود.
۶. اجرای یک خط لوله پردازش متن روی Cloud Dataflow
بیایید با ذخیره شناسه پروژه و نامهای سطل ذخیرهسازی ابری خود به عنوان متغیرهای محیطی شروع کنیم. میتوانید این کار را در Cloud Shell انجام دهید. حتماً <your_project_id> با شناسه پروژه خود جایگزین کنید.
export PROJECT_ID=<your_project_id>
حالا همین کار را برای مخزن ذخیرهسازی ابری انجام خواهیم داد. به یاد داشته باشید که <your_bucket_name> با نام منحصر به فردی که برای ایجاد مخزن خود در مرحله قبل استفاده کردهاید، جایگزین کنید.
export BUCKET_NAME=<your_bucket_name>
به دایرکتوری first-dataflow/ بروید.
cd first-dataflow
ما قصد داریم یک خط لوله به نام WordCount اجرا کنیم که متن را میخواند، خطوط متن را به کلمات مجزا توکنسازی میکند و شمارش فراوانی را روی هر یک از آن کلمات انجام میدهد. ابتدا خط لوله را اجرا میکنیم و در حین اجرا، نگاهی به آنچه در هر مرحله اتفاق میافتد، میاندازیم.
با اجرای دستور mvn compile exec:java در پوسته یا پنجره ترمینال خود، pipeline را شروع کنید. برای آرگومانهای --project, --stagingLocation, و --output ، دستور زیر به متغیرهای محیطی که قبلاً در این مرحله تنظیم کردهاید، اشاره دارد.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
در حالی که کار در حال اجرا است، بیایید کار را در لیست کارها پیدا کنیم.
رابط کاربری وب Cloud Dataflow را در کنسول پلتفرم Google Cloud باز کنید. باید وظیفه شمارش کلمات خود را با وضعیت در حال اجرا (Running) ببینید:

حالا، بیایید به پارامترهای pipeline نگاهی بیندازیم. با کلیک روی نام شغل خود شروع کنید:

وقتی یک کار را انتخاب میکنید، میتوانید نمودار اجرا را مشاهده کنید. نمودار اجرای یک خط لوله، هر تبدیل در خط لوله را به صورت یک جعبه نشان میدهد که شامل نام تبدیل و برخی اطلاعات وضعیت است. برای مشاهده جزئیات بیشتر میتوانید روی عیار (carat) در گوشه سمت راست بالای هر مرحله کلیک کنید:

بیایید ببینیم که خط لوله چگونه دادهها را در هر مرحله تبدیل میکند:
- خواندن : در این مرحله، خط لوله از یک منبع ورودی میخواند. در این مورد، این یک فایل متنی از Cloud Storage با کل متن نمایشنامه شکسپیر «شاه لیر» است. خط لوله ما فایل را خط به خط میخواند و هر خط را به صورت
PCollectionخروجی میدهد، که در آن هر خط در فایل متنی ما یک عنصر در مجموعه است. - CountWords : مرحله
CountWordsدو بخش دارد. اول، از یک تابع موازی do (ParDo) به نامExtractWordsبرای توکنیزه کردن هر خط به کلمات منفرد استفاده میکند. خروجی ExtractWords یک PCollection جدید است که در آن هر عنصر یک کلمه است. مرحله بعدی،Count، از یک تبدیل ارائه شده توسط Java SDK استفاده میکند که جفتهای کلید و مقدار را برمیگرداند که در آن کلید یک کلمه منحصر به فرد است و مقدار تعداد دفعاتی است که آن کلمه رخ میدهد. در اینجا روش پیادهسازیCountWordsآمده است و میتوانید فایل کامل WordCount.java را در GitHub بررسی کنید:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements : این تابع
FormatAsTextFnکه در زیر کپی شده است را فراخوانی میکند، که هر جفت کلید و مقدار را به یک رشته قابل چاپ قالببندی میکند.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts : در این مرحله رشتههای قابل چاپ را در چندین فایل متنی خرد شده مینویسیم.
چند دقیقه دیگر نگاهی به خروجی حاصل از خط لوله خواهیم انداخت.
حالا به صفحه اطلاعات کار (Job info) در سمت راست نمودار نگاهی بیندازید، که شامل پارامترهای خط لوله (pipeline) است که ما در دستور mvn compile exec:java گنجاندهایم.


همچنین میتوانید شمارندههای سفارشی برای خط لوله را مشاهده کنید، که در این مورد نشان میدهد که تاکنون در طول اجرا با چند خط خالی مواجه شدهاید. میتوانید شمارندههای جدیدی را به خط لوله خود اضافه کنید تا معیارهای خاص برنامه را ردیابی کنید.

برای مشاهده پیامهای خطای خاص، میتوانید روی نماد Logs در پایین کنسول کلیک کنید.

این پنل به طور پیشفرض پیامهای گزارش کار را نشان میدهد که وضعیت کار را به طور کلی گزارش میدهند. میتوانید از انتخابگر حداقل شدت برای فیلتر کردن پیشرفت کار و پیامهای وضعیت استفاده کنید.

انتخاب یک مرحله از خط لوله در نمودار، نمای آن را به گزارشهای تولید شده توسط کد شما و کد تولید شده در حال اجرا در مرحله خط لوله تغییر میدهد.
برای بازگشت به گزارشهای کار، با کلیک کردن در خارج از نمودار یا استفاده از دکمه بستن در پنل سمت راست، مرحله را از حالت انتخاب خارج کنید.
شما میتوانید از دکمه Worker Logs در تب logs برای مشاهده گزارشهای Worker برای نمونههای Compute Engine که pipeline شما را اجرا میکنند، استفاده کنید. گزارشهای Worker شامل خطوط گزارش تولید شده توسط کد شما و کد تولید شده توسط Dataflow است که آن را اجرا میکند.
اگر میخواهید یک خطا را در خط لوله اشکالزدایی کنید، اغلب اوقات گزارشهای اضافی در Worker Logs وجود دارد که به حل مشکل کمک میکند. به خاطر داشته باشید که این گزارشها در تمام Workerها جمعآوری میشوند و میتوان آنها را فیلتر و جستجو کرد.

رابط نظارت بر جریان داده (Dataflow Monitoring Interface) فقط جدیدترین پیامهای گزارش (log messages) را نشان میدهد. میتوانید با کلیک روی پیوند Google Cloud Observability در سمت راست پنجره گزارشها، همه گزارشها را مشاهده کنید.

در اینجا خلاصهای از انواع مختلف گزارشهای موجود برای مشاهده از صفحه Monitoring→Logs آمده است:
- لاگهای پیامهای کاری شامل پیامهای سطح کاری هستند که اجزای مختلف Dataflow تولید میکنند. مثالهایی از این دست عبارتند از پیکربندی مقیاسبندی خودکار، زمان شروع یا خاموش شدن workerها، پیشرفت در مرحله کار و خطاهای کار. خطاهای سطح کاری که از خرابی کد کاربر سرچشمه میگیرند و در لاگهای کاری وجود دارند، به لاگهای پیامهای کاری نیز منتقل میشوند.
- لاگهای Worker توسط Workerهای Dataflow تولید میشوند. Workerها بیشتر کارهای مربوط به خط لوله (pipeline) را انجام میدهند (برای مثال، اعمال ParDos شما روی دادهها). لاگهای Worker حاوی پیامهایی هستند که توسط کد شما و Dataflow ثبت میشوند.
- لاگهای مربوط به راهاندازی کارگر (worker-startup logs) در اکثر کارهای Dataflow وجود دارند و میتوانند پیامهای مربوط به فرآیند راهاندازی را ثبت کنند. فرآیند راهاندازی شامل دانلود فایلهای jar یک کار از Cloud Storage و سپس راهاندازی کارگرها (workerها) است. اگر در راهاندازی کارگرها مشکلی وجود داشته باشد، این لاگها جای خوبی برای بررسی هستند.
- لاگهای shuffler حاوی پیامهایی از workerها هستند که نتایج عملیات موازی pipeline را تجمیع میکنند.
- لاگهای docker و kubelet حاوی پیامهایی مربوط به این فناوریهای عمومی هستند که در Workerهای Dataflow استفاده میشوند.
در مرحله بعد، بررسی خواهیم کرد که کار شما با موفقیت انجام شده است یا خیر.
۷. بررسی کنید که کار شما موفقیتآمیز بوده است یا خیر
رابط کاربری وب Cloud Dataflow را در کنسول پلتفرم Google Cloud باز کنید.
شما باید کار شمارش کلمات خود را با وضعیت «در حال اجرا» در ابتدا و سپس «موفق» ببینید:

اجرای این کار تقریباً 3-4 دقیقه طول خواهد کشید.
یادتان هست که پایپلاین را اجرا کردید و یک سطل خروجی مشخص کردید؟ بیایید نگاهی به نتیجه بیندازیم (چون نمیخواهید ببینید هر کلمه در شاه لیر چند بار تکرار شده است؟!). به مرورگر فضای ذخیرهسازی ابری در کنسول پلتفرم ابری گوگل برگردید. در سطل خود، باید فایلهای خروجی و فایلهای مرحلهبندی که کار شما ایجاد کرده است را ببینید:

۸. منابع خود را خاموش کنید
شما میتوانید منابع خود را از طریق کنسول پلتفرم ابری گوگل (Google Cloud Platform Console) غیرفعال کنید.
مرورگر Cloud Storage را در کنسول پلتفرم Google Cloud باز کنید.

کادر کنار سطلی که ایجاد کردهاید را علامت بزنید و برای حذف دائمی سطل و محتویات آن، روی DELETE کلیک کنید.


۹. تبریک میگویم!
شما یاد گرفتید که چگونه یک پروژه Maven با Cloud Dataflow SDK ایجاد کنید، یک نمونه pipeline را با استفاده از کنسول Google Cloud Platform اجرا کنید و مخزن Cloud Storage مرتبط و محتویات آن را حذف کنید.
اطلاعات بیشتر
- مستندات جریان داده: https://cloud.google.com/dataflow/docs/
مجوز
این اثر تحت مجوز عمومی Creative Commons Attribution 3.0 و مجوز Apache 2.0 منتشر شده است.