1. 소개
이 Codelab에서는 BigQuery 연속 쿼리, Pub/Sub, Vertex AI Agent Engine에서 호스팅되는 에이전트 개발 키트 (ADK)를 사용하여 빌드된 사기 조사관 에이전트를 결합하는 이벤트 기반 아키텍처를 빌드합니다.

연속 쿼리가 실시간 소매 거래에서 '불가능한 이동'과 같은 이상치를 감지하고, 이러한 의심스러운 이벤트를 Pub/Sub 주제로 내보내면 ADK 에이전트가 각 이상치를 개별적으로 평가하고 대응하는 파이프라인을 설정합니다.
실습할 내용
- 샘플 거래 데이터로 BigQuery 환경 준비
- BigQuery 연속 쿼리를 만들어 실시간 이상치 감지
- 단일 메시지 변환 (SMT)으로 Pub/Sub 주제 및 구독 설정
- ADK 에이전트를 Vertex AI Agent Engine으로 가져오고, 구성하고, 배포합니다.
- 상담사가 에스컬레이션을 수신하고 처리하는지 확인하기 위해 거래 데이터를 스트리밍합니다.
필요한 항목
- 웹브라우저(예: Chrome)
- 결제가 사용 설정된 Google Cloud 프로젝트
- Google Cloud Shell 액세스
이 Codelab은 BigQuery와 기본 Python에 익숙한 중급 개발자를 대상으로 합니다.
이 Codelab에서 만든 리소스의 비용은 2달러 미만이어야 합니다.
예상 소요 시간: 이 Codelab을 완료하는 데 약 60분이 소요됩니다.
2. 시작하기 전에
Google Cloud 프로젝트 만들기
- Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
- Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
Cloud Shell 시작
Cloud Shell은 Google Cloud에서 실행되는 명령줄 환경으로, 필요한 도구가 미리 로드되어 제공됩니다.
- Google Cloud 콘솔 상단에서 Cloud Shell 활성화를 클릭합니다.
- Cloud Shell에 연결되면 인증을 확인합니다.
gcloud auth list - 프로젝트가 구성되었는지 확인합니다.
gcloud config get project - 프로젝트가 예상대로 설정되지 않은 경우 설정합니다.
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
프로젝트 ID 설정
다음 명령어를 실행하여 활성 Google Cloud 프로젝트 ID를 검색하고 이 Codelab 전체에서 사용할 환경 변수로 저장합니다.
export PROJECT_ID=$(gcloud config get-value project)
코드 가져오기
다음 명령어를 실행하여 저장소를 클론하고 ADK 에이전트와 설정 스크립트가 포함된 타겟 event_driven_agents_demo 폴더만 다운로드합니다.
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
event_driven_agents_demo 디렉터리로 이동합니다.
cd event_driven_agents_demo
Cloud Shell 편집기를 열면 클론된 저장소 구조가 표시됩니다.

3. 환경 준비
저장소에 제공된 설정 스크립트를 사용하여 Google Cloud 환경을 준비합니다. 이 스크립트는 다음을 수행합니다.
- 에이전트 개발 키트 (ADK) 스테이징을 위해 Google Cloud Storage 버킷을 프로비저닝합니다.
- 쿼리 처리를 위해
CONTINUOUSEnterprise BigQuery 예약을 만듭니다. - BigQuery 데이터 세트를 설정하고 초기
customer_profiles데이터를 로드합니다. - IAM 권한을 구성하고 ADK 에이전트 서비스 계정에 필요한 역할을 부여합니다.
Cloud Shell에서 스크립트를 실행합니다.
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK 에이전트 검사
이제 ADK 에이전트 코드를 Vertex AI Agent Engine에 배포합니다. 이렇게 하면 데이터 스트리밍을 시작하기 전에 에이전트가 배포되고 에스컬레이션을 처리할 준비가 됩니다.
cd agent
ADK (에이전트 개발 키트) 에이전트 코드 이해하기
핵심 에이전트 로직은 adk_agent_app/agent.py 내에 정의되어 있습니다.
Gemini 2.5 Flash를 사용하여 비정상적인 알림을 자율적으로 조사하는 에이전트를 구성합니다. 에이전트는 알림 페이로드를 분석하고, BigQuery에서 고객 기록을 가져오고, 웹 검색을 통해 판매자 세부정보를 확인한 후 거래를 FALSE_POSITIVE (합법적인 거래) 또는 ESCALATION_NEEDED로 분류합니다.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
에이전트에는 다음과 같은 두 가지 도구가 있습니다.
BigQueryToolset: 상담사가cymbal_bank데이터 세트를 자율적으로 쿼리하여 추가 거래 내역을 조회할 수 있도록 허용합니다.google_search: 에이전트가 웹을 검색하여 판매자의 평판을 조사하고 합법성을 확인할 수 있습니다.
5. ADK 에이전트 배포
다음 명령어를 실행하여 에이전트 배포에 필요한 Python 패키지 (google-cloud-aiplatform, google-adk 등)를 설치합니다.
pip install -r requirements.txt
다음 명령어를 실행하여 특정 프로젝트 ID가 포함된 .env 파일을 동적으로 생성합니다. 이 파일은 에이전트를 배포할 때 사용됩니다.
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
이제 다음 명령어를 실행하여 에이전트를 Vertex AI Agent Engine에 배포합니다.
python deploy_agent_script.py
참고: deploy_agent_script.py는 BigQueryAgentAnalyticsPlugin를 초기화하며, 이는 추적 데이터와 에이전트 도구 사용량을 BigQuery의 agent_events 테이블에 자동으로 로깅합니다.
완료되기까지 몇 분 정도 걸립니다. 다음과 비슷한 출력이 표시됩니다.
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
다음 명령어를 실행하여 배포된 에이전트 엔드포인트 URL을 agent_endpoint.txt이라는 로컬 파일에 저장합니다.
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
이 URL은 나중에 Pub/Sub 푸시 구독을 만들 때 사용됩니다.
6. ADK 에이전트 테스트
라이브 스트리밍 이벤트를 생성하기 전에 에이전트 엔진의 ADK 에이전트가 수동 에스컬레이션을 올바르게 처리하는지 테스트합니다.
- Google Cloud 콘솔에서 Vertex AI Agent Engine 페이지로 이동합니다.
- 배포된 에이전트의 이름을 클릭합니다 (
Cymbal Bank Fraud Assitant). - 플레이그라운드 탭으로 이동하여 에이전트와 직접 상호작용합니다.
- 채팅 인터페이스에서 에이전트가 Pub/Sub에서 수신할 내용을 모방하는 다음 시뮬레이션된 JSON 이벤트 페이로드를 붙여넣고 Enter 키를 누릅니다.
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
에이전트가 거래를 평가하고 Playground 창에 FALSE POSITIVE 평가로 응답하는지 확인합니다.

7. BigQuery 연속 쿼리를 설정하여 에스컬레이션을 Pub/Sub로 스트리밍
이제 ADK 에이전트가 배포되어 이벤트를 수신할 준비가 되었으므로 루트 디렉터리로 돌아가 파이프라인의 나머지 부분을 빌드합니다.
cd ../../event_driven_agents_demo
1. Pub/Sub 주제 만들기
이 명령어를 실행하여 Pub/Sub 주제를 만듭니다. 이 주제는 BigQuery 연속 쿼리에서 내보낸 이상치를 수신합니다.
gcloud pubsub topics create cymbal-bank-escalations-topic
다음 단계에서 이 주제에 대한 구독을 만듭니다.
2. BigQuery 연속 쿼리 실행
에이전트가 배포되고 Pub/Sub 주제가 준비되면 연속 쿼리를 시작하여 retail_transactions 스트림을 실시간으로 모니터링합니다. 이 쿼리는 '불가능한 이동' 이상을 감지하고 Pub/Sub으로 알림을 내보냅니다.
다음 명령어를 실행하여 쿼리를 시작합니다.
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
터미널에 연속 쿼리가 성공적으로 시작되었음을 나타내는 출력이 표시됩니다.
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. 푸시 구독 만들기
이제 에이전트가 배포되고 지속적인 쿼리가 실행되므로 주제의 새로운 이상치 메시지를 에이전트의 웹훅 URL로 직접 적극적으로 전달하는 '푸시' 구독을 만듭니다.
상담사가 올바른 형식으로 데이터를 수신할 수 있도록 단일 메시지 변환 (SMT)을 사용합니다. SMT를 사용하면 메시지가 구독자에게 전송되기 전에 Pub/Sub 내에서 메시지 데이터와 속성을 즉시 가볍게 수정할 수 있습니다.
파이프라인에서 변환이 작동하는 방식은 다음과 같습니다.
- UDF:
setup디렉터리의transform.yaml파일에는 메시지를 처리하는 JavaScript 사용자 정의 함수 (UDF)가 포함되어 있습니다. - BigQuery 데이터 래핑 해제: BigQuery가 연속 쿼리를 통해 Pub/Sub로 데이터를 내보낼 때 JSON 페이로드를 외부 객체로 래핑합니다.
- ADK 형식: UDF는 이중 인코딩을 래핑 해제하고 페이로드를 에이전트 엔진
streamQueryAPI에서 예상하는 엄격한 형식으로 다시 패키징합니다.
다음 명령어를 실행하여 UDF 변환이 적용된 구독을 만듭니다.
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
구독이 생성되었음을 확인하는 출력이 표시됩니다.
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. 이벤트 생성
마지막으로 generate_events.py를 실행하여 합성된 '불가능한 이동' 거래를 cymbal_bank.retail_transactions 테이블로 스트리밍하여 엔드 투 엔드 흐름을 테스트합니다.
python simulator/generate_events.py
이 시뮬레이션에서는 이전에 로드한 고객 프로필 데이터 (거주 국가가 미국인 Karen Burton)를 사용하고 오스트레일리아 (AUS)에서 발생하는 새로운 2,500달러 규모의 전자 제품 거래를 시뮬레이션합니다.
이벤트 도착 확인: 연속 쿼리 윈도우 및 ADK 처리를 위해 약 2분 정도 기다린 후 배포된 에이전트의 로그를 확인하여 트리거된 Pub/Sub 메시지가 처리되었는지 확인합니다.

10. BigQuery에서 상담사 실적 분석
BigQuery 콘솔로 이동하여 cymbal_bank 데이터 세트를 선택합니다. agent_events 테이블을 선택하고 미리보기를 클릭합니다.

출력은 에이전트가 '불가능한 이동' 에스컬레이션을 성공적으로 분석했음을 확인해 줍니다.
자율 에이전트는 백그라운드에서 지속적으로 실행되므로 관찰 기능이 중요합니다. 에이전트는 ADK 플러그인을 통해 실행 추적을 자동으로 기록하고 맞춤 도구를 통해 결정을 로깅합니다.
다음 쿼리를 실행하여 에이전트의 결정을 agent_events 표에 캡처된 지연 시간 및 토큰 사용량 측정항목과 조인합니다.
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
다음과 비슷한 결과 테이블이 표시됩니다.

가능한 것의 기술: 이 Codelab은 시각화를 위해 에이전트의 결정을 BigQuery에 로깅하는 것으로 끝나고 이벤트 생성기 스크립트는 비교적 간단하며 단일 사용자의 사기만 삽입했지만, 에이전트 도구는 단순히 Python 함수라는 점을 기억하세요. 즉, 데모가 더 많은 사용 사례나 시나리오로 확장되면 에이전트가 모든 것과 상호작용할 수 있습니다.
프로덕션 환경에서는 이 아키텍처를 쉽게 확장할 수 있습니다. 데이터를 로깅하는 대신 에이전트가 웹훅을 호출하여 Slack 또는 Teams 채널에 알림을 보내거나, PagerDuty 인시던트를 트리거하거나, Cloud Spanner와 같은 지연 시간이 짧은 데이터베이스에 최종 평결을 작성하거나, 다운스트림 마이크로서비스에 새로운 Pub/Sub 메시지를 게시하여 유출된 신용카드를 자동으로 정지할 수 있습니다.
11. 삭제
Google Cloud 계정에 지속적으로 요금이 청구되지 않도록 하려면 이 Codelab 중에 생성된 리소스를 삭제하세요.
Codelab 저장소에는 Pub/Sub 배포, BigQuery 데이터 세트, BigQuery 예약 슬롯, Vertex Agent Engine 구성, Cloud Storage 스테이징 버킷, IAM 서비스 계정을 자동으로 삭제하는 정리 스크립트가 포함되어 있습니다.
아직 실행 중인 경우 Google Cloud 콘솔의 BigQuery UI에서 BigQuery 연속 쿼리를 중지합니다. 그런 다음 정리 스크립트를 실행합니다.
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
또는 이 Codelab용으로만 프로젝트를 만든 경우 전체 프로젝트를 삭제해도 됩니다.
12. 축하합니다
축하합니다. BigQuery, Pub/Sub, ADK를 사용하여 이벤트 기반 데이터 에이전트 파이프라인을 빌드했습니다.
학습한 내용
- BigQuery 연속 쿼리에서 Pub/Sub로 이상치를 내보내는 방법
- 변환된 Pub/Sub 메시지를 ADK 에이전트로 라우팅하는 방법
- Vertex AI Agent Engine에서 에이전트를 배포하고 상호작용하는 방법