با استفاده از Cloud Run Jobs، BigQuery، و Gemini، خط لوله Insights Video خود را مقیاس دهید

1. بررسی اجمالی

در دنیای پر داده امروزی، استخراج بینش معنادار از محتوای بدون ساختار، به ویژه ویدئو، یک ضرورت مهم است. تصور کنید که نیاز به تجزیه و تحلیل صدها یا هزاران URL ویدیو، خلاصه کردن محتوای آنها، استخراج فناوری های کلیدی، و حتی ایجاد جفت پرسش و پاسخ برای مطالب آموزشی دارید. انجام یک به یک این کار نه تنها زمان بر است بلکه ناکارآمد است. اینجاست که معماری های ابری مدرن می درخشند.

در این آزمایشگاه، راه‌حل مقیاس‌پذیر و بدون سرور برای پردازش محتوای ویدیویی با استفاده از مجموعه خدمات قدرتمند Google Cloud را بررسی خواهیم کرد: Cloud Run، BigQuery و AI Generative Google (Gemini). ما سفر خود را از پردازش یک URL واحد تا هماهنگ کردن اجرای موازی در یک مجموعه داده بزرگ، بدون هزینه‌های سربار مدیریت صف‌های پیام‌رسانی پیچیده و ادغام، شرح می‌دهیم.

چالش

ما وظیفه داشتیم کاتالوگ بزرگی از محتوای ویدیویی را پردازش کنیم، به ویژه بر روی جلسات آزمایشگاهی عملی تمرکز کنیم. هدف تجزیه و تحلیل هر ویدیو و ایجاد یک خلاصه ساختاریافته، شامل عناوین فصل، زمینه مقدمه، دستورالعمل‌های گام به گام، فناوری‌های مورد استفاده، و جفت‌های پرسش و پاسخ مرتبط بود. این خروجی برای استفاده بعدی در مصالح آموزشی ساختمان باید به طور موثر ذخیره شود.

در ابتدا، ما یک سرویس Cloud Run ساده مبتنی بر HTTP داشتیم که می‌توانست یک URL را در یک زمان پردازش کند. این برای آزمایش و تجزیه و تحلیل ad-hoc به خوبی کار می کرد. با این حال، هنگامی که با لیستی از هزاران URL که منبع آن از BigQuery هستند، مواجه شدیم، محدودیت های این مدل تک درخواستی و تک پاسخی آشکار شد. پردازش متوالی روزها طول می کشد، اگر نه هفته ها.

فرصت تبدیل یک فرآیند متوالی دستی یا آهسته به یک گردش کار موازی و خودکار بود. با استفاده از ابر، هدف ما این بود:

  • پردازش داده ها به صورت موازی: به طور قابل توجهی زمان پردازش را برای مجموعه داده های بزرگ کاهش می دهد.
  • از قابلیت‌های هوش مصنوعی موجود استفاده کنید: از قدرت Gemini برای تجزیه و تحلیل محتوای پیچیده استفاده کنید.
  • حفظ معماری بدون سرور: از مدیریت سرورها یا زیرساخت های پیچیده خودداری کنید.
  • متمرکز کردن داده ها: از BigQuery به عنوان تنها منبع حقیقت برای URL های ورودی و یک مقصد قابل اعتماد برای نتایج پردازش شده استفاده کنید.
  • یک خط لوله قوی بسازید: سیستمی ایجاد کنید که در برابر شکست مقاوم باشد و به راحتی قابل مدیریت و نظارت باشد.

هدف

سازماندهی پردازش موازی هوش مصنوعی با وظایف اجرای ابری:

راه حل ما حول یک Cloud Run Job متمرکز است که به عنوان یک ارکستر عمل می کند. به طور هوشمند دسته‌ای از URL‌ها را از BigQuery می‌خواند، این URL‌ها را به سرویس Cloud Run موجود و مستقر ما (که پردازش هوش مصنوعی را برای یک URL واحد انجام می‌دهد) ارسال می‌کند و سپس نتایج را جمع‌آوری می‌کند تا آنها را دوباره در BigQuery بنویسد. این رویکرد به ما اجازه می دهد:

  • جدا کردن ارکستراسیون از پردازش: این کار گردش کار را مدیریت می کند، در حالی که سرویس جداگانه بر روی وظیفه هوش مصنوعی تمرکز می کند.
  • از موازی کاری Cloud Run Job استفاده کنید: این کار می تواند چندین نمونه کانتینر را برای تماس همزمان با سرویس هوش مصنوعی کاهش دهد.
  • کاهش پیچیدگی: ما با مدیریت مستقیم تماس‌های HTTP همزمان و ساده‌سازی معماری، به موازی‌سازی دست می‌یابیم.

استفاده از مورد

بینش های مبتنی بر هوش مصنوعی از ویدیوهای جلسه کد ویپاسانا

مورد استفاده خاص ما، تجزیه و تحلیل ویدیوهای جلسات Google Cloud آزمایشگاه‌های عملی Code Vipassana بود. هدف این بود که به طور خودکار اسناد ساختاریافته (رئوس مطالب فصل کتاب)، از جمله:

  • عناوین فصل: عناوین مختصر برای هر بخش ویدیو
  • زمینه مقدمه: توضیح ارتباط ویدیو در یک مسیر یادگیری گسترده تر
  • آنچه ساخته خواهد شد: وظیفه یا هدف اصلی جلسه
  • فناوری های مورد استفاده: فهرستی از خدمات ابری و سایر فناوری های ذکر شده
  • دستورالعمل های گام به گام: نحوه انجام کار، از جمله تکه های کد
  • کد منبع/ URL های نمایشی: پیوندهای ارائه شده در ویدیو
  • بخش پرسش و پاسخ: ایجاد پرسش ها و پاسخ های مرتبط برای بررسی دانش.

جریان

8d7e83c296095fe0.png

جریان معماری

Cloud Run چیست؟ Cloud Run Jobs چیست؟

Cloud Run

یک پلتفرم بدون سرور کاملاً مدیریت شده که به شما امکان می دهد کانتینرهای بدون حالت اجرا کنید. این برای سرویس‌های وب، APIها و میکروسرویس‌هایی که می‌توانند به طور خودکار بر اساس درخواست‌های دریافتی مقیاس شوند، ایده‌آل است. شما یک تصویر ظرف ارائه می‌دهید، و Cloud Run بقیه موارد را مدیریت می‌کند - از استقرار و مقیاس‌گذاری تا مدیریت زیرساخت. در مدیریت بارهای کاری همزمان و درخواست پاسخ عالی است.

مشاغل اجرای ابری

پیشنهادی که مکمل خدمات Cloud Run است. Cloud Run Jobs برای کارهای پردازش دسته ای طراحی شده است که باید تکمیل و سپس متوقف شوند. آنها برای پردازش داده، ETL، استنتاج دسته ای یادگیری ماشین و هر کاری که شامل پردازش مجموعه داده به جای ارائه درخواست های زنده است، عالی هستند. یکی از ویژگی‌های کلیدی، توانایی آن‌ها برای کوچک کردن تعداد نمونه‌های کانتینری (وظایف) است که به طور همزمان برای پردازش دسته‌ای از کار اجرا می‌شوند، و می‌توانند توسط منابع رویدادهای مختلف یا به صورت دستی فعال شوند.

تفاوت کلیدی

سرویس‌های Cloud Run برای برنامه‌های طولانی‌مدت و درخواست محور هستند. Cloud Run Jobs برای پردازش دسته ای محدود و وظیفه محور است که تا تکمیل اجرا می شود.

چیزی که خواهی ساخت

یک برنامه جستجوی خرده فروشی

به عنوان بخشی از این، شما:

  1. ایجاد یک مجموعه داده BigQuery، جدول و دریافت داده (Code Vipassana Metadata)
  2. ایجاد توابع اجرای ابری پایتون برای اجرای عملکرد هوش مصنوعی مولد (تبدیل ویدیو به کتاب فصل json)
  3. یک برنامه پایتون برای خط لوله داده به هوش مصنوعی ایجاد کنید - از BigQuery بخوانید و نقطه پایانی توابع اجرای Cloud را برای بینش فراخوانی کنید و زمینه را به BigQuery برگردانید.
  4. برنامه را بسازید و کانتینری کنید
  5. یک Cloud Run Jobs را با این کانتینر پیکربندی کنید
  6. اجرا و نظارت بر کار
  7. نتیجه را گزارش کنید

الزامات

  • مرورگری مانند کروم یا فایرفاکس
  • یک پروژه Google Cloud با فعال کردن صورت‌حساب.

2. قبل از شروع

یک پروژه ایجاد کنید

  1. در Google Cloud Console ، در صفحه انتخاب پروژه، یک پروژه Google Cloud را انتخاب یا ایجاد کنید.
  2. مطمئن شوید که صورتحساب برای پروژه Cloud شما فعال است. با نحوه بررسی فعال بودن صورت‌حساب در پروژه آشنا شوید.

برای اعتبارات Google Cloud: اگر می‌خواهید اعتبار Google Cloud را برای کمک به شروع کار دریافت کنید، از این پیوند برای بازخرید اعتبار استفاده کنید. برای بازخرید آن می توانید دستورالعمل های اینجا را دنبال کنید.

  1. شما از Cloud Shell ، یک محیط خط فرمان که در Google Cloud اجرا می شود، استفاده خواهید کرد. روی Activate Cloud Shell در بالای کنسول Google Cloud کلیک کنید.

تصویر دکمه Cloud Shell را فعال کنید

  1. پس از اتصال به Cloud Shell، با استفاده از دستور زیر بررسی می‌کنید که قبلاً احراز هویت شده‌اید و پروژه به ID پروژه شما تنظیم شده است:
gcloud auth list
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد.
gcloud config list project
  1. اگر پروژه شما تنظیم نشده است، از دستور زیر برای تنظیم آن استفاده کنید:
gcloud config set project <YOUR_PROJECT_ID>
  1. فعال کردن API های مورد نیاز: پیوند را دنبال کنید و API ها را فعال کنید.

همچنین می توانید از دستور gcloud برای این کار استفاده کنید. برای دستورات و استفاده از gcloud به اسناد مراجعه کنید.

3. راه اندازی پایگاه داده / انبار

BigQuery به عنوان ستون فقرات خط لوله داده ما عمل کرد. ماهیت بدون سرور و بسیار مقیاس پذیر آن، آن را برای ذخیره داده های ورودی و نگهداری نتایج پردازش شده عالی می کند.

  • ذخیره سازی داده: BigQuery به عنوان انبار داده ما عمل کرد. فهرست نشانی‌های اینترنتی ویدیو، وضعیت آن‌ها (به عنوان مثال، در حال انتظار، پردازش، تکمیل شده) و زمینه نهایی تولید شده را ذخیره می‌کند. این تنها منبع حقیقت است که ویدیوها برای آن نیاز به پردازش دارند.
  • مقصد: جایی است که بینش‌های ایجاد شده توسط هوش مصنوعی ادامه دارد، و آنها را به راحتی برای برنامه‌های پایین دستی یا بررسی دستی قابل جستجو می‌کند. مجموعه داده ما شامل جزئیات جلسه ویدیویی بود، به ویژه از محتوای «فصل های کد ویپاسانا»، که اغلب شامل نمایش های فنی دقیق است.
  • جدول منبع: یک جدول BigQuery (به عنوان مثال، post_session_labs) حاوی رکوردهایی مانند:
  • id: یک شناسه منحصر به فرد برای هر جلسه/ردیف.
  • url: URL ویدیو (به عنوان مثال، پیوند YouTube یا پیوند درایو قابل دسترسی).
  • وضعیت: رشته ای که وضعیت پردازش را نشان می دهد (به عنوان مثال، در انتظار، پردازش، تکمیل، FAILED_PROCESSING).
  • متن: یک فیلد رشته ای برای ذخیره خلاصه تولید شده توسط هوش مصنوعی.
  • داده‌ها: در این سناریو، داده‌ها با اسکریپت‌های INSERT در BigQuery وارد شدند. برای خط لوله ما، BigQuery نقطه شروع بود.

به کنسول BigQuery بروید، یک تب جدید باز کنید و دستورات SQL زیر را اجرا کنید:

--1. Create your dataset for the project

CREATE SCHEMA `<<YOUR_PROJECT_ID>>.cv_metadata`
OPTIONS(
  location = 'us-central1', -- Specify the location (e.g., 'US', 'EU', 'asia-east1')
  description = 'Code Vipassana Sessions Metadata' -- Optional: Add a description
);

--2. Create table

create table cv_metadata.post_session_labs(id STRING, descr STRING, url STRING, context STRING, status STRING);

4. بلع داده ها

اکنون زمان اضافه کردن یک جدول با داده های مربوط به فروشگاه است. به یک برگه در BigQuery Studio بروید و دستورات SQL زیر را برای درج نمونه رکوردها اجرا کنید:

--Insert sample data

insert into cv_metadata.post_session_labs(id,descr,url) values('10-1','Gen AI to Agents, where do I begin? Get started with building a single agent application on ADK Python SDK','https://youtu.be/tyqnQQXpxtI');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-2','Build an E2E multi-agent kitchen renovation app on ADK in Python with AlloyDB data and multiple tools','https://youtu.be/RdrMo2lNh0o');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-3','Augment your multiagent app with tools from MCP Toolbox for AlloyDB','https://youtu.be/9VVNh77Q3ZU?si=oQ4fhAX59Y3D5iWa');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-4','Build an agentic MCP client application using MCP Toolbox for BigQuery','https://youtu.be/HmluMag5s20');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-5','Build a travel agent using ADK & MCP Toolbox for Cloud SQL','https://youtu.be/IWg5CH6ZNs0');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-6','Build an E2E Patent Analysis Agent using ADK and Advanced Vector Search with AlloyDB','https://youtu.be/yCXJ3sk3Lxc');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-7','Getting Started with MCP, ADK and A2A','https://youtu.be/JcQ_DyWc0X0');

5. ایجاد عملکرد بینش ویدئو

ما باید یک Cloud Run Function را ایجاد و استقرار دهیم تا هسته عملکردی را که ایجاد یک فصل کتاب ساختاریافته از URL ویدیو است، پیاده سازی کنیم. برای اینکه بتوانیم به این ابزار به عنوان یک ابزار جعبه ابزار نقطه پایانی مستقل دسترسی پیدا کنیم، ما به تازگی یک تابع اجرای ابری را ایجاد و اجرا کردیم. یا می توانید انتخاب کنید که این تابع را به عنوان یک تابع جداگانه در برنامه واقعی پایتون برای Cloud Run Job قرار دهید:

  1. در کنسول Google Cloud، به صفحه Cloud Run بروید
  2. روی نوشتن یک تابع کلیک کنید.
  3. در قسمت Service name، نامی را برای توصیف عملکرد خود وارد کنید. نام خدمات فقط باید با یک حرف شروع شود و حداکثر 49 کاراکتر یا کمتر از جمله حروف، اعداد یا خط فاصله داشته باشد. نام سرویس ها نمی توانند با خط تیره ختم شوند و باید در هر منطقه و پروژه منحصر به فرد باشند. نام سرویس را نمی توان بعداً تغییر داد و برای عموم قابل مشاهده است. ( تولید-ویدیو-بینش **)**
  4. در فهرست منطقه، از مقدار پیش‌فرض استفاده کنید یا منطقه‌ای را که می‌خواهید عملکرد خود را در آن مستقر کنید، انتخاب کنید. (us-central1 را انتخاب کنید)
  5. در لیست Runtime، از مقدار پیش فرض استفاده کنید یا یک نسخه زمان اجرا را انتخاب کنید. (Python 3.11 را انتخاب کنید)
  6. در بخش احراز هویت، "اجازه دسترسی عمومی" را انتخاب کنید.
  7. روی دکمه "ایجاد" کلیک کنید
  8. تابع ایجاد شده و با الگوی main.py و requires.txt بارگیری می شود
  9. آن را با فایل‌های: main.py و requires.txt از مخزن این پروژه جایگزین کنید

نکته مهم: در main.py، به یاد داشته باشید که <<YOUR_PROJECT_ID>> را با شناسه پروژه خود جایگزین کنید.

  1. نقطه پایانی را مستقر کرده و ذخیره کنید تا بتوانید از آن در منبع خود برای Cloud Run Job استفاده کنید.

نقطه پایانی شما باید شبیه این باشد (یا چیزی مشابه): https://generate-video-insights-<<YOUR_POJECT_NUMBER>>.us-central1.run.app

در این عملکرد Cloud Run چیست؟

فلش Gemini 2.5 برای پردازش ویدیو

برای وظیفه اصلی درک و خلاصه کردن محتوای ویدیویی، ما از مدل Gemini 2.5 Flash Google استفاده کردیم. مدل‌های Gemini مدل‌های قدرتمند و چندوجهی هوش مصنوعی هستند که قادر به درک و پردازش انواع ورودی‌ها، از جمله متن و با ادغام‌های خاص، ویدئو هستند.

در راه‌اندازی ما، فایل ویدیویی را مستقیماً به Gemini تغذیه نکردیم. در عوض، یک پیام متنی ارسال کردیم که شامل URL ویدیو بود و به Gemini آموزش دادیم که چگونه محتوای (فرضی) یک ویدیو را در آن URL تجزیه و تحلیل کند. در حالی که Gemini 2.5 Flash قادر به ورودی چندوجهی است، این خط لوله خاص از یک اعلان مبتنی بر متن استفاده می‌کند که ماهیت ویدیو را توصیف می‌کند (یک جلسه آزمایشگاهی عملی) و یک خروجی JSON ساختاریافته را درخواست می‌کند. این امر از استدلال پیشرفته و درک زبان طبیعی Gemini برای استنتاج و ترکیب اطلاعات بر اساس زمینه درخواست استفاده می کند.

The Gemini Prompt: هدایت هوش مصنوعی

یک دستور خوب برای مدل های هوش مصنوعی بسیار مهم است. درخواست ما برای استخراج اطلاعات بسیار خاص و ساختار آن به فرمت JSON طراحی شده است که به راحتی توسط برنامه ما قابل تجزیه است.

PROMPT_TEMPLATE = """
In the video at the following URL: {youtube_url}, which is a hands-on lab session:
Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Take only the first 30-40 minutes of the video without throwing any error.
Analyze the rest of the content of the video.
Extract and synthesize information to create a book chapter section with the following structure, formatted as a JSON string:
1. **chapter_title:** A concise and engaging title for the chapter.
2. **introduction_context:** Briefly explain the relevance of this video segment within a broader learning context.
3. **what_will_build:** Clearly state the specific task or goal accomplished in this video segment.
4. **technologies_and_services:** List all mentioned Google Cloud services and any other relevant technologies (e.g., programming languages, tools, frameworks).
5. **how_we_did_it:** Provide a clear, numbered step-by-step guide of the actions performed. Include any exact commands or code snippets as they appear in the video. Format code/commands using markdown backticks (e.g., `my-command`).
6. **source_code_url:** Provide a URL to the source code repository if mentioned or implied. If not available, use "N/A".
7. **demo_url:** Provide a URL to a demo if mentioned or implied. If not available, use "N/A".
8. **qa_segment:** Generate 10–15 relevant questions based on the content of this segment, along with concise answers. Ensure the questions are thought-provoking and test understanding of the material.
REMEMBER: Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Format the entire output as a JSON string. Ensure all keys and string values are enclosed in double quotes.
Example structure:
...
"""

این دستور بسیار خاص است و جوزا را راهنمایی می کند تا به عنوان یک آموزگار عمل کند. درخواست رشته JSON خروجی ساختار یافته و قابل خواندن توسط ماشین را تضمین می کند.

در اینجا کدی برای تجزیه و تحلیل ورودی ویدیو و برگرداندن متن آن آمده است:

def process_videos_batch(video_url: str, PROMPT_TEMPLATE: str) -> str:
    """
    Processes a video URL, generates chapter content using Gemini
    """
    formatted_prompt = PROMPT_TEMPLATE.format(youtube_url=video_url)
    try:

        client = genai.Client(vertexai=True,project='<<YOUR_PROJECT_ID>>',location='us-central1',http_options=HttpOptions(api_version="v1"))
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=formatted_prompt,
        )
        print(response.text)
    except Exception as e:
        print(f"An error occurred during content generation: {e}")
        return f"Error processing video: {e}"
    print(response.text)
    return response.text

این قطعه بالا عملکرد اصلی مورد استفاده را نشان می دهد. یک URL ویدیو را دریافت می کند و از مدل Gemini از طریق کلاینت Vertex AI برای تجزیه و تحلیل محتوای ویدیو و استخراج بینش های مرتبط با درخواست استفاده می کند. پس زمینه استخراج شده برای پردازش بیشتر برگردانده می شود. این یک عملیات همزمان را نشان می دهد که در آن Cloud Run Job منتظر می ماند تا سرویس کامل شود.

6. توسعه اپلیکیشن Pipeline (Python)

منطق خط لوله مرکزی ما در کد منبع برنامه قرار دارد که در یک Cloud Run Job قرار می گیرد، که کل اجرای موازی را هماهنگ می کند. در اینجا نگاهی به بخش های کلیدی داریم:

نقش ارکستراتور در مدیریت گردش کار و تضمین یکپارچگی داده ها:

# ... (imports and configuration) ...

def process_batch_from_bq(request_or_trigger_data=None):
    # ... (initial checks for config) ...
    BATCH_SIZE = 5 # Fetch 5 URLs at a time per job instance
    
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    try:
        logging.info(f"Fetching up to {BATCH_SIZE} pending URLs from BigQuery...")
        rows = bq_client.query(query).result() # job_should_wait=True is default for result()
        pending_urls_data = []
        for row in rows:
            pending_urls_data.append({"url": row.url, "id": row.id})
        
        if not pending_urls_data:
            logging.info("No pending URLs found. Job finished.")
            return "No pending URLs found. Job finished.", 200

        row_ids_to_process = [item["id"] for item in pending_urls_data]

        # --- Mark as PROCESSING to prevent duplicate work ---
        update_status_query = f"""
            UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
            SET status = 'PROCESSING'
            WHERE id IN UNNEST(@row_ids_to_process)
        """
        
        status_update_job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("row_ids_to_process", "STRING", values=row_ids_to_process)
            ]
        )
        
        update_status_job = bq_client.query(update_status_query, job_config=status_update_job_config)
        update_status_job.result()
        logging.info(f"Marked {len(row_ids_to_process)} URLs as 'PROCESSING'.")

        # ... (rest of the code for parallel processing and writing) ...
    
    except Exception as e:
        # ... (error handling) ...

این قطعه بالا با واکشی دسته‌ای از نشانی‌های وب ویدیو با وضعیت «در انتظار» از جدول منبع BigQuery آغاز می‌شود. سپس وضعیت این URL ها را به «PROCESSING» در BigQuery به روز می کند و از پردازش تکراری جلوگیری می کند.

پردازش موازی با ThreadPoolExecutor و فراخوانی سرویس پردازنده:

# ... (inside process_batch_from_bq function) ...

        # --- Step 3: Call the external URL Processor Service in parallel ---
        processed_results = {}
        futures = []

        # ThreadPoolExecutor for I/O-bound tasks (HTTP requests to the processor service)
        # MAX_CONCURRENT_TASKS_PER_INSTANCE controls parallelism within one job instance.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS_PER_INSTANCE) as executor:
            for item in pending_urls_data:
                url = item["url"]
                row_id = item["id"]
                # Submit the task: call the processor service for this URL
                future = executor.submit(call_url_processor_service, url)
                futures.append((row_id, future)) 

            # Collect results as they complete
            for row_id, future in futures:
                try:
                    content = future.result(timeout=URL_PROCESSOR_TIMEOUT_SECONDS)
                    
                    # Check if the processor service returned an error message
                    if content.startswith("ERROR:"):
                        processed_results[row_id] = {"context": content, "status": "FAILED_PROCESSING"}
                    else:
                        processed_results[row_id] = {"context": content, "status": "COMPLETED"}
                        
                except TimeoutError:
                    logging.warning(f"URL processing timed out (service call for row ID {row_id}). Marking as FAILED.")
                    processed_results[row_id] = {"context": f"ERROR: Processing timed out for '{row_id}'.", "status": "FAILED_PROCESSING"}
                except Exception as e:
                    logging.error(f"Exception during future result retrieval for row ID {row_id}: {e}")
                    processed_results[row_id] = {"context": f"ERROR: Unexpected error during result retrieval for '{row_id}'. Details: {e}", "status": "FAILED_PROCESSING"}

این بخش از کد از ThreadPoolExecutor برای دستیابی به پردازش موازی URL های ویدیویی واکشی شده استفاده می کند. برای هر URL، وظیفه ای برای فراخوانی ناهمزمان سرویس Cloud Run (پردازنده URL) ارسال می کند. این به Cloud Run Job اجازه می‌دهد تا چندین ویدیو را همزمان پردازش کند و عملکرد کلی خط لوله را افزایش دهد. این قطعه همچنین وقفه های زمانی و خطاهای احتمالی سرویس پردازنده را کنترل می کند.

خواندن و نوشتن از و به BigQuery

تعامل اصلی با BigQuery شامل واکشی URL های معلق و سپس به روز رسانی آنها با نتایج پردازش شده است.

# ... (inside process_batch_from_bq) ...
    BATCH_SIZE = 5 
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    rows = bq_client.query(query).result()
    
    pending_urls_data = []
    for row in rows:
        pending_urls_data.append({"url": row.url, "id": row.id})
# ... (rest of fetching and marking as PROCESSING) ...

بازنویسی نتایج به BigQuery:

# --- Step 4: Write results back to BigQuery ---
        logging.info(f"Writing {len(processed_results)} results back to BigQuery...")
        successful_updates = 0
        for row_id, data in processed_results.items():
            if update_bq_row(row_id, data["context"], data["status"]):
                successful_updates += 1
        
        logging.info(f"Finished processing. {successful_updates} out of {len(processed_results)} rows updated successfully.")
        # ... (return statement) ...

# --- Helper to update a single row in BigQuery ---
def update_bq_row(row_id, context, status="COMPLETED"):
    """Updates a specific row in the target BigQuery table."""
    # ... (checks for config) ...

    update_query = f"""
        UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_TARGET}`
        SET
            context = @context,
            status = @status
        WHERE id = @row_id
    """

    # Correctly defining query parameters for the UPDATE statement
    job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("context", "STRING", value=context),
                bigquery.ScalarQueryParameter("status", "STRING", value=status),
                # Assuming 'id' column is STRING. Adjust if it's INT64.
                bigquery.ScalarQueryParameter("row_id", "STRING", value=row_id) 
            ]
        )

    try:
        update_job = bq_client.query(update_query, job_config=job_config)
        update_job.result() # Wait for the job to complete
        logging.info(f"Successfully updated BigQuery row ID {row_id} with status {status}.")
        return True
    except Exception as e:
        logging.error(f"Failed to update BigQuery row ID {row_id}: {e}")
        return False

تکه‌های بالا بر تعامل داده بین Cloud Run Job و BigQuery تمرکز دارند. مجموعه‌ای از نشانی‌های وب ویدیوی «در انتظار» و شناسه‌های آن‌ها را از جدول منبع بازیابی می‌کند. پس از پردازش URL ها، این قطعه، نوشتن متن و وضعیت استخراج شده ("COMPLETED" یا "FAILED_PROCESSING") را به جدول BigQuery هدف با استفاده از یک درخواست UPDATE نشان می دهد. این قطعه حلقه پردازش داده را کامل می کند. همچنین شامل تابع کمکی update_bq_row است که نحوه تعریف پارامترهای عبارت به روز رسانی را نشان می دهد.

راه اندازی برنامه

این برنامه به صورت یک اسکریپت پایتون منفرد ساخته شده است که کانتینری خواهد شد. از کتابخانه های سرویس گیرنده Google Cloud و چارچوب توابع استفاده می کند تا نقطه ورودی خود را تعریف کند.

  • وابستگی ها: google-cloud-bigquery، درخواست ها
  • پیکربندی: تمام تنظیمات حیاتی (پروژه BigQuery/مجموعه داده/جدول، URL سرویس پردازنده URL) از متغیرهای محیط بارگیری می شوند و برنامه را قابل حمل و ایمن می کنند.
  • منطق اصلی: تابع process_batch_from_bq کل گردش کار را هماهنگ می کند
  • یکپارچه سازی سرویس خارجی: تابع call_url_processor_service ارتباط با سرویس Cloud Run جداگانه را مدیریت می کند.
  • تعامل BigQuery: bq_client برای واکشی URL ها و به روز رسانی نتایج، با مدیریت صحیح پارامترها استفاده می شود.
  • Parallelism: concurrent.futures.ThreadPoolExecutor تماس های همزمان به سرویس خارجی را مدیریت می کند
  • نقطه ورود: کد پایتون با نام main.py به عنوان نقطه ورودی که پردازش دسته ای را آغاز می کند عمل می کند.

حالا بیایید برنامه را راه اندازی کنیم:

  1. می توانید با پیمایش به ترمینال Cloud Shell خود و شبیه سازی مخزن شروع کنید:
git clone https://github.com/AbiramiSukumaran/video-context-crj
  1. به ویرایشگر پوسته ابری بروید ، جایی که می توانید پوشه جدید ایجاد شده video-context-crj را ببینید.
  2. موارد زیر را حذف کنید زیرا این مراحل قبلاً در بخش‌های قبلی تکمیل شده‌اند:
  3. پوشه Cloud_Run_Function را حذف کنید
  4. به پوشه پروژه video-context-crj بروید و باید ساختار پروژه را ببینید:

84ace76f8e20c668.png

7. راه اندازی Dockerfile و Containerization

برای استقرار این منطق به عنوان یک Cloud Run Job، باید آن را محفظه بندی کنیم. Containerization فرآیند بسته بندی کد برنامه ما، وابستگی های آن و زمان اجرا در یک تصویر قابل حمل است.

اطمینان حاصل کنید که متغیرها (متن به صورت پررنگ) را با مقادیر خود در Dockerfile جایگزین کنید:

# Use an official Python runtime as a parent image
FROM python:3.12-alpine

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Copy the rest of the application code
COPY . .

# Define environment variables for configuration (these will be overridden during deployment)
ENV BIGQUERY_PROJECT="YOUR-project"
ENV BIGQUERY_DATASET="YOUR-dataset"
ENV BIGQUERY_TABLE_SOURCE="YOUR-source-table"
ENV URL_PROCESSOR_SERVICE_URL="ENDPOINT FOR VIDEO PROCESSING" 
ENV BIGQUERY_TABLE_TARGET = "YOUR-destination-table"

ENTRYPOINT ["python", "main.py"]

قطعه Dockerfile در بالا تصویر پایه را تعریف می‌کند، وابستگی‌ها را نصب می‌کند، کد ما را کپی می‌کند، و دستور اجرای برنامه ما را با استفاده از توابع-فریم ورک با تابع هدف صحیح (process_batch_from_bq) تنظیم می‌کند. سپس این تصویر به رجیستری مصنوع فرستاده می شود.

ظروف

برای کانتینری کردن آن، به ترمینال Cloud Shell رفته و دستورات زیر را اجرا کنید (به یاد داشته باشید که جای جای <<YOUR_PROJECT_ID>> را جایگزین کنید):

export CONTAINER_IMAGE="gcr.io/<<YOUR_PROJECT_ID>>/batch-url-processor-orchestrator:latest"

gcloud builds submit --tag $CONTAINER_IMAGE .

پس از ایجاد تصویر ظرف، باید خروجی را مشاهده کنید:

eec4f4a2bc5745f2.png

ظرف ما اکنون ایجاد و در رجیستری مصنوع ذخیره شده است. ما خوب هستیم که به مرحله بعدی برویم.

8. Cloud Run Jobs Creation

استقرار کار شامل ساخت تصویر ظرف و سپس ایجاد یک منبع Cloud Run Job است.

قبلاً تصویر کانتینر را ایجاد کرده‌ایم و در رجیستری مصنوع ذخیره کرده‌ایم. حالا بیایید کار را ایجاد کنیم.

  1. به کنسول Cloud Run Jobs بروید و روی Deploy Container کلیک کنید:

f3a1f4775000186e.png

  1. تصویر ظرفی را که به تازگی ایجاد کرده ایم انتخاب کنید:

90989f396ad6c30a.png

  1. سایر جزئیات پیکربندی را به صورت زیر وارد کنید:

b07fe386a4ae2797.png

  1. ظرفیت وظیفه را به صورت زیر تنظیم کنید:

327a05d61e1337c3.png

از آنجایی که ما نوشته های پایگاه داده داریم و این واقعیت که موازی سازی (max_instance ها و همزمانی وظایف) قبلاً در کد انجام می شود، تعداد کارهای همزمان را روی 1 تنظیم می کنیم. اما می توانید آن را به ازای نیاز خود افزایش دهید. هدف در اینجا این است که وظایف طبق پیکربندی با سطح همزمانی تنظیم شده به صورت موازی تکمیل شوند.

  1. روی ایجاد کلیک کنید

Cloud Run Job شما با موفقیت ایجاد خواهد شد.

چگونه کار می کند

یک نمونه ظرف از کار ما شروع می شود. BigQuery را درخواست می‌کند تا دسته کوچکی (BATCH_SIZE) از نشانی‌های اینترنتی را که به‌عنوان PENDING علامت‌گذاری شده‌اند، دریافت کند. بلافاصله وضعیت این نشانی‌های وب واکشی شده را به PROCESSING در BigQuery به‌روزرسانی می‌کند تا دیگر نمونه‌های شغلی نتوانند آن‌ها را دریافت کنند. این یک ThreadPoolExecutor ایجاد می کند و یک کار برای هر URL در دسته ارسال می کند. هر وظیفه تابع call_url_processor_service را فراخوانی می کند. هنگامی که درخواست‌های call_url_processor_service کامل می‌شوند (یا زمان‌بندی/شکست)، نتایج آن‌ها (چه زمینه ایجاد شده توسط هوش مصنوعی یا یک پیام خطا) جمع‌آوری می‌شوند و به row_id اصلی نگاشت می‌شوند. پس از اتمام تمام وظایف دسته، کار از طریق نتایج جمع آوری شده تکرار می شود و زمینه و فیلدهای وضعیت را برای هر ردیف مربوطه در BigQuery به روز می کند. اگر موفقیت آمیز باشد، نمونه کار به طور کامل خارج می شود. اگر با خطاهای کنترل نشده مواجه شود، یک استثنا ایجاد می کند، که احتمالاً باعث ایجاد یک امتحان مجدد توسط Cloud Run Jobs (بسته به پیکربندی کار) می شود.

نحوه جابجایی مشاغل Cloud Run: ارکستراسیون

اینجاست که Cloud Run Jobs واقعا می درخشد.

پردازش دسته‌ای بدون سرور: زیرساخت‌های مدیریت‌شده‌ای را دریافت می‌کنیم که می‌تواند به تعداد مورد نیاز (تا MAX_INSTANCES) برای پردازش داده‌های ما به صورت همزمان بچرخد.

کنترل موازی: ما MAX_INSTANCES (چند کار می تواند به طور کلی به صورت موازی اجرا شود) و TASK_CONCURRENCY (چند عملیات هر نمونه کار به صورت موازی انجام می دهد) را تعریف می کنیم. این کنترل دقیقی را بر روی توان عملیاتی و استفاده از منابع فراهم می کند.

تحمل خطا: اگر یک نمونه کار در میانه راه با شکست مواجه شود، Cloud Run Jobs را می توان به گونه ای پیکربندی کرد که کل کار یا کارهای خاص را مجدداً امتحان کند و اطمینان حاصل شود که پردازش داده ها از بین نمی رود.

معماری ساده شده: با هماهنگ کردن تماس‌های HTTP مستقیماً در Job و استفاده از BigQuery برای مدیریت حالت، از پیچیدگی راه‌اندازی و مدیریت Pub/Sub، موضوعات، اشتراک‌ها و منطق تأیید آن جلوگیری می‌کنیم.

MAX_INSTANCES در مقابل TASK_CONCURRENCY:

MAX_INSTANCES: تعداد کل نمونه های کار که می توانند به طور همزمان در کل اجرای کار شما اجرا شوند. این اهرم موازی اصلی شما برای پردازش چندین URL در یک زمان است.

TASK_CONCURRENCY: تعداد عملیات موازی (تماس با سرویس پردازنده شما) که یک نمونه از کار شما انجام خواهد داد. این به اشباع CPU/شبکه ​​یک نمونه کمک می کند.

9. اجرا و نظارت بر Cloud Run Job

فراداده ویدیویی

قبل از اینکه روی اجرا کلیک کنیم، بیایید وضعیت داده ها را مشاهده کنیم.

به استودیوی BigQuery بروید و کوئری زیر را اجرا کنید:

Select id, descr, url, status from cv_metadata.post_session_labs where status = PENDING'

e9d99c2ed84d265f.png

ما چند نمونه رکورد با نشانی‌های اینترنتی ویدیو و در وضعیت معلق داریم. هدف ما پر کردن فیلد «زمینه» با اطلاعاتی از ویدیو در قالب توضیح داده شده در درخواست است.

ماشه کار

بیایید با کلیک بر روی دکمه EXECUTE روی کار در کنسول Cloud Run Jobs، کار را اجرا کنیم و باید بتوانید پیشرفت و وضعیت کارها را در کنسول مشاهده کنید:

13f6a8892e6fd2bf.png

می‌توانید برچسب LOGS را در OBSERVABILITY برای نظارت بر مراحل و سایر جزئیات در مورد کار و وظایف بررسی کنید.

10. تجزیه و تحلیل نتایج

پس از تکمیل کار، باید بتوانید زمینه هر URL ویدیویی را که در جدول به روز شده است، مشاهده کنید:

135f85ad141c070b.png

متن خروجی (برای یکی از رکوردها)

{
  "chapter_title": "Building a Travel Agent with ADK and MCP Toolbox",
  "introduction_context": "This chapter section is derived from a hands-on lab session focused on building a travel agent. It details the process of integrating various Google Cloud services and tools to create an intelligent agent capable of querying a database and interacting with users.",
  "what_will_build": "The goal is to build and deploy a travel agent that can answer user queries about hotels using the Agent Development Kit (ADK) and the MCP Toolbox for Databases, connecting to a PostgreSQL database.",
  "technologies_and_services": [
    "Google Cloud Platform",
    "Cloud SQL for PostgreSQL",
    "Agent Development Kit (ADK)",
    "MCP Toolbox for Databases",
    "Cloud Shell",
    "Cloud Run",
    "Python",
    "Docker"
  ],
  "how_we_did_it": [
    "Provision a Cloud SQL instance for PostgreSQL with the 'hoteldb-instance'.",
    "Prepare the 'hotels' database by creating a table with relevant schema and populating it with sample data.",
    "Set up the MCP Toolbox for Databases by downloading and configuring the necessary components.",
    "Install the Agent Development Kit (ADK) and its dependencies.",
    "Create a new agent using the ADK, specifying the model (Gemini 2.0-flash) and backend (Vertex AI).",
    "Modify the agent's code to connect to the PostgreSQL database via the MCP Toolbox.",
    "Run the agent locally to test its functionality and ability to interact with the database.",
    "Deploy the agent to Cloud Run for cloud-based access and further testing.",
    "Interact with the deployed agent through a web console or command line to query hotel information."
  ],
  "source_code_url": "N/A",
  "demo_url": "N/A",
  "qa_segment": [
    {
      "question": "What is the primary purpose of the MCP Toolbox for Databases?",
      "answer": "The MCP Toolbox for Databases is an open-source MCP server designed to help users develop tools faster, more securely, and by handling complexities like connection pooling, authentication, and more."
    },
    {
      "question": "Which Google Cloud service is used to create the database for the travel agent?",
      "answer": "Cloud SQL for PostgreSQL is used to create the database."
    },
    {
      "question": "What is the role of the Agent Development Kit (ADK)?",
      "answer": "The ADK helps build Generative AI tools that allow agents to access data in a database. It enables agents to perform actions, interact with users, utilize external tools, and coordinate with other agents."
    },
    {
      "question": "What command is used to create the initial agent application using ADK?",
      "answer": "The command `adk create hotel-agent-app` is used to create the agent application."
    },
    ....

اکنون باید بتوانید از این ساختار JSON برای موارد پیشرفته تر استفاده از عامل استفاده کنید.

چرا این رویکرد؟

این معماری مزایای استراتژیک قابل توجهی را ارائه می دهد:

  • مقرون به صرفه بودن: خدمات بدون سرور به این معنی است که شما فقط برای آنچه استفاده می کنید هزینه پرداخت می کنید. Cloud Run Jobs در صورت عدم استفاده به صفر می رسد.
  • مقیاس پذیری: با تنظیم نمونه Cloud Run Job و تنظیمات همزمان، به راحتی ده ها هزار URL را مدیریت می کند.
  • چابکی: چرخه های توسعه و استقرار سریع برای منطق پردازش جدید یا مدل های هوش مصنوعی با به روز رسانی برنامه موجود و سرویس آن.
  • کاهش سربار عملیاتی: هیچ سروری برای وصله یا مدیریت وجود ندارد. گوگل زیرساخت ها را مدیریت می کند.
  • دموکراتیک کردن هوش مصنوعی: پردازش هوش مصنوعی پیشرفته را برای کارهای دسته‌ای بدون تخصص عمیق ML Ops قابل دسترس می‌کند.

11. پاکسازی کنید

برای جلوگیری از تحمیل هزینه به حساب Google Cloud خود برای منابع استفاده شده در این پست، این مراحل را دنبال کنید:

  1. در کنسول Google Cloud، به صفحه مدیریت منابع بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و سپس روی Delete کلیک کنید.
  3. در محاوره، شناسه پروژه را تایپ کنید و سپس روی Shut down کلیک کنید تا پروژه حذف شود.

12. تبریک می گویم

تبریک می گویم! با معماری راه حل ما در مورد Cloud Run Jobs و استفاده از قدرت BigQuery برای مدیریت داده و سرویس Cloud Run خارجی برای پردازش هوش مصنوعی، سیستمی بسیار مقیاس پذیر، مقرون به صرفه و قابل نگهداری ساخته اید. این الگو منطق پردازش را از هم جدا می کند، امکان اجرای موازی بدون زیرساخت پیچیده را فراهم می کند و زمان بینش را به طور قابل توجهی تسریع می کند.

ما شما را تشویق می‌کنیم تا برای نیازهای پردازش دسته‌ای خود، به بررسی Cloud Run Jobs بپردازید. این رویکرد بدون سرور راه حلی قدرتمند و کارآمد ارائه می‌کند، خواه مقیاس‌گذاری تجزیه و تحلیل هوش مصنوعی، اجرای خطوط لوله ETL یا انجام وظایف داده‌های دوره‌ای باشد. برای شروع به تنهایی، این را بررسی کنید .

اگر کنجکاو هستید که همه برنامه‌های خود را بدون سرور و به صورت عاملی بسازید و اجرا کنید، برای Code Vipassana ثبت نام کنید که بر سرعت بخشیدن به برنامه‌های عامل مولد مبتنی بر داده متمرکز است!