Создайте агент обработки данных, управляемый событиями, с помощью BigQuery и ADK.

1. Введение

В этом практическом занятии вы создадите архитектуру, управляемую событиями, которая объединит непрерывные запросы BigQuery, модель Pub/Sub и агента для расследования мошенничеств, разработанного с использованием комплекта разработки агентов (ADK), размещенного на платформе Vertex AI Agent Engine.

Архитектура агента данных, управляемого событиями

Вам потребуется настроить конвейер обработки данных, в котором непрерывный запрос будет обнаруживать аномалии (например, "Невозможное путешествие") в розничных транзакциях в режиме реального времени, экспортировать эти подозрительные события в топик Pub/Sub, который затем запустит агент ADK для оценки каждой аномалии и реагирования на нее индивидуально.

Что вы будете делать

  • Подготовьте среду BigQuery, используя примеры данных транзакций.
  • Создайте непрерывный запрос BigQuery для обнаружения аномалий в реальном времени.
  • Настройте тему и подписку Pub/Sub с использованием преобразования отдельных сообщений (SMT).
  • Загрузите, настройте и разверните агента ADK в Vertex AI Agent Engine.
  • Передача данных о транзакциях для подтверждения того, что агент получает и обрабатывает запросы на эскалацию.

Что вам понадобится

  • Веб-браузер, например Chrome.
  • Проект Google Cloud с включенной функцией выставления счетов.
  • Доступ к Google Cloud Shell

Этот практический семинар предназначен для разработчиков среднего уровня, знакомых с BigQuery и основами Python.

Стоимость ресурсов, созданных в рамках этого практического занятия, должна составлять менее 2 долларов.

Ориентировочная продолжительность: выполнение этого практического задания займет приблизительно 60 минут.

2. Прежде чем начать

Создайте проект в Google Cloud.

  1. В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
  2. Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .

Запустить Cloud Shell

Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.

  1. В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
  2. После подключения к Cloud Shell подтвердите свою аутентификацию:
    gcloud auth list
    
  3. Убедитесь, что ваш проект настроен:
    gcloud config get project
    
  4. Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Укажите идентификатор вашего проекта.

Выполните следующую команду, чтобы получить идентификатор вашего активного проекта Google Cloud и сохранить его в качестве переменной среды для использования на протяжении всего этого практического занятия:

export PROJECT_ID=$(gcloud config get-value project)

Получить код

Выполните эту команду, чтобы клонировать репозиторий и загрузить только целевую папку event_driven_agents_demo , которая содержит агент ADK и скрипты настройки:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Перейдите в каталог event_driven_agents_demo :

cd event_driven_agents_demo

Если вы откроете редактор Cloud Shell, вы сможете увидеть структуру клонированного репозитория:

Папка с агентом ADK и скриптами настройки.

3. Подготовка окружающей среды

Вы подготовите свою среду Google Cloud, используя скрипт настройки, предоставленный в репозитории. Этот скрипт:

  • Создает хранилище Google Cloud Storage для подготовки комплекта разработчика агента (ADK).
  • Создает CONTINUOUS резервирование в рамках корпоративного BigQuery для обработки запросов.
  • Создает набор данных BigQuery и загружает исходные данные customer_profiles
  • Настраивает разрешения IAM и предоставляет необходимые роли учетной записи службы агента ADK.

Запустите скрипт из вашей оболочки Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Проверьте агент ADK.

Теперь вам нужно развернуть код агента ADK в Vertex AI Agent Engine. Это необходимо сделать в первую очередь, чтобы ваш агент был развернут и готов к обработке запросов на эскалацию до начала потоковой передачи данных.

cd agent

Понимание кода агента ADK (Agent Development Kit).

Основная логика работы агента определена в файле adk_agent_app/agent.py .

Мы создаём агента, который использует Gemini 2.5 Flash для автономного расследования аномальных оповещений. Агент анализирует содержимое оповещения, извлекает историю клиента из BigQuery и проверяет данные продавца с помощью веб-поиска, прежде чем классифицировать транзакцию как FALSE_POSITIVE (легитимная транзакция) или ESCALATION_NEEDED .

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

Агент оснащен двумя различными инструментами:

  1. BigQueryToolset : Позволяет агенту автономно запрашивать набор данных cymbal_bank для поиска дополнительной истории транзакций.
  2. google_search : Позволяет агенту осуществлять поиск в интернете для проверки репутации продавца и подтверждения его легитимности.

5. Разверните агент ADK.

Выполните следующую команду, чтобы установить необходимые пакеты Python ( google-cloud-aiplatform , google-adk и т. д.) для развертывания агента:

pip install -r requirements.txt

Выполните следующую команду, чтобы динамически сгенерировать файл .env , содержащий идентификатор вашего проекта; он будет использоваться при развертывании агента:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Теперь выполните эту команду, чтобы развернуть агента в Vertex AI Agent Engine:

python deploy_agent_script.py

Примечание: скрипт deploy_agent_script.py инициализирует плагин BigQueryAgentAnalyticsPlugin , который автоматически записывает данные трассировки и информацию об использовании инструментов агента в таблицу agent_events в BigQuery.

Выполнение этой процедуры займет несколько минут. В результате вы должны увидеть примерно следующий вывод:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Выполните эту команду, чтобы сохранить URL-адрес конечной точки развернутого агента в локальный файл с именем agent_endpoint.txt :

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Мы будем использовать этот URL-адрес позже при создании нашей подписки Pub/Sub.

6. Протестируйте агент ADK.

Перед созданием событий для потоковой передачи в реальном времени проверьте, корректно ли агент ADK в Agent Engine обрабатывает ручные запросы на эскалацию.

  1. В консоли Google Cloud перейдите на страницу Vertex AI Agent Engine .
  2. Щелкните по имени вашего развернутого агента ( Cymbal Bank Fraud Assitant ).
  3. Перейдите на вкладку «Игровая площадка» , чтобы напрямую взаимодействовать с агентом.
  4. В интерфейсе чата вставьте следующий смоделированный JSON-код события, имитирующий то, что агент получит от системы Pub/Sub, и нажмите Enter:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Убедитесь, что агент оценивает транзакцию и выдает FALSE POSITIVE оценку в окне Playground:

Игровая площадка для движка агентов Vertex AI

7. Настройте непрерывный запрос BigQuery для потоковой передачи запросов на эскалацию в Pub/Sub.

Теперь, когда наш агент ADK развернут и готов к приему событий, давайте вернемся в корневой каталог и построим остальную часть конвейера:

cd ../../event_driven_agents_demo

1. Создайте тему для публикации/подтемы.

Выполните эту команду, чтобы создать тему Pub/Sub. Эта тема будет получать аномалии, экспортированные из BigQuery Continuous Query:

gcloud pubsub topics create cymbal-bank-escalations-topic

На следующем шаге мы оформим подписку на эту тему.

2. Выполните непрерывный запрос BigQuery.

После развертывания агента и подготовки темы Pub/Sub запустите непрерывный запрос для мониторинга потока retail_transactions в режиме реального времени. Этот запрос обнаруживает аномалии, связанные с "невозможными поездками", и экспортирует оповещения в Pub/Sub.

Для начала выполнения запроса выполните следующую команду:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

В терминале должно отобразиться сообщение, указывающее на успешный запуск непрерывного запроса:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Создайте подписку на push-уведомления.

Теперь, когда ваш агент развернут и непрерывный запрос запущен, вам нужно создать подписку "Push", чтобы активно пересылать все новые сообщения об аномалиях из темы непосредственно на URL-адрес веб-перехватчика вашего агента.

Чтобы гарантировать получение агентом данных в правильном формате, мы будем использовать Single Message Transform (SMT) . SMT позволяют вносить незначительные изменения в данные и атрибуты сообщений непосредственно в Pub/Sub на лету, до их доставки подписчику.

Вот как происходит преобразование в нашем конвейере:

  • Пользовательская функция (UDF): файл transform.yaml в каталоге setup содержит пользовательскую функцию JavaScript, которая будет обрабатывать сообщения.
  • Распаковка данных BigQuery: Когда BigQuery экспортирует данные в Pub/Sub посредством непрерывного запроса, он оборачивает полезную нагрузку JSON во внешний объект.
  • Форматирование для ADK: Пользовательская функция (UDF) расшифровывает двойное кодирование и переупаковывает полезную нагрузку в строгий формат, ожидаемый API streamQuery Agent Engine.

Выполните следующую команду, чтобы создать подписку с применением преобразования UDF:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Вы должны увидеть сообщение, подтверждающее создание подписки:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Генерация событий

Наконец, протестируйте весь процесс от начала до конца, запустив generate_events.py для потоковой передачи синтетической транзакции "Невозможное путешествие" в таблицу cymbal_bank.retail_transactions :

python simulator/generate_events.py

В модели используются данные профиля клиента, которые мы загрузили ранее (Карен Бертон, проживающая в США), и моделируется новая транзакция по покупке электроники на сумму 2500 долларов США, происходящая в Австралии (AUS).

Убедитесь, что событие получено: подождите приблизительно две минуты для непрерывного формирования окна запроса и обработки ADK, затем проверьте журналы развернутого агента, чтобы убедиться, что он обработал сообщение Pub/Sub, которое было инициировано.

Журналы работы агентского движка

10. Анализ производительности агентов в BigQuery

Перейдите в консоль BigQuery и выберите набор данных cymbal_bank . Выберите таблицу agent_events и нажмите «Предварительный просмотр»:

Предварительный просмотр событий агента BigQuery

Полученные данные подтверждают, что агент успешно проанализировал ситуацию эскалации "Невозможное путешествие".

Поскольку автономные агенты постоянно работают в фоновом режиме, возможность наблюдения имеет решающее значение. Ваш агент автоматически записывает трассировки выполнения с помощью плагина ADK и регистрирует принятые решения с помощью пользовательского инструмента.

Выполните следующий запрос, чтобы объединить решения вашего агента с метриками задержки и использования токенов, собранными в таблице agent_events :

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Вы должны увидеть заполненную таблицу результатов, которая будет выглядеть примерно так:

Результаты анализа данных агента BigQuery

Искусство возможного: Хотя этот CodeLab завершается записью решений агента в BigQuery для визуализации, а скрипт генератора событий был относительно простым и вставлял только данные о мошенничестве от одного пользователя, помните, что инструменты агента — это просто функции Python. Это означает, что по мере масштабирования вашей демонстрации и расширения спектра вариантов использования или сценариев ваш агент сможет взаимодействовать с чем угодно.

В производственной среде эту архитектуру можно легко расширить. Вместо простого логирования данных ваш агент может использовать веб-хук для оповещения канала Slack или Teams, инициировать инцидент PagerDuty, записать окончательный вердикт в базу данных с низкой задержкой, такую ​​как Cloud Spanner, или опубликовать новое сообщение Pub/Sub в нижестоящий микросервис для автоматической блокировки скомпрометированной кредитной карты!

11. Уборка

Чтобы избежать дальнейших списаний средств с вашего аккаунта Google Cloud, удалите ресурсы, созданные в ходе этого практического занятия.

В репозитории codelab содержится скрипт очистки, который автоматически удалит ваше развертывание Pub/Sub, набор данных BigQuery, слот резервирования BigQuery, конфигурацию Vertex Agent Engine, промежуточный сегмент Cloud Storage и учетные записи службы IAM.

Если непрерывный запрос BigQuery все еще выполняется, остановите его через пользовательский интерфейс BigQuery в консоли Google Cloud. Затем запустите скрипт очистки:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

В качестве альтернативы вы можете удалить весь проект, если он был создан исключительно для этого практического занятия.

12. Поздравляем!

Поздравляем! Вы создали конвейер обработки данных, управляемый событиями, с использованием BigQuery, Pub/Sub и ADK.

Что вы узнали

  • Как экспортировать аномалии из непрерывного запроса BigQuery в Pub/Sub
  • Как перенаправить преобразованные сообщения Pub/Sub в агент ADK
  • Как развернуть агента и взаимодействовать с ним в Vertex AI Agent Engine

Справочная документация