با استفاده از Cloud Run Jobs، BigQuery، و Gemini، خط لوله Insights Video خود را مقیاس دهید

۱. مرور کلی

در دنیای غنی از داده‌های امروزی، استخراج بینش‌های معنادار از محتوای بدون ساختار، به ویژه ویدیو، یک ضرورت مهم است. تصور کنید که نیاز به تجزیه و تحلیل صدها یا هزاران URL ویدیو، خلاصه کردن محتوای آنها، استخراج فناوری‌های کلیدی و حتی ایجاد جفت‌های پرسش و پاسخ برای مطالب آموزشی دارید. انجام این کارها به صورت تک تک نه تنها زمان‌بر است، بلکه ناکارآمد نیز می‌باشد. اینجاست که معماری‌های ابری مدرن می‌درخشند.

در این آزمایش، ما با استفاده از مجموعه قدرتمند سرویس‌های گوگل کلود، یعنی Cloud Run، BigQuery و Generative AI گوگل (Gemini)، به بررسی راهکار مقیاس‌پذیر و بدون سرور برای پردازش محتوای ویدیویی خواهیم پرداخت. ما سفر خود را از پردازش یک URL واحد تا هماهنگ‌سازی اجرای موازی در یک مجموعه داده بزرگ، بدون سربار مدیریت صف‌های پیچیده پیام‌رسانی و ادغام‌ها، به تفصیل شرح خواهیم داد.

چالش

وظیفه ما پردازش مجموعه‌ای بزرگ از محتوای ویدیویی، به ویژه با تمرکز بر جلسات آزمایشگاهی عملی بود. هدف، تجزیه و تحلیل هر ویدیو و تولید خلاصه‌ای ساختارمند، شامل عناوین فصل‌ها، مقدمه، دستورالعمل‌های گام به گام، فناوری‌های مورد استفاده و پرسش و پاسخ‌های مرتبط بود. این خروجی باید به طور کارآمد برای استفاده‌های بعدی در ساخت مواد آموزشی ذخیره می‌شد.

در ابتدا، ما یک سرویس Cloud Run ساده مبتنی بر HTTP داشتیم که می‌توانست یک URL را در یک زمان پردازش کند. این سرویس برای آزمایش و تحلیل‌های موردی به خوبی کار می‌کرد. با این حال، وقتی با لیستی از هزاران URL که از BigQuery تهیه شده بودند مواجه شدیم، محدودیت‌های این مدل تک درخواست-تک پاسخ آشکار شد. پردازش متوالی روزها، اگر نگوییم هفته‌ها، طول می‌کشید.

این فرصت ، تبدیل یک فرآیند دستی یا متوالی کند به یک گردش کار خودکار و موازی بود. با بهره‌گیری از فضای ابری، هدف ما این بود که:

  • پردازش داده‌ها به صورت موازی: کاهش قابل توجه زمان پردازش برای مجموعه داده‌های بزرگ.
  • از قابلیت‌های موجود هوش مصنوعی بهره ببرید: از قدرت Gemini برای تحلیل محتوای پیچیده استفاده کنید.
  • معماری بدون سرور را حفظ کنید: از مدیریت سرورها یا زیرساخت‌های پیچیده خودداری کنید.
  • متمرکزسازی داده‌ها: از BigQuery به عنوان تنها منبع معتبر برای URLهای ورودی و یک مقصد قابل اعتماد برای نتایج پردازش‌شده استفاده کنید.
  • ساخت یک خط لوله قوی: سیستمی ایجاد کنید که در برابر خرابی‌ها مقاوم باشد و به راحتی قابل مدیریت و نظارت باشد.

هدف

هماهنگ‌سازی پردازش موازی هوش مصنوعی با مشاغل ابری:

راهکار ما حول یک Cloud Run Job می‌چرخد که به عنوان یک هماهنگ‌کننده عمل می‌کند. این سرویس به طور هوشمندانه دسته‌هایی از URLها را از BigQuery می‌خواند، این URLها را به سرویس Cloud Run موجود و مستقر ما (که پردازش هوش مصنوعی را برای یک URL واحد انجام می‌دهد) ارسال می‌کند و سپس نتایج را برای نوشتن مجدد در BigQuery جمع‌آوری می‌کند. این رویکرد به ما امکان می‌دهد:

  • جداسازی هماهنگی از پردازش: وظیفه، گردش کار را مدیریت می‌کند، در حالی که سرویس جداگانه بر وظیفه هوش مصنوعی تمرکز دارد.
  • از موازی بودن کار Cloud Run استفاده کنید: این کار می‌تواند چندین نمونه کانتینر را برای فراخوانی همزمان سرویس هوش مصنوعی، مقیاس‌پذیر کند.
  • کاهش پیچیدگی: ما با مدیریت مستقیم فراخوانی‌های همزمان HTTP توسط وظیفه، به موازی‌سازی دست می‌یابیم و معماری را ساده می‌کنیم.

مورد استفاده

بینش‌های مبتنی بر هوش مصنوعی از ویدیوهای جلسه Code Vipassana

مورد استفاده خاص ما، تجزیه و تحلیل ویدیوهای جلسات Google Cloud از آزمایشگاه‌های عملی Code Vipassana بود. هدف، تولید خودکار مستندات ساختاریافته (طرح کلی فصل‌های کتاب) بود، از جمله:

  • عناوین فصل‌ها: عناوین مختصر برای هر بخش ویدیویی
  • مقدمه، زمینه: توضیح ارتباط ویدیو با یک مسیر یادگیری گسترده‌تر
  • چه چیزی ساخته خواهد شد: وظیفه یا هدف اصلی جلسه
  • فناوری‌های مورد استفاده: فهرستی از سرویس‌های ابری و سایر فناوری‌های ذکر شده
  • دستورالعمل‌های گام به گام: نحوه انجام کار، شامل قطعه کدها
  • آدرس‌های کد منبع/نسخه آزمایشی: لینک‌ها در ویدیو ارائه شده‌اند
  • بخش پرسش و پاسخ: ایجاد پرسش‌ها و پاسخ‌های مرتبط برای سنجش دانش.

جریان

8d7e83c296095fe0.png

جریان معماری

کلود ران چیست؟ مشاغل کلود ران کدامند؟

اجرای ابری

یک پلتفرم کاملاً مدیریت‌شده بدون سرور که به شما امکان اجرای کانتینرهای بدون وضعیت را می‌دهد. این پلتفرم برای سرویس‌های وب، APIها و میکروسرویس‌هایی که می‌توانند به‌طور خودکار بر اساس درخواست‌های ورودی مقیاس‌پذیر شوند، ایده‌آل است. شما یک تصویر کانتینر ارائه می‌دهید و Cloud Run بقیه کارها را انجام می‌دهد - از استقرار و مقیاس‌پذیری گرفته تا مدیریت زیرساخت. این پلتفرم در مدیریت بارهای کاری همزمان و درخواست-پاسخ عالی است.

مشاغل ابری

پیشنهادی که مکمل سرویس‌های Cloud Run است. Cloud Run Jobs برای وظایف پردازش دسته‌ای طراحی شده‌اند که باید تکمیل و سپس متوقف شوند. آن‌ها برای پردازش داده‌ها، ETL، استنتاج دسته‌ای یادگیری ماشین و هر وظیفه‌ای که شامل پردازش یک مجموعه داده به جای ارائه درخواست‌های زنده است، عالی هستند. یکی از ویژگی‌های کلیدی آن‌ها توانایی آن‌ها در مقیاس‌بندی تعداد نمونه‌های کانتینر (وظایف) در حال اجرا به طور همزمان برای پردازش یک دسته از کارها است و می‌توانند توسط منابع رویداد مختلف یا به صورت دستی فعال شوند.

تفاوت کلیدی

سرویس‌های Cloud Run برای برنامه‌های کاربردی طولانی‌مدت و مبتنی بر درخواست هستند. کارهای Cloud Run برای پردازش دسته‌ای محدود و وظیفه‌محور هستند که تا زمان تکمیل اجرا می‌شوند.

آنچه خواهید ساخت

یک برنامه جستجوی خرده فروشی

به عنوان بخشی از این، شما:

  1. ایجاد یک مجموعه داده BigQuery، جدول و دریافت داده‌ها (کد فراداده Vipassana)
  2. ایجاد یک تابع پایتون کلود ران برای پیاده‌سازی قابلیت هوش مصنوعی مولد (تبدیل ویدیو به فصل کتاب با فرمت json)
  3. یک برنامه پایتون برای انتقال داده‌ها به هوش مصنوعی ایجاد کنید - از BigQuery بخوانید و Cloud Run Functions Endpoint را برای دریافت اطلاعات فراخوانی کنید و متن را در BigQuery بنویسید.
  4. ساخت و کانتینرایز کردن برنامه
  5. پیکربندی یک کار Cloud Run با این کانتینر
  6. اجرا و نظارت بر کار
  7. گزارش نتیجه

الزامات

  • یک مرورگر، مانند کروم یا فایرفاکس
  • یک پروژه گوگل کلود با قابلیت پرداخت.

۲. قبل از شروع

ایجاد یک پروژه

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید.
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .

برای اعتبارهای Google Cloud: اگر می‌خواهید برای شروع کار از اعتبارهای Google Cloud استفاده کنید، از این لینک برای استفاده از اعتبارها استفاده کنید. می‌توانید دستورالعمل‌های اینجا را برای استفاده از آن دنبال کنید.

  1. شما از Cloud Shell ، یک محیط خط فرمان که در Google Cloud اجرا می‌شود، استفاده خواهید کرد. روی Activate Cloud Shell در بالای کنسول Google Cloud کلیک کنید.

تصویر دکمه فعال کردن Cloud Shell

  1. پس از اتصال به Cloud Shell، با استفاده از دستور زیر بررسی می‌کنید که آیا از قبل احراز هویت شده‌اید و پروژه روی شناسه پروژه شما تنظیم شده است یا خیر:
gcloud auth list
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید شود که دستور gcloud از پروژه شما اطلاع دارد.
gcloud config list project
  1. اگر پروژه شما تنظیم نشده است، از دستور زیر برای تنظیم آن استفاده کنید:
gcloud config set project <YOUR_PROJECT_ID>
  1. فعال کردن API های مورد نیاز: روی لینک کلیک کنید و API ها را فعال کنید.

به عنوان یک روش جایگزین، می‌توانید از دستور gcloud برای این کار استفاده کنید. برای مشاهده دستورات و نحوه استفاده از gcloud به مستندات آن مراجعه کنید.

۳. راه‌اندازی پایگاه داده/انبار داده

بیگ‌کوئری (BigQuery) به عنوان ستون فقرات خط لوله داده ما عمل می‌کرد. ماهیت بدون سرور و بسیار مقیاس‌پذیر آن، آن را برای ذخیره داده‌های ورودی و همچنین نگهداری نتایج پردازش‌شده ایده‌آل می‌کند.

  • ذخیره‌سازی داده‌ها: BigQuery نقش انبار داده ما را ایفا می‌کرد. این انبار، فهرست URLهای ویدیو، وضعیت آنها (مثلاً در انتظار، در حال پردازش، تکمیل‌شده) و متن نهایی تولید شده را ذخیره می‌کند. این تنها منبع حقیقتی است که ویدیوها برای آن نیاز به پردازش دارند.
  • مقصد: جایی است که بینش‌های تولید شده توسط هوش مصنوعی در آن ذخیره می‌شوند و به راحتی برای برنامه‌های پایین‌دستی یا بررسی دستی قابل پرس‌وجو هستند. مجموعه داده‌های ما شامل جزئیات جلسات ویدیویی، به ویژه از محتوای «فصل‌های کدنویسی ویپاسانا» بود که اغلب شامل نمایش‌های فنی دقیق است.
  • جدول منبع: یک جدول BigQuery (مثلاً post_session_labs) حاوی رکوردهایی مانند:
  • شناسه (id): یک شناسه منحصر به فرد برای هر جلسه/ردیف.
  • url: آدرس اینترنتی ویدیو (مثلاً لینک یوتیوب یا لینک درایو قابل دسترسی).
  • وضعیت: رشته‌ای که وضعیت پردازش را نشان می‌دهد (مثلاً، در حال انتظار، در حال پردازش، تکمیل شده، ناموفق در پردازش).
  • context: یک فیلد رشته‌ای برای ذخیره خلاصه تولید شده توسط هوش مصنوعی.
  • دریافت داده‌ها: در این سناریو، داده‌ها با استفاده از اسکریپت‌های INSERT به BigQuery وارد می‌شدند. برای خط لوله ما، BigQuery نقطه شروع بود.

به کنسول BigQuery بروید، یک تب جدید باز کنید و دستورات SQL زیر را اجرا کنید:

--1. Create your dataset for the project

CREATE SCHEMA `<<YOUR_PROJECT_ID>>.cv_metadata`
OPTIONS(
  location = 'us-central1', -- Specify the location (e.g., 'US', 'EU', 'asia-east1')
  description = 'Code Vipassana Sessions Metadata' -- Optional: Add a description
);

--2. Create table

create table cv_metadata.post_session_labs(id STRING, descr STRING, url STRING, context STRING, status STRING);

۴. دریافت داده‌ها

حالا وقت آن رسیده که یک جدول با داده‌های مربوط به فروشگاه اضافه کنیم. به یک تب در BigQuery Studio بروید و دستورات SQL زیر را برای درج رکوردهای نمونه اجرا کنید:

--Insert sample data

insert into cv_metadata.post_session_labs(id,descr,url) values('10-1','Gen AI to Agents, where do I begin? Get started with building a single agent application on ADK Python SDK','https://youtu.be/tyqnQQXpxtI');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-2','Build an E2E multi-agent kitchen renovation app on ADK in Python with AlloyDB data and multiple tools','https://youtu.be/RdrMo2lNh0o');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-3','Augment your multiagent app with tools from MCP Toolbox for AlloyDB','https://youtu.be/9VVNh77Q3ZU?si=oQ4fhAX59Y3D5iWa');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-4','Build an agentic MCP client application using MCP Toolbox for BigQuery','https://youtu.be/HmluMag5s20');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-5','Build a travel agent using ADK & MCP Toolbox for Cloud SQL','https://youtu.be/IWg5CH6ZNs0');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-6','Build an E2E Patent Analysis Agent using ADK and Advanced Vector Search with AlloyDB','https://youtu.be/yCXJ3sk3Lxc');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-7','Getting Started with MCP, ADK and A2A','https://youtu.be/JcQ_DyWc0X0');

update cv_metadata.post_session_labs set status = PENDING' where id is not null;

۵. ایجاد عملکرد بینش ویدیویی

ما باید یک تابع Cloud Run ایجاد و مستقر کنیم تا هسته اصلی عملکرد، یعنی ایجاد یک فصل کتاب ساختاریافته از URL ویدیو، را پیاده‌سازی کنیم. برای اینکه بتوانیم به عنوان یک ابزار مستقل در جعبه ابزار endpoint به این تابع دسترسی داشته باشیم، ما یک تابع Cloud Run ایجاد و مستقر کرده‌ایم. همچنین می‌توانید این تابع را به عنوان یک تابع جداگانه در برنامه پایتون واقعی برای کار Cloud Run بگنجانید:

  1. در کنسول گوگل کلود، به صفحه Cloud Run بروید
  2. روی نوشتن یک تابع کلیک کنید.
  3. در فیلد نام سرویس، نامی را برای توصیف عملکرد خود وارد کنید. نام سرویس‌ها فقط باید با یک حرف شروع شوند و حداکثر شامل ۴۹ کاراکتر یا کمتر، شامل حروف، اعداد یا خط فاصله باشند. نام سرویس‌ها نمی‌توانند با خط فاصله تمام شوند و باید برای هر منطقه و پروژه منحصر به فرد باشند. نام سرویس بعداً قابل تغییر نیست و به صورت عمومی قابل مشاهده است. ( generate-video-insights **)**
  4. در لیست منطقه، از مقدار پیش‌فرض استفاده کنید، یا منطقه‌ای را که می‌خواهید عملکرد خود را در آن مستقر کنید، انتخاب کنید. (us-central1 را انتخاب کنید)
  5. در لیست Runtime، از مقدار پیش‌فرض استفاده کنید یا یک نسخه runtime انتخاب کنید. (پایتون ۳.۱۱ را انتخاب کنید)
  6. در بخش احراز هویت، گزینه «اجازه دسترسی عمومی» را انتخاب کنید.
  7. روی دکمه "ایجاد" کلیک کنید
  8. تابع ایجاد شده و با قالب main.py و requirements.txt بارگذاری می‌شود.
  9. آن را با فایل‌های main.py و requirements.txt از مخزن این پروژه جایگزین کنید.

نکته مهم: در فایل main.py، به یاد داشته باشید که <<YOUR_PROJECT_ID>> را با شناسه پروژه خود جایگزین کنید.

  1. نقطه پایانی را مستقر و ذخیره کنید تا بتوانید از آن در منبع خود برای کار Cloud Run استفاده کنید.

نقطه پایانی شما باید چیزی شبیه به این (یا چیزی شبیه به آن) باشد: https://generate-video-insights-<<YOUR_POJECT_NUMBER>>.us-central1.run.app

این قابلیت Cloud Run چیست؟

فلش Gemini 2.5 برای پردازش ویدیو

برای وظیفه اصلی درک و خلاصه‌سازی محتوای ویدیویی، ما از مدل Gemini 2.5 Flash گوگل استفاده کردیم. مدل‌های Gemini، مدل‌های هوش مصنوعی قدرتمند و چندوجهی هستند که قادر به درک و پردازش انواع مختلف ورودی، از جمله متن و با ادغام‌های خاص، ویدیو، می‌باشند.

در تنظیمات ما، ما مستقیماً فایل ویدیویی را به Gemini ندادیم. در عوض، یک پیام متنی ارسال کردیم که شامل URL ویدیو بود و به Gemini دستور دادیم که چگونه محتوای (فرضی) یک ویدیو را در آن URL تجزیه و تحلیل کند. در حالی که Gemini 2.5 Flash قادر به ورودی چندوجهی است، این خط لوله خاص از یک پیام متنی استفاده کرد که ماهیت ویدیو را توصیف می‌کرد (یک جلسه آزمایشگاهی عملی) و یک خروجی JSON ساختاریافته درخواست کرد. این امر از استدلال پیشرفته و درک زبان طبیعی Gemini برای استنباط و ترکیب اطلاعات بر اساس متن پیام استفاده می‌کند.

راهنمای Gemini: هدایت هوش مصنوعی

یک اعلان خوش‌ساخت برای مدل‌های هوش مصنوعی بسیار مهم است. اعلان ما طوری طراحی شده بود که اطلاعات بسیار خاصی را استخراج کرده و آن را در قالب JSON ساختاردهی کند، که باعث می‌شود برنامه ما به راحتی آن را تجزیه و تحلیل کند.

PROMPT_TEMPLATE = """
In the video at the following URL: {youtube_url}, which is a hands-on lab session:
Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Take only the first 30-40 minutes of the video without throwing any error.
Analyze the rest of the content of the video.
Extract and synthesize information to create a book chapter section with the following structure, formatted as a JSON string:
1. **chapter_title:** A concise and engaging title for the chapter.
2. **introduction_context:** Briefly explain the relevance of this video segment within a broader learning context.
3. **what_will_build:** Clearly state the specific task or goal accomplished in this video segment.
4. **technologies_and_services:** List all mentioned Google Cloud services and any other relevant technologies (e.g., programming languages, tools, frameworks).
5. **how_we_did_it:** Provide a clear, numbered step-by-step guide of the actions performed. Include any exact commands or code snippets as they appear in the video. Format code/commands using markdown backticks (e.g., `my-command`).
6. **source_code_url:** Provide a URL to the source code repository if mentioned or implied. If not available, use "N/A".
7. **demo_url:** Provide a URL to a demo if mentioned or implied. If not available, use "N/A".
8. **qa_segment:** Generate 10–15 relevant questions based on the content of this segment, along with concise answers. Ensure the questions are thought-provoking and test understanding of the material.
REMEMBER: Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Format the entire output as a JSON string. Ensure all keys and string values are enclosed in double quotes.
Example structure:
...
"""

این درخواست بسیار خاص است و Gemini را راهنمایی می‌کند تا به نوعی به عنوان یک مربی عمل کند. درخواست یک رشته JSON، خروجی ساختار یافته و قابل خواندن توسط ماشین را تضمین می‌کند.

در اینجا کد مربوط به تجزیه و تحلیل ورودی ویدیو و بازگرداندن متن آن آمده است:

def process_videos_batch(video_url: str, PROMPT_TEMPLATE: str) -> str:
    """
    Processes a video URL, generates chapter content using Gemini
    """
    formatted_prompt = PROMPT_TEMPLATE.format(youtube_url=video_url)
    try:

        client = genai.Client(vertexai=True,project='<<YOUR_PROJECT_ID>>',location='us-central1',http_options=HttpOptions(api_version="v1"))
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=formatted_prompt,
        )
        print(response.text)
    except Exception as e:
        print(f"An error occurred during content generation: {e}")
        return f"Error processing video: {e}"
    print(response.text)
    return response.text

این قطعه کد بالا عملکرد اصلی این مورد استفاده را نشان می‌دهد. این قطعه کد یک URL ویدیویی را دریافت می‌کند و از مدل Gemini از طریق کلاینت هوش مصنوعی Vertex برای تجزیه و تحلیل محتوای ویدیو و استخراج بینش‌های مرتبط با هر اعلان استفاده می‌کند. سپس زمینه استخراج شده برای پردازش بیشتر بازگردانده می‌شود. این نشان دهنده یک عملیات همزمان است که در آن Cloud Run Job منتظر تکمیل سرویس می‌ماند.

۶. توسعه اپلیکیشن پایپ‌لاین (پایتون)

منطق خط لوله مرکزی ما در کد منبع برنامه قرار دارد که در یک کار اجرای ابری (Cloud Run Job) کانتینر می‌شود و کل اجرای موازی را هماهنگ می‌کند. در اینجا نگاهی به بخش‌های کلیدی می‌اندازیم:

نقش هماهنگ‌کننده در مدیریت گردش کار و تضمین یکپارچگی داده‌ها:

# ... (imports and configuration) ...

def process_batch_from_bq(request_or_trigger_data=None):
    # ... (initial checks for config) ...
    BATCH_SIZE = 5 # Fetch 5 URLs at a time per job instance
    
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    try:
        logging.info(f"Fetching up to {BATCH_SIZE} pending URLs from BigQuery...")
        rows = bq_client.query(query).result() # job_should_wait=True is default for result()
        pending_urls_data = []
        for row in rows:
            pending_urls_data.append({"url": row.url, "id": row.id})
        
        if not pending_urls_data:
            logging.info("No pending URLs found. Job finished.")
            return "No pending URLs found. Job finished.", 200

        row_ids_to_process = [item["id"] for item in pending_urls_data]

        # --- Mark as PROCESSING to prevent duplicate work ---
        update_status_query = f"""
            UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
            SET status = 'PROCESSING'
            WHERE id IN UNNEST(@row_ids_to_process)
        """
        
        status_update_job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("row_ids_to_process", "STRING", values=row_ids_to_process)
            ]
        )
        
        update_status_job = bq_client.query(update_status_query, job_config=status_update_job_config)
        update_status_job.result()
        logging.info(f"Marked {len(row_ids_to_process)} URLs as 'PROCESSING'.")

        # ... (rest of the code for parallel processing and writing) ...
    
    except Exception as e:
        # ... (error handling) ...

این قطعه کد بالا با دریافت دسته‌ای از URLهای ویدیو با وضعیت «در انتظار» از جدول منبع BigQuery آغاز می‌شود. سپس وضعیت این URLها را در BigQuery به «در حال پردازش» به‌روزرسانی می‌کند و از پردازش تکراری جلوگیری می‌کند.

پردازش موازی با ThreadPoolExecutor و فراخوانی سرویس پردازنده:

# ... (inside process_batch_from_bq function) ...

        # --- Step 3: Call the external URL Processor Service in parallel ---
        processed_results = {}
        futures = []

        # ThreadPoolExecutor for I/O-bound tasks (HTTP requests to the processor service)
        # MAX_CONCURRENT_TASKS_PER_INSTANCE controls parallelism within one job instance.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS_PER_INSTANCE) as executor:
            for item in pending_urls_data:
                url = item["url"]
                row_id = item["id"]
                # Submit the task: call the processor service for this URL
                future = executor.submit(call_url_processor_service, url)
                futures.append((row_id, future)) 

            # Collect results as they complete
            for row_id, future in futures:
                try:
                    content = future.result(timeout=URL_PROCESSOR_TIMEOUT_SECONDS)
                    
                    # Check if the processor service returned an error message
                    if content.startswith("ERROR:"):
                        processed_results[row_id] = {"context": content, "status": "FAILED_PROCESSING"}
                    else:
                        processed_results[row_id] = {"context": content, "status": "COMPLETED"}
                        
                except TimeoutError:
                    logging.warning(f"URL processing timed out (service call for row ID {row_id}). Marking as FAILED.")
                    processed_results[row_id] = {"context": f"ERROR: Processing timed out for '{row_id}'.", "status": "FAILED_PROCESSING"}
                except Exception as e:
                    logging.error(f"Exception during future result retrieval for row ID {row_id}: {e}")
                    processed_results[row_id] = {"context": f"ERROR: Unexpected error during result retrieval for '{row_id}'. Details: {e}", "status": "FAILED_PROCESSING"}

این بخش از کد، از ThreadPoolExecutor برای دستیابی به پردازش موازی URLهای ویدیوی واکشی شده استفاده می‌کند. برای هر URL، وظیفه‌ای را برای فراخوانی سرویس Cloud Run (پردازنده URL) به صورت غیرهمزمان ارسال می‌کند. این امر به Cloud Run Job اجازه می‌دهد تا چندین ویدیو را به طور همزمان پردازش کند و عملکرد کلی خط لوله را افزایش دهد. این قطعه کد همچنین وقفه‌ها و خطاهای احتمالی از سرویس پردازنده را مدیریت می‌کند.

خواندن و نوشتن از & به BigQuery

تعامل اصلی با BigQuery شامل دریافت URL های در انتظار و سپس به‌روزرسانی آنها با نتایج پردازش شده است.

# ... (inside process_batch_from_bq) ...
    BATCH_SIZE = 5 
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    rows = bq_client.query(query).result()
    
    pending_urls_data = []
    for row in rows:
        pending_urls_data.append({"url": row.url, "id": row.id})
# ... (rest of fetching and marking as PROCESSING) ...

نوشتن نتایج به BigQuery:

# --- Step 4: Write results back to BigQuery ---
        logging.info(f"Writing {len(processed_results)} results back to BigQuery...")
        successful_updates = 0
        for row_id, data in processed_results.items():
            if update_bq_row(row_id, data["context"], data["status"]):
                successful_updates += 1
        
        logging.info(f"Finished processing. {successful_updates} out of {len(processed_results)} rows updated successfully.")
        # ... (return statement) ...

# --- Helper to update a single row in BigQuery ---
def update_bq_row(row_id, context, status="COMPLETED"):
    """Updates a specific row in the target BigQuery table."""
    # ... (checks for config) ...

    update_query = f"""
        UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_TARGET}`
        SET
            context = @context,
            status = @status
        WHERE id = @row_id
    """

    # Correctly defining query parameters for the UPDATE statement
    job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("context", "STRING", value=context),
                bigquery.ScalarQueryParameter("status", "STRING", value=status),
                # Assuming 'id' column is STRING. Adjust if it's INT64.
                bigquery.ScalarQueryParameter("row_id", "STRING", value=row_id) 
            ]
        )

    try:
        update_job = bq_client.query(update_query, job_config=job_config)
        update_job.result() # Wait for the job to complete
        logging.info(f"Successfully updated BigQuery row ID {row_id} with status {status}.")
        return True
    except Exception as e:
        logging.error(f"Failed to update BigQuery row ID {row_id}: {e}")
        return False

قطعه کدهای بالا بر تعامل داده‌ها بین Cloud Run Job و BigQuery تمرکز دارند. این قطعه کد، مجموعه‌ای از URLهای ویدیویی «در انتظار» و شناسه‌های آنها را از جدول منبع بازیابی می‌کند. پس از پردازش URLها، این قطعه کد، نوشتن متن و وضعیت استخراج‌شده («COMPLETED» یا «FAILED_PROCESSING») را با استفاده از یک کوئری UPDATE به جدول BigQuery هدف نشان می‌دهد. این قطعه کد، حلقه پردازش داده‌ها را تکمیل می‌کند. همچنین شامل تابع کمکی update_bq_row است که نحوه تعریف پارامترهای دستور به‌روزرسانی را نشان می‌دهد.

تنظیمات برنامه

این برنامه به صورت یک اسکریپت پایتون واحد ساختار یافته است که به صورت کانتینری (containerize) در خواهد آمد. این برنامه از کتابخانه‌های کلاینت Google Cloud و functions-framework برای تعریف نقطه ورود خود استفاده می‌کند.

  • وابستگی‌ها: google-cloud-bigquery، درخواست‌ها
  • پیکربندی: تمام تنظیمات حیاتی (پروژه/مجموعه داده/جدول BigQuery، URL سرویس پردازشگر URL) از متغیرهای محیطی بارگذاری می‌شوند و این امر باعث می‌شود برنامه قابل حمل و ایمن باشد.
  • منطق اصلی: تابع process_batch_from_bq کل گردش کار را هماهنگ می‌کند.
  • یکپارچه‌سازی سرویس‌های خارجی: تابع call_url_processor_service ارتباط با سرویس جداگانه Cloud Run را مدیریت می‌کند.
  • تعامل BigQuery: bq_client برای دریافت URLها و به‌روزرسانی نتایج، با مدیریت صحیح پارامترها، استفاده می‌شود.
  • موازی‌سازی: concurrent.futures.ThreadPoolExecutor فراخوانی‌های همزمان به سرویس خارجی را مدیریت می‌کند.
  • نقطه ورود: کد پایتون با نام main.py به عنوان نقطه ورود عمل می‌کند که پردازش دسته‌ای را آغاز می‌کند.

حالا بیایید برنامه را راه‌اندازی کنیم:

  1. می‌توانید با رفتن به ترمینال Cloud Shell خود و کلون کردن مخزن شروع کنید:
git clone https://github.com/AbiramiSukumaran/video-context-crj
  1. به ویرایشگر Cloud Shell بروید ، جایی که می‌توانید پوشه تازه ایجاد شده video-context-crj را مشاهده کنید.
  2. موارد زیر را حذف کنید زیرا این مراحل قبلاً در بخش‌های قبلی انجام شده‌اند:
  3. پوشه Cloud_Run_Function را حذف کنید
  4. به پوشه پروژه video-context-crj بروید و باید ساختار پروژه را ببینید:

84ace76f8e20c668.png

۷. راه‌اندازی و کانتینرسازی داکرفایل

برای استقرار این منطق به عنوان یک کار ابری، باید آن را کانتینرایز کنیم. کانتینرایز کردن فرآیند بسته‌بندی کد برنامه، وابستگی‌های آن و زمان اجرا در یک تصویر قابل حمل است.

مطمئن شوید که متغیرهای متغیر (متن پررنگ) را با مقادیر خود در Dockerfile جایگزین می‌کنید:

# Use an official Python runtime as a parent image
FROM python:3.12-alpine

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Copy the rest of the application code
COPY . .

# Define environment variables for configuration (these will be overridden during deployment)
ENV BIGQUERY_PROJECT="YOUR-project"
ENV BIGQUERY_DATASET="YOUR-dataset"
ENV BIGQUERY_TABLE_SOURCE="YOUR-source-table"
ENV URL_PROCESSOR_SERVICE_URL="ENDPOINT FOR VIDEO PROCESSING" 
ENV BIGQUERY_TABLE_TARGET = "YOUR-destination-table"

ENTRYPOINT ["python", "main.py"]

قطعه کد Dockerfile بالا، ایمیج پایه را تعریف می‌کند، وابستگی‌ها را نصب می‌کند، کد ما را کپی می‌کند و دستور اجرای برنامه ما را با استفاده از functions-framework با تابع هدف صحیح (process_batch_from_bq) تنظیم می‌کند. سپس این ایمیج به رجیستری Artifact منتقل می‌شود.

کانتینر کردن

برای کانتینرایز کردن آن، به ترمینال Cloud Shell بروید و دستورات زیر را اجرا کنید (به یاد داشته باشید که جای خالی <<YOUR_PROJECT_ID>> را جایگزین کنید):

export CONTAINER_IMAGE="gcr.io/<<YOUR_PROJECT_ID>>/batch-url-processor-orchestrator:latest"

gcloud builds submit --tag $CONTAINER_IMAGE .

پس از ایجاد تصویر کانتینر، باید خروجی را مشاهده کنید:

eec4f4a2bc5745f2.png

اکنون کانتینر ما ایجاد و در رجیستری مصنوعات ذخیره شده است. می‌توانیم به مرحله بعدی برویم.

۸. ایجاد مشاغل ابری

استقرار این کار شامل ساخت تصویر کانتینر و سپس ایجاد یک منبع Cloud Run Job است.

ما قبلاً تصویر کانتینر را ایجاد و در رجیستری مصنوعات ذخیره کرده‌ایم. حالا بیایید کار را ایجاد کنیم.

  1. به کنسول Cloud Run Jobs بروید و روی Deploy Container کلیک کنید:

f3a1f4775000186e.png

  1. تصویر کانتینری که تازه ایجاد کردیم را انتخاب کنید:

90989f396ad6c30a.png

  1. سایر جزئیات پیکربندی را به شرح زیر وارد کنید:

b07fe386a4ae2797.png

  1. ظرفیت وظیفه را به صورت زیر تنظیم کنید:

۳۲۷a۰۵d۶۱e۱۳۳۷c۳.png

از آنجایی که ما نوشتن در پایگاه داده را داریم و این واقعیت که موازی‌سازی (max_instances و همزمانی وظایف) از قبل در کد مدیریت شده است، تعداد وظایف همزمان را روی ۱ تنظیم می‌کنیم. اما می‌توانید آن را طبق نیاز خود افزایش دهید. هدف در اینجا این است که وظایف طبق پیکربندی با سطح همزمانی تنظیم شده در موازی‌سازی، تا زمان تکمیل اجرا شوند.

  1. روی ایجاد کلیک کنید

کار Cloud Run شما با موفقیت ایجاد خواهد شد.

چگونه کار می‌کند؟

یک نمونه کانتینر از کار ما شروع می‌شود. این کار از BigQuery درخواست می‌کند تا یک دسته کوچک (BATCH_SIZE) از URLهایی که به عنوان PENDING علامت‌گذاری شده‌اند را دریافت کند. بلافاصله وضعیت این URLهای واکشی شده را در BigQuery به PROCESSING به‌روزرسانی می‌کند تا از انتخاب آنها توسط سایر نمونه‌های کار جلوگیری کند. این کار یک ThreadPoolExecutor ایجاد می‌کند و برای هر URL در دسته، یک وظیفه ارسال می‌کند. هر وظیفه تابع call_url_processor_service را فراخوانی می‌کند. همانطور که call_url_processor_service درخواست تکمیل (یا اتمام زمان/شکست) را می‌دهد، نتایج آنها (یا زمینه تولید شده توسط هوش مصنوعی یا یک پیام خطا) جمع‌آوری شده و به row_id اصلی نگاشت می‌شوند. پس از اتمام تمام وظایف برای دسته، کار از طریق نتایج جمع‌آوری شده تکرار می‌شود و زمینه و فیلدهای وضعیت را برای هر ردیف مربوطه در BigQuery به‌روزرسانی می‌کند. در صورت موفقیت، نمونه کار به طور کامل خارج می‌شود. اگر با خطاهای مدیریت نشده مواجه شود، یک استثنا ایجاد می‌کند که به طور بالقوه باعث تلاش مجدد توسط Cloud Run Jobs (بسته به پیکربندی کار) می‌شود.

چگونه مشاغل اجرای ابری در این حوزه قرار می‌گیرند: ارکستراسیون

اینجاست که Cloud Run Jobs واقعاً می‌درخشد.

پردازش دسته‌ای بدون سرور: ما زیرساخت مدیریت‌شده‌ای داریم که می‌تواند به تعداد مورد نیاز (تا MAX_INSTANCES) نمونه کانتینر ایجاد کند تا داده‌های ما را به صورت همزمان پردازش کند.

کنترل موازی‌سازی: ما MAX_INSTANCES (تعداد کارهایی که می‌توانند به صورت موازی اجرا شوند) و TASK_CONCURRENCY (تعداد عملیاتی که هر نمونه کار به صورت موازی انجام می‌دهد) را تعریف می‌کنیم. این امر کنترل دقیقی بر توان عملیاتی و میزان استفاده از منابع فراهم می‌کند.

تحمل خطا: اگر یک نمونه کار در میانه راه با شکست مواجه شود، می‌توان Cloud Run Jobs را طوری پیکربندی کرد که کل کار یا وظایف خاص را دوباره امتحان کند و اطمینان حاصل شود که پردازش داده‌ها از دست نمی‌رود.

معماری ساده‌شده: با هماهنگ‌سازی مستقیم فراخوانی‌های HTTP درون Job و استفاده از BigQuery برای مدیریت وضعیت، از پیچیدگی راه‌اندازی و مدیریت Pub/Sub، موضوعات آن، اشتراک‌ها و منطق تأیید جلوگیری می‌کنیم.

MAX_INSTANCES در مقابل TASK_CONCURRENCY:

MAX_INSTANCES: تعداد کل نمونه‌های کاری که می‌توانند به طور همزمان در کل اجرای کار شما اجرا شوند. این اهرم موازی‌سازی اصلی شما برای پردازش همزمان بسیاری از URLها است.

TASK_CONCURRENCY: تعداد عملیات موازی (فراخوانی‌های سرویس پردازنده شما) که یک نمونه از کار شما انجام خواهد داد. این به اشباع CPU/شبکه ​​از یک نمونه کمک می‌کند.

۹. اجرا و نظارت بر کار Cloud Run

فراداده ویدئو

قبل از اینکه روی اجرا کلیک کنیم، بیایید وضعیت داده‌ها را مشاهده کنیم.

به BigQuery Studio بروید و کوئری زیر را اجرا کنید:

Select id, descr, url, status from cv_metadata.post_session_labs where status = PENDING'

e9d99c2ed84d265f.png

ما چند رکورد نمونه با URL های ویدیویی و در وضعیت PENDING داریم. هدف ما این است که فیلد "context" را با اطلاعاتی از ویدیو در قالبی که در اعلان توضیح داده شده است، پر کنیم.

محرک شغلی

بیایید با کلیک بر روی دکمه EXECUTE روی کار در کنسول Cloud Run Jobs، کار را اجرا کنیم و باید بتوانید پیشرفت و وضعیت کارها را در کنسول مشاهده کنید:

۱۳f6a8892e6fd2bf.png

برای مشاهده مراحل نظارت و سایر جزئیات مربوط به کار و وظایف، می‌توانید تگ LOGS را در OBSERVAILITY بررسی کنید.

۱۰. تحلیل نتایج

پس از اتمام کار، باید بتوانید متن مربوط به هر URL ویدیویی که در جدول به‌روزرسانی شده است را مشاهده کنید:

۱۳۵f۸۵ad۱۴۱c۰۷۰b.png

زمینه خروجی (برای یکی از رکوردها)

{
  "chapter_title": "Building a Travel Agent with ADK and MCP Toolbox",
  "introduction_context": "This chapter section is derived from a hands-on lab session focused on building a travel agent. It details the process of integrating various Google Cloud services and tools to create an intelligent agent capable of querying a database and interacting with users.",
  "what_will_build": "The goal is to build and deploy a travel agent that can answer user queries about hotels using the Agent Development Kit (ADK) and the MCP Toolbox for Databases, connecting to a PostgreSQL database.",
  "technologies_and_services": [
    "Google Cloud Platform",
    "Cloud SQL for PostgreSQL",
    "Agent Development Kit (ADK)",
    "MCP Toolbox for Databases",
    "Cloud Shell",
    "Cloud Run",
    "Python",
    "Docker"
  ],
  "how_we_did_it": [
    "Provision a Cloud SQL instance for PostgreSQL with the 'hoteldb-instance'.",
    "Prepare the 'hotels' database by creating a table with relevant schema and populating it with sample data.",
    "Set up the MCP Toolbox for Databases by downloading and configuring the necessary components.",
    "Install the Agent Development Kit (ADK) and its dependencies.",
    "Create a new agent using the ADK, specifying the model (Gemini 2.0-flash) and backend (Vertex AI).",
    "Modify the agent's code to connect to the PostgreSQL database via the MCP Toolbox.",
    "Run the agent locally to test its functionality and ability to interact with the database.",
    "Deploy the agent to Cloud Run for cloud-based access and further testing.",
    "Interact with the deployed agent through a web console or command line to query hotel information."
  ],
  "source_code_url": "N/A",
  "demo_url": "N/A",
  "qa_segment": [
    {
      "question": "What is the primary purpose of the MCP Toolbox for Databases?",
      "answer": "The MCP Toolbox for Databases is an open-source MCP server designed to help users develop tools faster, more securely, and by handling complexities like connection pooling, authentication, and more."
    },
    {
      "question": "Which Google Cloud service is used to create the database for the travel agent?",
      "answer": "Cloud SQL for PostgreSQL is used to create the database."
    },
    {
      "question": "What is the role of the Agent Development Kit (ADK)?",
      "answer": "The ADK helps build Generative AI tools that allow agents to access data in a database. It enables agents to perform actions, interact with users, utilize external tools, and coordinate with other agents."
    },
    {
      "question": "What command is used to create the initial agent application using ADK?",
      "answer": "The command `adk create hotel-agent-app` is used to create the agent application."
    },
    ....

اکنون باید بتوانید این ساختار JSON را برای موارد استفاده پیشرفته‌تر عامل‌محور اعتبارسنجی کنید.

چرا این رویکرد؟

این معماری مزایای استراتژیک قابل توجهی را ارائه می‌دهد:

  • مقرون به صرفه بودن: سرویس‌های بدون سرور به این معنی است که شما فقط برای آنچه استفاده می‌کنید هزینه پرداخت می‌کنید. مقیاس کارهای ابری در صورت عدم استفاده به صفر می‌رسد.
  • مقیاس‌پذیری: با تنظیم نمونه کار Cloud Run و تنظیمات همزمانی، ده‌ها هزار URL را به راحتی مدیریت می‌کند.
  • چابکی: چرخه‌های توسعه و استقرار سریع برای منطق پردازش جدید یا مدل‌های هوش مصنوعی، تنها با به‌روزرسانی ساده‌ی برنامه‌ی کاربردی و سرویس آن.
  • کاهش سربار عملیاتی: نیازی به وصله کردن یا مدیریت سرور نیست؛ گوگل زیرساخت را مدیریت می‌کند.
  • دموکراتیزه کردن هوش مصنوعی: پردازش پیشرفته هوش مصنوعی را برای وظایف دسته‌ای بدون تخصص عمیق در عملیات یادگیری ماشین در دسترس قرار می‌دهد.

۱۱. تمیز کردن

برای جلوگیری از تحمیل هزینه به حساب Google Cloud خود برای منابع استفاده شده در این پست، این مراحل را دنبال کنید:

  1. در کنسول گوگل کلود، به صفحه مدیریت منابع بروید.
  2. در لیست پروژه‌ها، پروژه‌ای را که می‌خواهید حذف کنید انتخاب کنید و سپس روی «حذف» کلیک کنید.
  3. در کادر محاوره‌ای، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.

۱۲. تبریک

تبریک! با معماری راهکار ما پیرامون Cloud Run Jobs و بهره‌گیری از قدرت BigQuery برای مدیریت داده‌ها و یک Cloud Run Service خارجی برای پردازش هوش مصنوعی، شما یک سیستم بسیار مقیاس‌پذیر، مقرون‌به‌صرفه و قابل نگهداری ساخته‌اید. این الگو منطق پردازش را از هم جدا می‌کند، امکان اجرای موازی را بدون زیرساخت پیچیده فراهم می‌کند و زمان رسیدن به بینش را به طور قابل توجهی تسریع می‌کند.

ما شما را تشویق می‌کنیم که برای نیازهای پردازش دسته‌ای خود، Cloud Run Jobs را بررسی کنید. چه تجزیه و تحلیل هوش مصنوعی در مقیاس بزرگ باشد، چه اجرای خطوط لوله ETL یا انجام وظایف داده‌ای دوره‌ای، این رویکرد بدون سرور، یک راه‌حل قدرتمند و کارآمد ارائه می‌دهد. برای شروع کار خودتان، این را بررسی کنید .

اگر کنجکاو هستید که تمام برنامه‌های خود را به صورت بدون سرور و عامل‌دار بسازید و مستقر کنید، در Code Vipassana ثبت نام کنید که بر تسریع برنامه‌های عامل‌دار مولد داده‌محور متمرکز است!