Масштабируйте свой конвейер Video Insights с помощью Cloud Run Jobs, BigQuery и Gemini

1. Обзор

В современном мире, насыщенном данными, извлечение содержательной информации из неструктурированного контента, особенно видео, становится насущной необходимостью. Представьте, что вам нужно проанализировать сотни или тысячи URL-адресов видео, резюмировать их содержание, извлечь ключевые технологии и даже сформировать пары вопросов и ответов для образовательных материалов. Выполнять всё это по отдельности не только занимает много времени, но и неэффективно. Именно здесь современные облачные архитектуры могут проявить себя во всей красе.

В этой лабораторной работе мы рассмотрим масштабируемое бессерверное решение для обработки видеоконтента с использованием мощного набора сервисов Google Cloud: Cloud Run, BigQuery и Google Generative AI (Gemini). Мы подробно расскажем о нашем пути от обработки одного URL до организации параллельного выполнения большого набора данных, без дополнительных затрат на управление сложными очередями сообщений и интеграцией.

Вызов

Нам было поручено обработать большой каталог видеоконтента, уделив особое внимание практическим лабораторным занятиям. Целью было проанализировать каждое видео и составить структурированное резюме, включающее названия глав, вводный контекст, пошаговые инструкции, использованные технологии и соответствующие пары вопросов и ответов. Этот результат требовалось эффективно хранить для последующего использования при создании учебных материалов.

Изначально у нас был простой HTTP-сервис Cloud Run, который мог обрабатывать по одному URL за раз. Это хорошо подходило для тестирования и анализа по запросу. Однако, столкнувшись со списком из тысяч URL, полученным из BigQuery, ограничения этой модели с одним запросом и одним ответом стали очевидны. Последовательная обработка заняла бы дни, а то и недели.

Возможность заключалась в преобразовании ручного или медленного последовательного процесса в автоматизированный, параллельный рабочий процесс. Используя облако, мы стремились:

  • Параллельная обработка данных: значительное сокращение времени обработки больших наборов данных.
  • Используйте существующие возможности ИИ: используйте возможности Gemini для сложного анализа контента.
  • Поддерживайте бессерверную архитектуру: избегайте управления серверами или сложной инфраструктурой.
  • Централизация данных: используйте BigQuery как единый источник достоверной информации для входных URL-адресов и надежный пункт назначения для обработанных результатов.
  • Создайте надежный конвейер: создайте систему, устойчивую к сбоям, которую можно легко контролировать и которой можно легко управлять.

Цель

Организация параллельной обработки ИИ с помощью заданий Cloud Run:

Наше решение основано на задании Cloud Run, которое выполняет функцию оркестратора. Оно интеллектуально считывает пакеты URL-адресов из BigQuery, перенаправляет их в наш существующий развернутый сервис Cloud Run (который выполняет обработку с помощью ИИ для каждого URL-адреса), а затем агрегирует результаты для их записи обратно в BigQuery. Такой подход позволяет нам:

  • Отделите оркестровку от обработки: задание управляет рабочим процессом, в то время как отдельная служба фокусируется на задаче ИИ.
  • Используйте параллелизм Cloud Run Job: задание может масштабировать несколько экземпляров контейнера для одновременного вызова службы ИИ.
  • Снижение сложности: мы достигаем параллелизма, позволяя заданию напрямую управлять параллельными HTTP-вызовами, что упрощает архитектуру.

Вариант использования

Аналитика на основе искусственного интеллекта из видео сессий Code Vipassana

Наш конкретный пример использования заключался в анализе видеозаписей сессий практических занятий по Code Vipassana в Google Cloud. Целью было автоматическое создание структурированной документации (конспектов глав книг), включая:

  • Названия глав: краткие названия для каждого видеофрагмента.
  • Вводный контекст: объяснение значимости видео в более широком контексте обучения
  • Что будет построено: основная задача или цель сессии
  • Используемые технологии: список облачных сервисов и других упомянутых технологий.
  • Пошаговые инструкции: как была выполнена задача, включая фрагменты кода.
  • Исходный код/ссылки на демо-версии: ссылки указаны в видео.
  • Сегмент вопросов и ответов: создание соответствующих вопросов и ответов для проверки знаний.

Поток

8d7e83c296095fe0.png

Поток архитектуры

Что такое Cloud Run? Какие задания входят в Cloud Run?

Cloud Run

Полностью управляемая бессерверная платформа, позволяющая запускать контейнеры без сохранения состояния. Она идеально подходит для веб-сервисов, API и микросервисов, которые могут автоматически масштабироваться в зависимости от входящих запросов. Вы предоставляете образ контейнера, а Cloud Run берёт на себя всё остальное — от развертывания и масштабирования до управления инфраструктурой. Она превосходно справляется с синхронными рабочими нагрузками типа «запрос-ответ».

Работа в облаке

Предложение, дополняющее сервисы Cloud Run. Задания Cloud Run предназначены для пакетной обработки, которая должна быть завершена, а затем остановлена. Они идеально подходят для обработки данных, ETL, пакетного машинного обучения и любых задач, предполагающих обработку набора данных, а не обслуживание запросов в режиме реального времени. Ключевой особенностью является возможность масштабирования количества экземпляров контейнеров (задач), работающих одновременно, для обработки пакета задач. Задания могут запускаться различными источниками событий или вручную.

Ключевое отличие

Сервисы Cloud Run предназначены для длительно работающих приложений, управляемых запросами. Задания Cloud Run предназначены для пакетной обработки ограниченных задач, которая выполняется до завершения.

Что вы построите

Приложение для поиска товаров в розничной торговле

В рамках этого вам предстоит:

  1. Создайте набор данных BigQuery, таблицу и загрузите данные (метаданные Code Vipassana)
  2. Создайте функции Python Cloud Run для реализации функциональности генеративного ИИ (преобразование видео в JSON-файл главы книги)
  3. Создайте приложение Python для конвейера данных в ИИ — считывайте данные из BigQuery и вызывайте конечную точку Cloud Run Functions для получения аналитических данных и записывайте контекст обратно в BigQuery.
  4. Сборка и контейнеризация приложения
  5. Настройте задания Cloud Run с помощью этого контейнера
  6. Выполнение и мониторинг задания
  7. Сообщить результат

Требования

  • Браузер, например Chrome или Firefox
  • Проект Google Cloud с включенным биллингом.

2. Прежде чем начать

Создать проект

  1. В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud.
  2. Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .

Для кредитов Google Cloud: если вы хотите получить кредиты Google Cloud для начала работы, воспользуйтесь этой ссылкой для погашения кредитов. Инструкции по погашению кредитов приведены здесь .

  1. Вы будете использовать Cloud Shell — среду командной строки, работающую в Google Cloud. Нажмите «Активировать Cloud Shell» в верхней части консоли Google Cloud.

Изображение кнопки «Активировать Cloud Shell»

  1. После подключения к Cloud Shell вы проверяете, что вы уже прошли аутентификацию и что проекту присвоен ваш идентификатор проекта, с помощью следующей команды:
gcloud auth list
  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что команда gcloud знает о вашем проекте.
gcloud config list project
  1. Если ваш проект не настроен, используйте следующую команду для его настройки:
gcloud config set project <YOUR_PROJECT_ID>
  1. Включите необходимые API: Перейдите по ссылке и включите API.

В качестве альтернативы вы можете использовать команду gcloud. Подробности о командах и использовании gcloud см. в документации .

3. Настройка базы данных/хранилища

BigQuery послужил основой нашего конвейера данных. Благодаря своей бессерверной природе и высокой масштабируемости он идеально подходит как для хранения входных данных, так и для размещения обработанных результатов.

  • Хранилище данных: BigQuery выступил в качестве хранилища данных. Он хранит список URL-адресов видео, их статусы (например, «ОЖИДАНИЕ», «ОБРАБОТКА», «ЗАВЕРШЕНО») и итоговый сгенерированный контекст. Это единственный источник информации, для которой требуется обработка видео.
  • Место назначения: Здесь хранятся данные, сгенерированные ИИ, что позволяет легко запрашивать их для последующих приложений или для ручного анализа. Наш набор данных состоял из информации о видеосессиях, в частности из контента «Code Vipassana Seasons», который часто включает в себя подробные технические демонстрации.
  • Исходная таблица: таблица BigQuery (например, post_session_labs), содержащая такие записи:
  • id: уникальный идентификатор для каждого сеанса/строки.
  • url: URL-адрес видео (например, ссылка на YouTube или ссылка на доступный диск).
  • status: Строка, указывающая состояние обработки (например, PENDING, PROCESSING, COMPLETED, FAILED_PROCESSING).
  • контекст: строковое поле для хранения сводки, созданной ИИ.
  • Загрузка данных: в этом сценарии данные загружались в BigQuery с помощью скриптов INSERT. Для нашего конвейера BigQuery стал отправной точкой.

Перейдите в консоль BigQuery, откройте новую вкладку и выполните следующие операторы SQL:

--1. Create your dataset for the project

CREATE SCHEMA `<<YOUR_PROJECT_ID>>.cv_metadata`
OPTIONS(
  location = 'us-central1', -- Specify the location (e.g., 'US', 'EU', 'asia-east1')
  description = 'Code Vipassana Sessions Metadata' -- Optional: Add a description
);

--2. Create table

create table cv_metadata.post_session_labs(id STRING, descr STRING, url STRING, context STRING, status STRING);

4. Прием данных

Теперь пора добавить таблицу с данными о магазине. Перейдите на вкладку в BigQuery Studio и выполните следующие SQL-запросы для вставки образцов записей:

--Insert sample data

insert into cv_metadata.post_session_labs(id,descr,url) values('10-1','Gen AI to Agents, where do I begin? Get started with building a single agent application on ADK Python SDK','https://youtu.be/tyqnQQXpxtI');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-2','Build an E2E multi-agent kitchen renovation app on ADK in Python with AlloyDB data and multiple tools','https://youtu.be/RdrMo2lNh0o');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-3','Augment your multiagent app with tools from MCP Toolbox for AlloyDB','https://youtu.be/9VVNh77Q3ZU?si=oQ4fhAX59Y3D5iWa');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-4','Build an agentic MCP client application using MCP Toolbox for BigQuery','https://youtu.be/HmluMag5s20');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-5','Build a travel agent using ADK & MCP Toolbox for Cloud SQL','https://youtu.be/IWg5CH6ZNs0');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-6','Build an E2E Patent Analysis Agent using ADK and Advanced Vector Search with AlloyDB','https://youtu.be/yCXJ3sk3Lxc');

insert into cv_metadata.post_session_labs(id,descr,url) values('10-7','Getting Started with MCP, ADK and A2A','https://youtu.be/JcQ_DyWc0X0');

5. Создание функции Video Insights

Нам необходимо создать и развернуть функцию Cloud Run для реализации основной функции, которая заключается в создании структурированной главы книги из URL-адреса видео. Чтобы получить к ней доступ как к независимому инструменту конечной точки, мы только что создали и развернули функцию Cloud Run. Вы также можете включить её в качестве отдельной функции в само приложение Python для задания Cloud Run:

  1. В консоли Google Cloud перейдите на страницу Cloud Run.
  2. Нажмите «Написать функцию».
  3. В поле «Название сервиса» введите название, описывающее вашу функцию. Название сервиса должно начинаться только с буквы и содержать не более 49 символов, включая буквы, цифры и дефисы. Название сервиса не может заканчиваться дефисом и должно быть уникальным для каждого региона и проекта. Название сервиса нельзя изменить позже, оно доступно для публичного просмотра. ( generate-video-insights **)**
  4. В списке «Регион» используйте значение по умолчанию или выберите регион, в котором вы хотите развернуть свою функцию. (Выберите us-central1)
  5. В списке «Среда выполнения» используйте значение по умолчанию или выберите версию среды выполнения. (Выберите Python 3.11)
  6. В разделе «Аутентификация» выберите «Разрешить публичный доступ».
  7. Нажмите кнопку «Создать».
  8. Функция создается и загружается с помощью шаблона main.py и requirements.txt.
  9. Замените это файлами main.py и requirements.txt из репозитория этого проекта.

ВАЖНОЕ ПРИМЕЧАНИЕ: В файле main.py не забудьте заменить <<YOUR_PROJECT_ID>> на идентификатор вашего проекта.

  1. Разверните и сохраните конечную точку, чтобы ее можно было использовать в источнике для задания Cloud Run.

Ваша конечная точка должна выглядеть так (или похоже): https://generate-video-insights-<<YOUR_POJECT_NUMBER>>.us-central1.run.app

Что представляет собой функция Cloud Run?

Gemini 2.5 Flash для обработки видео

Для основной задачи понимания и обобщения видеоконтента мы использовали Flash-модель Gemini 2.5 от Google. Gemini — это мощные мультимодальные модели искусственного интеллекта, способные понимать и обрабатывать различные типы входных данных, включая текст и, при определённых интеграциях, видео.

В нашей конфигурации мы не передавали видеофайл напрямую в Gemini. Вместо этого мы отправляли текстовое приглашение, включающее URL-адрес видео, и инструктировали Gemini, как анализировать (гипотетическое) содержание видео по этому URL-адресу. Хотя Gemini 2.5 Flash поддерживает мультимодальный ввод, этот конкретный конвейер использовал текстовое приглашение, описывающее характер видео (практическое лабораторное занятие), и запрашивал структурированный вывод в формате JSON. Это позволяет Gemini использовать развитые логические функции и понимание естественного языка для вывода и синтеза информации на основе контекста приглашения.

Подсказка Gemini: руководство ИИ

Грамотно составленное приглашение критически важно для моделей искусственного интеллекта. Наше приглашение было разработано для извлечения очень конкретной информации и её структурирования в формат JSON, что позволило нашему приложению легко её анализировать.

PROMPT_TEMPLATE = """
In the video at the following URL: {youtube_url}, which is a hands-on lab session:
Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Take only the first 30-40 minutes of the video without throwing any error.
Analyze the rest of the content of the video.
Extract and synthesize information to create a book chapter section with the following structure, formatted as a JSON string:
1. **chapter_title:** A concise and engaging title for the chapter.
2. **introduction_context:** Briefly explain the relevance of this video segment within a broader learning context.
3. **what_will_build:** Clearly state the specific task or goal accomplished in this video segment.
4. **technologies_and_services:** List all mentioned Google Cloud services and any other relevant technologies (e.g., programming languages, tools, frameworks).
5. **how_we_did_it:** Provide a clear, numbered step-by-step guide of the actions performed. Include any exact commands or code snippets as they appear in the video. Format code/commands using markdown backticks (e.g., `my-command`).
6. **source_code_url:** Provide a URL to the source code repository if mentioned or implied. If not available, use "N/A".
7. **demo_url:** Provide a URL to a demo if mentioned or implied. If not available, use "N/A".
8. **qa_segment:** Generate 10–15 relevant questions based on the content of this segment, along with concise answers. Ensure the questions are thought-provoking and test understanding of the material.
REMEMBER: Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Format the entire output as a JSON string. Ensure all keys and string values are enclosed in double quotes.
Example structure:
...
"""

Это приглашение весьма специфично, позволяя Gemini действовать как своего рода образовательный инструмент. Запрос строки JSON обеспечивает структурированный, машиночитаемый вывод.

Вот код для анализа видеовхода и возврата его контекста:

def process_videos_batch(video_url: str, PROMPT_TEMPLATE: str) -> str:
    """
    Processes a video URL, generates chapter content using Gemini
    """
    formatted_prompt = PROMPT_TEMPLATE.format(youtube_url=video_url)
    try:

        client = genai.Client(vertexai=True,project='<<YOUR_PROJECT_ID>>',location='us-central1',http_options=HttpOptions(api_version="v1"))
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=formatted_prompt,
        )
        print(response.text)
    except Exception as e:
        print(f"An error occurred during content generation: {e}")
        return f"Error processing video: {e}"
    print(response.text)
    return response.text

В приведенном выше фрагменте кода показана основная функция варианта использования. Он получает URL-адрес видео и использует модель Gemini через клиент Vertex AI для анализа видеоконтента и извлечения релевантной информации в соответствии с запросом. Извлеченный контекст затем возвращается для дальнейшей обработки. Это представляет собой синхронную операцию, в которой задача Cloud Run ожидает завершения работы сервиса.

6. Разработка конвейерных приложений (Python)

Логика нашего центрального конвейера находится в исходном коде приложения, который будет помещен в контейнер Cloud Run Job, который организует весь процесс параллельного выполнения. Вот основные элементы:

Роль оркестратора в управлении рабочим процессом и обеспечении целостности данных:

# ... (imports and configuration) ...

def process_batch_from_bq(request_or_trigger_data=None):
    # ... (initial checks for config) ...
    BATCH_SIZE = 5 # Fetch 5 URLs at a time per job instance
    
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    try:
        logging.info(f"Fetching up to {BATCH_SIZE} pending URLs from BigQuery...")
        rows = bq_client.query(query).result() # job_should_wait=True is default for result()
        pending_urls_data = []
        for row in rows:
            pending_urls_data.append({"url": row.url, "id": row.id})
        
        if not pending_urls_data:
            logging.info("No pending URLs found. Job finished.")
            return "No pending URLs found. Job finished.", 200

        row_ids_to_process = [item["id"] for item in pending_urls_data]

        # --- Mark as PROCESSING to prevent duplicate work ---
        update_status_query = f"""
            UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
            SET status = 'PROCESSING'
            WHERE id IN UNNEST(@row_ids_to_process)
        """
        
        status_update_job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("row_ids_to_process", "STRING", values=row_ids_to_process)
            ]
        )
        
        update_status_job = bq_client.query(update_status_query, job_config=status_update_job_config)
        update_status_job.result()
        logging.info(f"Marked {len(row_ids_to_process)} URLs as 'PROCESSING'.")

        # ... (rest of the code for parallel processing and writing) ...
    
    except Exception as e:
        # ... (error handling) ...

Этот фрагмент кода выше начинается с извлечения пакета URL-адресов видео со статусом «ОЖИДАНИЕ» из исходной таблицы BigQuery. Затем он обновляет статус этих URL-адресов в BigQuery на «ОБРАБОТКА», предотвращая дублирование обработки.

Параллельная обработка с помощью ThreadPoolExecutor и вызов службы процессора:

# ... (inside process_batch_from_bq function) ...

        # --- Step 3: Call the external URL Processor Service in parallel ---
        processed_results = {}
        futures = []

        # ThreadPoolExecutor for I/O-bound tasks (HTTP requests to the processor service)
        # MAX_CONCURRENT_TASKS_PER_INSTANCE controls parallelism within one job instance.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS_PER_INSTANCE) as executor:
            for item in pending_urls_data:
                url = item["url"]
                row_id = item["id"]
                # Submit the task: call the processor service for this URL
                future = executor.submit(call_url_processor_service, url)
                futures.append((row_id, future)) 

            # Collect results as they complete
            for row_id, future in futures:
                try:
                    content = future.result(timeout=URL_PROCESSOR_TIMEOUT_SECONDS)
                    
                    # Check if the processor service returned an error message
                    if content.startswith("ERROR:"):
                        processed_results[row_id] = {"context": content, "status": "FAILED_PROCESSING"}
                    else:
                        processed_results[row_id] = {"context": content, "status": "COMPLETED"}
                        
                except TimeoutError:
                    logging.warning(f"URL processing timed out (service call for row ID {row_id}). Marking as FAILED.")
                    processed_results[row_id] = {"context": f"ERROR: Processing timed out for '{row_id}'.", "status": "FAILED_PROCESSING"}
                except Exception as e:
                    logging.error(f"Exception during future result retrieval for row ID {row_id}: {e}")
                    processed_results[row_id] = {"context": f"ERROR: Unexpected error during result retrieval for '{row_id}'. Details: {e}", "status": "FAILED_PROCESSING"}

Эта часть кода использует ThreadPoolExecutor для параллельной обработки URL-адресов полученных видео. Для каждого URL-адреса отправляется задача на асинхронный вызов службы Cloud Run Service (URL Processor). Это позволяет заданию Cloud Run Job эффективно обрабатывать несколько видео одновременно, повышая общую производительность конвейера. Фрагмент также обрабатывает возможные тайм-ауты и ошибки службы-обработчика.

Чтение и запись из BigQuery и в BigQuery

Основное взаимодействие с BigQuery включает в себя выборку ожидающих URL-адресов и их последующее обновление обработанными результатами.

# ... (inside process_batch_from_bq) ...
    BATCH_SIZE = 5 
    query = f"""
        SELECT url, id
        FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
        WHERE status = 'PENDING'
        LIMIT {BATCH_SIZE}
    """
    rows = bq_client.query(query).result()
    
    pending_urls_data = []
    for row in rows:
        pending_urls_data.append({"url": row.url, "id": row.id})
# ... (rest of fetching and marking as PROCESSING) ...

Запись результатов обратно в BigQuery:

# --- Step 4: Write results back to BigQuery ---
        logging.info(f"Writing {len(processed_results)} results back to BigQuery...")
        successful_updates = 0
        for row_id, data in processed_results.items():
            if update_bq_row(row_id, data["context"], data["status"]):
                successful_updates += 1
        
        logging.info(f"Finished processing. {successful_updates} out of {len(processed_results)} rows updated successfully.")
        # ... (return statement) ...

# --- Helper to update a single row in BigQuery ---
def update_bq_row(row_id, context, status="COMPLETED"):
    """Updates a specific row in the target BigQuery table."""
    # ... (checks for config) ...

    update_query = f"""
        UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_TARGET}`
        SET
            context = @context,
            status = @status
        WHERE id = @row_id
    """

    # Correctly defining query parameters for the UPDATE statement
    job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("context", "STRING", value=context),
                bigquery.ScalarQueryParameter("status", "STRING", value=status),
                # Assuming 'id' column is STRING. Adjust if it's INT64.
                bigquery.ScalarQueryParameter("row_id", "STRING", value=row_id) 
            ]
        )

    try:
        update_job = bq_client.query(update_query, job_config=job_config)
        update_job.result() # Wait for the job to complete
        logging.info(f"Successfully updated BigQuery row ID {row_id} with status {status}.")
        return True
    except Exception as e:
        logging.error(f"Failed to update BigQuery row ID {row_id}: {e}")
        return False

Приведённые выше фрагменты кода иллюстрируют взаимодействие данных между задачей Cloud Run Job и BigQuery. Она извлекает пакет URL-адресов видео в состоянии «PENDING» и их идентификаторы из исходной таблицы. После обработки URL-адресов этот фрагмент кода демонстрирует запись извлечённого контекста и статуса ('COMPLETED' или 'FAILED_PROCESSING') обратно в целевую таблицу BigQuery с помощью запроса UPDATE. Этот фрагмент кода завершает цикл обработки данных. Он также включает вспомогательную функцию update_bq_row, которая показывает, как определить параметры оператора обновления.

Настройка приложения

Приложение структурировано как единый скрипт Python, который будет помещен в контейнер. Для определения точки входа оно использует клиентские библиотеки Google Cloud и фреймворк функций.

  • Зависимости: google-cloud-bigquery, запросы
  • Конфигурация: все критически важные настройки (проект/набор данных/таблица BigQuery, URL-адрес службы процессора URL) загружаются из переменных среды, что делает приложение переносимым и безопасным.
  • Основная логика: функция process_batch_from_bq управляет всем рабочим процессом
  • Интеграция с внешними службами: функция call_url_processor_service обеспечивает связь с отдельной службой Cloud Run.
  • Взаимодействие с BigQuery: bq_client используется для извлечения URL-адресов и обновления результатов с правильной обработкой параметров.
  • Параллелизм: concurrent.futures.ThreadPoolExecutor управляет одновременными вызовами внешней службы
  • Точка входа: код Python с именем main.py выступает в качестве точки входа, запускающей пакетную обработку.

Давайте теперь настроим приложение:

  1. Вы можете начать с перехода в Cloud Shell Terminal и клонирования репозитория:
git clone https://github.com/AbiramiSukumaran/video-context-crj
  1. Перейдите в редактор Cloud Shell, где вы увидите недавно созданную папку video-context-crj.
  2. Удалите следующее, так как эти шаги уже выполнены в предыдущих разделах:
  3. Удалить папку Cloud_Run_Function
  4. Перейдите в папку проекта video-context-crj , и вы увидите структуру проекта:

84ace76f8e20c668.png

7. Настройка Dockerfile и контейнеризация

Чтобы развернуть эту логику как задачу Cloud Run, нам необходимо её контейнеризировать. Контейнеризация — это процесс упаковки кода нашего приложения, его зависимостей и среды выполнения в переносимый образ.

Обязательно замените заполнители (текст, выделенный жирным шрифтом) своими значениями в Dockerfile:

# Use an official Python runtime as a parent image
FROM python:3.12-alpine

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Copy the rest of the application code
COPY . .

# Define environment variables for configuration (these will be overridden during deployment)
ENV BIGQUERY_PROJECT="YOUR-project"
ENV BIGQUERY_DATASET="YOUR-dataset"
ENV BIGQUERY_TABLE_SOURCE="YOUR-source-table"
ENV URL_PROCESSOR_SERVICE_URL="ENDPOINT FOR VIDEO PROCESSING" 
ENV BIGQUERY_TABLE_TARGET = "YOUR-destination-table"

ENTRYPOINT ["python", "main.py"]

Приведённый выше фрагмент Dockerfile определяет базовый образ, устанавливает зависимости, копирует наш код и задаёт команду для запуска нашего приложения с использованием фреймворка функций с корректной целевой функцией (process_batch_from_bq). Затем этот образ отправляется в реестр артефактов.

Контейнеризация

Чтобы контейнеризировать его, перейдите в Cloud Shell Terminal и выполните следующие команды (не забудьте заменить заполнитель <<YOUR_PROJECT_ID>>):

export CONTAINER_IMAGE="gcr.io/<<YOUR_PROJECT_ID>>/batch-url-processor-orchestrator:latest"

gcloud builds submit --tag $CONTAINER_IMAGE .

После создания образа контейнера вы должны увидеть результат:

eec4f4a2bc5745f2.png

Наш контейнер создан и сохранён в реестре артефактов. Теперь можно переходить к следующему шагу.

8. Создание рабочих мест в облаке

Развертывание задания включает в себя создание образа контейнера, а затем создание ресурса Cloud Run Job.

Мы уже создали образ контейнера и сохранили его в реестре артефактов. Теперь давайте создадим задание.

  1. Перейдите в консоль Cloud Run Jobs и нажмите Deploy Container:

f3a1f4775000186e.png

  1. Выберите образ контейнера, который мы только что создали:

90989f396ad6c30a.png

  1. Введите другие данные конфигурации следующим образом:

b07fe386a4ae2797.png

  1. Установите емкость задачи следующим образом:

327a05d61e1337c3.png

Поскольку у нас есть записи в базу данных, а распараллеливание (max_instances и параллелизм задач) уже реализовано в коде, мы установим количество одновременных задач равным 1. Однако вы можете увеличить его по своему усмотрению. Цель здесь — обеспечить выполнение задач до завершения в соответствии с конфигурацией с уровнем параллелизма, заданным в параметре «параллелизм».

  1. Нажмите «Создать».

Ваша задача Cloud Run будет успешно создана.

Как это работает

Запускается экземпляр контейнера нашего задания. Он запрашивает в BigQuery небольшой пакет (BATCH_SIZE) URL-адресов, отмеченных как ОЖИДАНИЕ. Он немедленно обновляет статус этих полученных URL-адресов до ОБРАБОТКА в BigQuery, чтобы предотвратить их обработку другими экземплярами заданий. Он создаёт ThreadPoolExecutor и отправляет задачу для каждого URL-адреса в пакете. Каждая задача вызывает функцию call_url_processor_service. По мере завершения (или истечения времени ожидания/ошибки) запросов call_url_processor_service собираются их результаты (сгенерированный ИИ контекст или сообщение об ошибке), которые сопоставляются с исходным row_id. После завершения всех задач пакета задание перебирает собранные результаты и обновляет поля контекста и статуса для каждой соответствующей строки в BigQuery. В случае успеха экземпляр задания завершается корректно. При обнаружении необработанных ошибок возникает исключение, которое может привести к повторной попытке со стороны Cloud Run Jobs (в зависимости от конфигурации задания).

Как вписываются задания Cloud Run: оркестровка

Вот где работа в Cloud Run действительно проявляется.

Бессерверная пакетная обработка: мы получаем управляемую инфраструктуру, которая может запускать столько экземпляров контейнеров, сколько необходимо (до MAX_INSTANCES) для одновременной обработки наших данных.

Управление параллелизмом: мы определяем MAX_INSTANCES (количество заданий, которые могут выполняться параллельно) и TASK_CONCURRENCY (количество операций, которые каждый экземпляр задания выполняет параллельно). Это обеспечивает детальный контроль над производительностью и использованием ресурсов.

Отказоустойчивость: если выполнение задания завершается сбоем, можно настроить Cloud Run Jobs на повторную попытку выполнения всего задания или отдельных задач, гарантируя, что обработка данных не будет потеряна.

Упрощенная архитектура: организуя HTTP-вызовы непосредственно в задании и используя BigQuery для управления состоянием, мы избегаем сложности настройки и управления Pub/Sub, его темами, подписками и логикой подтверждений.

MAX_INSTANCES против TASK_CONCURRENCY:

MAX_INSTANCES: Общее количество экземпляров задания, которые могут выполняться одновременно на протяжении всего процесса выполнения. Это основной инструмент параллелизма для обработки множества URL-адресов одновременно.

TASK_CONCURRENCY: Количество параллельных операций (вызовов службы вашего процессора), которые будет выполнять один экземпляр вашего задания. Это помогает максимально загрузить ЦП/сеть одного экземпляра.

9. Выполнение и мониторинг задания Cloud Run

Метаданные видео

Прежде чем нажать кнопку «Выполнить», давайте посмотрим на состояние данных.

Перейдите в BigQuery Studio и выполните следующий запрос:

Select id, descr, url, status from cv_metadata.post_session_labs where status = PENDING'

e9d99c2ed84d265f.png

У нас есть несколько примеров записей с URL-адресами видео в статусе «ОЖИДАНИЕ». Наша цель — заполнить поле «Контекст» информацией из видео в формате, указанном в подсказке.

Триггер работы

Давайте продолжим и выполним задание, нажав кнопку ВЫПОЛНИТЬ на задании в консоли Cloud Run Jobs, и вы сможете увидеть ход выполнения и статус заданий в консоли:

13f6a8892e6fd2bf.png

Вы можете проверить тег LOGS в OBSERVABILITY для отслеживания этапов и других сведений о задании и задачах.

10. Анализ результатов

После завершения работы вы сможете увидеть обновленный контекст для каждого URL-адреса видео в таблице:

135f85ad141c070b.png

Контекст вывода (для одной из записей)

{
  "chapter_title": "Building a Travel Agent with ADK and MCP Toolbox",
  "introduction_context": "This chapter section is derived from a hands-on lab session focused on building a travel agent. It details the process of integrating various Google Cloud services and tools to create an intelligent agent capable of querying a database and interacting with users.",
  "what_will_build": "The goal is to build and deploy a travel agent that can answer user queries about hotels using the Agent Development Kit (ADK) and the MCP Toolbox for Databases, connecting to a PostgreSQL database.",
  "technologies_and_services": [
    "Google Cloud Platform",
    "Cloud SQL for PostgreSQL",
    "Agent Development Kit (ADK)",
    "MCP Toolbox for Databases",
    "Cloud Shell",
    "Cloud Run",
    "Python",
    "Docker"
  ],
  "how_we_did_it": [
    "Provision a Cloud SQL instance for PostgreSQL with the 'hoteldb-instance'.",
    "Prepare the 'hotels' database by creating a table with relevant schema and populating it with sample data.",
    "Set up the MCP Toolbox for Databases by downloading and configuring the necessary components.",
    "Install the Agent Development Kit (ADK) and its dependencies.",
    "Create a new agent using the ADK, specifying the model (Gemini 2.0-flash) and backend (Vertex AI).",
    "Modify the agent's code to connect to the PostgreSQL database via the MCP Toolbox.",
    "Run the agent locally to test its functionality and ability to interact with the database.",
    "Deploy the agent to Cloud Run for cloud-based access and further testing.",
    "Interact with the deployed agent through a web console or command line to query hotel information."
  ],
  "source_code_url": "N/A",
  "demo_url": "N/A",
  "qa_segment": [
    {
      "question": "What is the primary purpose of the MCP Toolbox for Databases?",
      "answer": "The MCP Toolbox for Databases is an open-source MCP server designed to help users develop tools faster, more securely, and by handling complexities like connection pooling, authentication, and more."
    },
    {
      "question": "Which Google Cloud service is used to create the database for the travel agent?",
      "answer": "Cloud SQL for PostgreSQL is used to create the database."
    },
    {
      "question": "What is the role of the Agent Development Kit (ADK)?",
      "answer": "The ADK helps build Generative AI tools that allow agents to access data in a database. It enables agents to perform actions, interact with users, utilize external tools, and coordinate with other agents."
    },
    {
      "question": "What command is used to create the initial agent application using ADK?",
      "answer": "The command `adk create hotel-agent-app` is used to create the agent application."
    },
    ....

Теперь вы сможете проверить использование этой структуры JSON для более сложных случаев использования агентами.

Почему именно такой подход?

Такая архитектура обеспечивает значительные стратегические преимущества:

  • Экономичность: бессерверные сервисы означают, что вы платите только за то, что используете. Масштабирование заданий Cloud Run сокращается до нуля, когда они не используются.
  • Масштабируемость: легко обрабатывает десятки тысяч URL-адресов, настраивая экземпляр задания Cloud Run и параметры параллелизма.
  • Гибкость: быстрые циклы разработки и развертывания новой логики обработки или моделей ИИ путем простого обновления содержащегося приложения и его служб.
  • Сокращение операционных расходов: не нужно исправлять или управлять серверами; Google управляет инфраструктурой.
  • Демократизация ИИ: делает расширенную обработку ИИ доступной для пакетных задач без глубоких знаний операций машинного обучения.

11. Уборка

Чтобы избежать списания средств с вашего аккаунта Google Cloud за ресурсы, использованные в этой публикации, выполните следующие действия:

  1. В консоли Google Cloud перейдите на страницу менеджера ресурсов .
  2. В списке проектов выберите проект, который вы хотите удалить, а затем нажмите Удалить .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите кнопку «Завершить» , чтобы удалить проект.

12. Поздравления

Поздравляем! Построив наше решение на основе Cloud Run Jobs и используя возможности BigQuery для управления данными и внешнего сервиса Cloud Run Service для обработки ИИ, вы создали высокомасштабируемую, экономичную и простую в обслуживании систему. Этот шаблон разделяет логику обработки, обеспечивает параллельное выполнение без сложной инфраструктуры и значительно ускоряет получение аналитической информации.

Мы рекомендуем вам изучить возможности Cloud Run Jobs для ваших собственных задач пакетной обработки. Этот бессерверный подход предлагает мощное и эффективное решение, будь то масштабирование анализа ИИ, запуск конвейеров ETL или выполнение периодических задач по обработке данных. Чтобы начать работу самостоятельно, ознакомьтесь с этой информацией .

Если вам интересно создавать и развертывать все свои приложения без сервера и с использованием агентов, зарегистрируйтесь на Code Vipassana , который посвящен ускорению генеративных агентных приложений, управляемых данными!