1. Введение
В этом практическом занятии вы создадите архитектуру, управляемую событиями, которая объединит непрерывные запросы BigQuery, модель Pub/Sub и агента для расследования мошенничеств, разработанного с использованием комплекта разработки агентов (ADK), размещенного на платформе Vertex AI Agent Engine.

Вам потребуется настроить конвейер обработки данных, в котором непрерывный запрос будет обнаруживать аномалии (например, "Невозможное путешествие") в розничных транзакциях в режиме реального времени, экспортировать эти подозрительные события в топик Pub/Sub, который затем запустит агент ADK для оценки каждой аномалии и реагирования на нее индивидуально.
Что вы будете делать
- Подготовьте среду BigQuery, используя примеры данных транзакций.
- Создайте непрерывный запрос BigQuery для обнаружения аномалий в реальном времени.
- Настройте тему и подписку Pub/Sub с использованием преобразования отдельных сообщений (SMT).
- Загрузите, настройте и разверните агента ADK в Vertex AI Agent Engine.
- Передача данных о транзакциях для подтверждения того, что агент получает и обрабатывает запросы на эскалацию.
Что вам понадобится
- Веб-браузер, например Chrome.
- Проект Google Cloud с включенной функцией выставления счетов.
- Доступ к Google Cloud Shell
Этот практический семинар предназначен для разработчиков среднего уровня, знакомых с BigQuery и основами Python.
Стоимость ресурсов, созданных в рамках этого практического занятия, должна составлять менее 2 долларов.
Ориентировочная продолжительность: выполнение этого практического задания займет приблизительно 60 минут.
2. Прежде чем начать
Создайте проект в Google Cloud.
- В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
- Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .
Запустить Cloud Shell
Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.
- В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
- После подключения к Cloud Shell подтвердите свою аутентификацию:
gcloud auth list - Убедитесь, что ваш проект настроен:
gcloud config get project - Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Укажите идентификатор вашего проекта.
Выполните следующую команду, чтобы получить идентификатор вашего активного проекта Google Cloud и сохранить его в качестве переменной среды для использования на протяжении всего этого практического занятия:
export PROJECT_ID=$(gcloud config get-value project)
Получить код
Выполните эту команду, чтобы клонировать репозиторий и загрузить только целевую папку event_driven_agents_demo , которая содержит агент ADK и скрипты настройки:
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
Перейдите в каталог event_driven_agents_demo :
cd event_driven_agents_demo
Если вы откроете редактор Cloud Shell, вы сможете увидеть структуру клонированного репозитория:

3. Подготовка окружающей среды
Вы подготовите свою среду Google Cloud, используя скрипт настройки, предоставленный в репозитории. Этот скрипт:
- Создает хранилище Google Cloud Storage для подготовки комплекта разработчика агента (ADK).
- Создает
CONTINUOUSрезервирование в рамках корпоративного BigQuery для обработки запросов. - Создает набор данных BigQuery и загружает исходные данные
customer_profiles - Настраивает разрешения IAM и предоставляет необходимые роли учетной записи службы агента ADK.
Запустите скрипт из вашей оболочки Cloud Shell:
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. Проверьте агент ADK.
Теперь вам нужно развернуть код агента ADK в Vertex AI Agent Engine. Это необходимо сделать в первую очередь, чтобы ваш агент был развернут и готов к обработке запросов на эскалацию до начала потоковой передачи данных.
cd agent
Понимание кода агента ADK (Agent Development Kit).
Основная логика работы агента определена в файле adk_agent_app/agent.py .
Мы создаём агента, который использует Gemini 2.5 Flash для автономного расследования аномальных оповещений. Агент анализирует содержимое оповещения, извлекает историю клиента из BigQuery и проверяет данные продавца с помощью веб-поиска, прежде чем классифицировать транзакцию как FALSE_POSITIVE (легитимная транзакция) или ESCALATION_NEEDED .
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
Агент оснащен двумя различными инструментами:
-
BigQueryToolset: Позволяет агенту автономно запрашивать набор данныхcymbal_bankдля поиска дополнительной истории транзакций. -
google_search: Позволяет агенту осуществлять поиск в интернете для проверки репутации продавца и подтверждения его легитимности.
5. Разверните агент ADK.
Выполните следующую команду, чтобы установить необходимые пакеты Python ( google-cloud-aiplatform , google-adk и т. д.) для развертывания агента:
pip install -r requirements.txt
Выполните следующую команду, чтобы динамически сгенерировать файл .env , содержащий идентификатор вашего проекта; он будет использоваться при развертывании агента:
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
Теперь выполните эту команду, чтобы развернуть агента в Vertex AI Agent Engine:
python deploy_agent_script.py
Примечание: скрипт deploy_agent_script.py инициализирует плагин BigQueryAgentAnalyticsPlugin , который автоматически записывает данные трассировки и информацию об использовании инструментов агента в таблицу agent_events в BigQuery.
Выполнение этой процедуры займет несколько минут. В результате вы должны увидеть примерно следующий вывод:
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
Выполните эту команду, чтобы сохранить URL-адрес конечной точки развернутого агента в локальный файл с именем agent_endpoint.txt :
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
Мы будем использовать этот URL-адрес позже при создании нашей подписки Pub/Sub.
6. Протестируйте агент ADK.
Перед созданием событий для потоковой передачи в реальном времени проверьте, корректно ли агент ADK в Agent Engine обрабатывает ручные запросы на эскалацию.
- В консоли Google Cloud перейдите на страницу Vertex AI Agent Engine .
- Щелкните по имени вашего развернутого агента (
Cymbal Bank Fraud Assitant). - Перейдите на вкладку «Игровая площадка» , чтобы напрямую взаимодействовать с агентом.
- В интерфейсе чата вставьте следующий смоделированный JSON-код события, имитирующий то, что агент получит от системы Pub/Sub, и нажмите Enter:
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
Убедитесь, что агент оценивает транзакцию и выдает FALSE POSITIVE оценку в окне Playground:

7. Настройте непрерывный запрос BigQuery для потоковой передачи запросов на эскалацию в Pub/Sub.
Теперь, когда наш агент ADK развернут и готов к приему событий, давайте вернемся в корневой каталог и построим остальную часть конвейера:
cd ../../event_driven_agents_demo
1. Создайте тему для публикации/подтемы.
Выполните эту команду, чтобы создать тему Pub/Sub. Эта тема будет получать аномалии, экспортированные из BigQuery Continuous Query:
gcloud pubsub topics create cymbal-bank-escalations-topic
На следующем шаге мы оформим подписку на эту тему.
2. Выполните непрерывный запрос BigQuery.
После развертывания агента и подготовки темы Pub/Sub запустите непрерывный запрос для мониторинга потока retail_transactions в режиме реального времени. Этот запрос обнаруживает аномалии, связанные с "невозможными поездками", и экспортирует оповещения в Pub/Sub.
Для начала выполнения запроса выполните следующую команду:
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
В терминале должно отобразиться сообщение, указывающее на успешный запуск непрерывного запроса:
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. Создайте подписку на push-уведомления.
Теперь, когда ваш агент развернут и непрерывный запрос запущен, вам нужно создать подписку "Push", чтобы активно пересылать все новые сообщения об аномалиях из темы непосредственно на URL-адрес веб-перехватчика вашего агента.
Чтобы гарантировать получение агентом данных в правильном формате, мы будем использовать Single Message Transform (SMT) . SMT позволяют вносить незначительные изменения в данные и атрибуты сообщений непосредственно в Pub/Sub на лету, до их доставки подписчику.
Вот как происходит преобразование в нашем конвейере:
- Пользовательская функция (UDF): файл
transform.yamlв каталогеsetupсодержит пользовательскую функцию JavaScript, которая будет обрабатывать сообщения. - Распаковка данных BigQuery: Когда BigQuery экспортирует данные в Pub/Sub посредством непрерывного запроса, он оборачивает полезную нагрузку JSON во внешний объект.
- Форматирование для ADK: Пользовательская функция (UDF) расшифровывает двойное кодирование и переупаковывает полезную нагрузку в строгий формат, ожидаемый API
streamQueryAgent Engine.
Выполните следующую команду, чтобы создать подписку с применением преобразования UDF:
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
Вы должны увидеть сообщение, подтверждающее создание подписки:
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. Генерация событий
Наконец, протестируйте весь процесс от начала до конца, запустив generate_events.py для потоковой передачи синтетической транзакции "Невозможное путешествие" в таблицу cymbal_bank.retail_transactions :
python simulator/generate_events.py
В модели используются данные профиля клиента, которые мы загрузили ранее (Карен Бертон, проживающая в США), и моделируется новая транзакция по покупке электроники на сумму 2500 долларов США, происходящая в Австралии (AUS).
Убедитесь, что событие получено: подождите приблизительно две минуты для непрерывного формирования окна запроса и обработки ADK, затем проверьте журналы развернутого агента, чтобы убедиться, что он обработал сообщение Pub/Sub, которое было инициировано.

10. Анализ производительности агентов в BigQuery
Перейдите в консоль BigQuery и выберите набор данных cymbal_bank . Выберите таблицу agent_events и нажмите «Предварительный просмотр»:

Полученные данные подтверждают, что агент успешно проанализировал ситуацию эскалации "Невозможное путешествие".
Поскольку автономные агенты постоянно работают в фоновом режиме, возможность наблюдения имеет решающее значение. Ваш агент автоматически записывает трассировки выполнения с помощью плагина ADK и регистрирует принятые решения с помощью пользовательского инструмента.
Выполните следующий запрос, чтобы объединить решения вашего агента с метриками задержки и использования токенов, собранными в таблице agent_events :
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
Вы должны увидеть заполненную таблицу результатов, которая будет выглядеть примерно так:

Искусство возможного: Хотя этот CodeLab завершается записью решений агента в BigQuery для визуализации, а скрипт генератора событий был относительно простым и вставлял только данные о мошенничестве от одного пользователя, помните, что инструменты агента — это просто функции Python. Это означает, что по мере масштабирования вашей демонстрации и расширения спектра вариантов использования или сценариев ваш агент сможет взаимодействовать с чем угодно.
В производственной среде эту архитектуру можно легко расширить. Вместо простого логирования данных ваш агент может использовать веб-хук для оповещения канала Slack или Teams, инициировать инцидент PagerDuty, записать окончательный вердикт в базу данных с низкой задержкой, такую как Cloud Spanner, или опубликовать новое сообщение Pub/Sub в нижестоящий микросервис для автоматической блокировки скомпрометированной кредитной карты!
11. Уборка
Чтобы избежать дальнейших списаний средств с вашего аккаунта Google Cloud, удалите ресурсы, созданные в ходе этого практического занятия.
В репозитории codelab содержится скрипт очистки, который автоматически удалит ваше развертывание Pub/Sub, набор данных BigQuery, слот резервирования BigQuery, конфигурацию Vertex Agent Engine, промежуточный сегмент Cloud Storage и учетные записи службы IAM.
Если непрерывный запрос BigQuery все еще выполняется, остановите его через пользовательский интерфейс BigQuery в консоли Google Cloud. Затем запустите скрипт очистки:
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
В качестве альтернативы вы можете удалить весь проект, если он был создан исключительно для этого практического занятия.
12. Поздравляем!
Поздравляем! Вы создали конвейер обработки данных, управляемый событиями, с использованием BigQuery, Pub/Sub и ADK.
Что вы узнали
- Как экспортировать аномалии из непрерывного запроса BigQuery в Pub/Sub
- Как перенаправить преобразованные сообщения Pub/Sub в агент ADK
- Как развернуть агента и взаимодействовать с ним в Vertex AI Agent Engine