1. 소개
이 Codelab에서는 BigQuery 연속 쿼리, Pub/Sub, Vertex AI Agent Engine에서 호스팅되는 에이전트 개발 키트 (ADK)를 사용하여 빌드된 사기 조사 담당 에이전트를 결합하는 이벤트 기반 아키텍처를 빌드합니다.

연속 쿼리가 실시간 소매 거래에서 '불가능한 이동 거리'와 같은 이상치를 감지하고 이러한 의심스러운 이벤트를 Pub/Sub 주제로 내보내는 파이프라인을 설정합니다. 그러면 ADK 에이전트가 트리거되어 각 이상치를 개별적으로 평가하고 대응합니다.
실습할 내용
- 샘플 거래 데이터로 BigQuery 환경 준비
- 실시간 이상치를 감지하는 BigQuery 연속 쿼리 만들기
- 단일 메시지 변환 (SMT)으로 Pub/Sub 주제 및 구독 설정
- Vertex AI Agent Engine으로 ADK 에이전트 가져오기, 구성, 배포
- 거래 데이터를 스트리밍하여 에이전트가 에스컬레이션을 수신하고 처리하는지 확인
필요한 항목
- 웹브라우저(예: Chrome)
- 결제가 사용 설정된 Google Cloud 프로젝트
- Google Cloud Shell에 액세스
이 Codelab은 BigQuery 및 기본 Python에 익숙한 중급 개발자를 대상으로 합니다.
이 Codelab에서 만든 리소스의 비용은 $2 미만이어야 합니다.
예상 소요 시간: 이 Codelab을 완료하는 데 약 60분이 소요됩니다.
2. 시작하기 전에
Google Cloud 프로젝트 만들기
- Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
- Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
Cloud Shell 시작
Cloud Shell 은 Google Cloud에서 실행되며 필요한 도구가 사전 로드된 명령줄 환경입니다.
- Google Cloud 콘솔 상단에서 Cloud Shell 활성화 를 클릭합니다.
- Cloud Shell에 연결되면 인증을 확인합니다.
gcloud auth list - 프로젝트가 구성되었는지 확인합니다.
gcloud config get project - 프로젝트가 예상대로 설정되지 않은 경우 설정합니다.
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
프로젝트 ID 설정
다음 명령어를 실행하여 활성 Google Cloud 프로젝트 ID를 가져오고 이 Codelab 전반에서 사용할 환경 변수로 저장합니다.
export PROJECT_ID=$(gcloud config get-value project)
코드 가져오기
이 명령어를 실행하여 저장소를 클론하고 ADK 에이전트와 설정 스크립트가 포함된 대상 event_driven_agents_demo 폴더만 다운로드합니다.
git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo
event_driven_agents_demo 디렉터리로 이동합니다.
cd event_driven_agents_demo
Cloud Shell 편집기를 열면 클론된 저장소 구조를 볼 수 있습니다.

3. 환경 준비
저장소에 제공된 설정 스크립트를 사용하여 Google Cloud 환경을 준비합니다. 이 스크립트는 다음을 실행합니다.
- 에이전트 개발 키트 (ADK)를 스테이징하기 위한 Google Cloud Storage 버킷을 프로비저닝 합니다.
- 쿼리 처리를 위한
CONTINUOUSEnterprise BigQuery 예약을 만듭니다. - BigQuery 데이터 세트를 설정 하고 초기
customer_profiles데이터를 로드합니다. - IAM 권한을 구성 하고 ADK 에이전트 서비스 계정에 필요한 역할을 부여합니다.
Cloud Shell에서 스크립트를 실행합니다.
chmod +x setup/setup_env.sh
./setup/setup_env.sh
4. ADK 에이전트 검사
이제 ADK 에이전트 코드를 Vertex AI Agent Engine에 배포합니다. 이렇게 하면 데이터 스트리밍을 시작하기 전에 에이전트가 배포되고 에스컬레이션을 처리할 준비가 됩니다.
cd agent
ADK (에이전트 개발 키트) 에이전트 코드 이해
핵심 에이전트 로직은 adk_agent_app/agent.py 내에 정의됩니다.
Gemini 2.5 Flash를 사용하여 이상 알림을 자율적으로 조사하는 에이전트를 구성합니다. 에이전트는 알림 페이로드를 분석하고, BigQuery에서 고객 기록을 가져오고, 웹 검색을 통해 판매자 세부정보를 확인한 후 거래를 FALSE_POSITIVE (정상 거래) 또는 ESCALATION_NEEDED로 분류합니다.
# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
model="gemini-2.5-flash",
name="Fraud_Investigation_Agent",
description="Expert fraud analyst agent that autonomously investigates alerts...",
instruction=(
"You are an expert fraud investigator for Cymbal Bank. "
"Your goal is to investigate financial transaction alerts, "
"determine if they are fraudulent, and take appropriate action. "
"Calculate risk, assess the logic_signals provided in the input, "
"query the database for past transactions, and search the merchant..."
),
tools=[
bigquery_toolset,
google_search,
],
)
에이전트에는 두 가지 고유한 도구가 있습니다.
BigQueryToolset: 에이전트가cymbal_bank데이터 세트를 자율적으로 쿼리하여 추가 거래 내역을 조회할 수 있습니다.google_search: 에이전트가 웹을 검색하여 판매자의 평판을 조사하고 합법성을 확인할 수 있습니다.
5. ADK 에이전트 배포
다음 명령어를 실행하여 에이전트 배포에 필요한 Python 패키지 (google-cloud-aiplatform, google-adk 등)를 설치합니다.
pip install -r requirements.txt
다음 명령어를 실행하여 특정 프로젝트 ID가 포함된 .env 파일을 동적으로 생성합니다. 이 파일은 에이전트를 배포할 때 사용됩니다.
cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF
이제 이 명령어를 실행하여 에이전트를 Vertex AI Agent Engine에 배포합니다.
python deploy_agent_script.py
참고: deploy_agent_script.py는 추적 데이터와 에이전트 도구 사용량을 BigQuery의 agent_events 테이블에 자동으로 로깅하는 BigQueryAgentAnalyticsPlugin을 초기화합니다.
완료되기까지 몇 분 정도 걸립니다. 다음과 비슷한 출력이 표시됩니다.
Deploying Agent... Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/... ================================================================================ Pub/Sub Push Endpoint URL: https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery ================================================================================
이 명령어를 실행하여 배포된 에이전트 엔드포인트 URL을 agent_endpoint.txt라는 로컬 파일에 저장합니다.
export AGENT_ENDPOINT=$(cat agent_endpoint.txt)
이 URL은 나중에 Pub/Sub 푸시 구독을 만들 때 사용합니다.
6. ADK 에이전트 테스트
실시간 스트리밍 이벤트를 생성하기 전에 Agent Engine의 ADK 에이전트가 수동 에스컬레이션을 올바르게 처리하는지 테스트합니다.
- Google Cloud 콘솔에서 Vertex AI Agent Engine 페이지로 이동합니다.
- 배포된 에이전트 이름 (
Cymbal Bank Fraud Assitant)을 클릭합니다. - Playground 탭으로 이동하여 에이전트와 직접 상호작용합니다.
- 채팅 인터페이스에서 에이전트가 Pub/Sub에서 수신할 내용을 모방하는 다음 시뮬레이션된 JSON 이벤트 페이로드를 붙여넣고 Enter 키를 누릅니다.
{ "window_end": "2026-03-15T10:00:00Z", "user_id": "user_39175", "customer_name": "Jonathan Mckinney", "tx_count": 1, "total_window_spend": 15.0, "highest_value_merchant": "Google One Subscription", "highest_value_mcc": "5732", "contains_international_tx": false, "contains_untrusted_device_tx": false, "final_risk_score": 2, "logic_signals": { "is_impossible_travel": false, "has_security_mismatch": false, "is_high_velocity": false } }
에이전트가 거래를 평가하고 Playground 창에서 FALSE POSITIVE 평가로 응답하는지 확인합니다.

7. 에스컬레이션을 Pub/Sub로 스트리밍하는 BigQuery 연속 쿼리 설정
이제 ADK 에이전트가 배포되고 이벤트를 수신할 준비가 되었으므로 루트 디렉터리로 돌아가서 나머지 파이프라인을 빌드해 보겠습니다.
cd ../../event_driven_agents_demo
1. Pub/Sub 주제 만들기
이 명령어를 실행하여 Pub/Sub 주제를 만듭니다. 이 주제는 BigQuery 연속 쿼리에서 내보낸 이상치를 수신합니다.
gcloud pubsub topics create cymbal-bank-escalations-topic
다음 단계에서 이 주제에 대한 구독을 만듭니다.
2. BigQuery 연속 쿼리 실행
에이전트가 배포되고 Pub/Sub 주제가 준비되면 연속 쿼리를 시작하여 retail_transactions 스트림을 실시간으로 모니터링합니다. 이 쿼리는 '불가능한 이동 거리' 이상치를 감지하고 Pub/Sub로 알림을 내보냅니다.
다음 명령어를 실행하여 쿼리를 시작합니다.
sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql
bq query \
--use_legacy_sql=false \
--continuous=true \
--sync=false \
--connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
"$(cat setup/continuous_query.sql)"
연속 쿼리가 성공적으로 시작되었음을 나타내는 출력이 터미널에 표시됩니다.
Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1
8. 푸시 구독 만들기
이제 에이전트가 배포되고 연속 쿼리가 실행 중이므로 주제의 새 이상 메시지를 에이전트의 웹훅 URL로 직접 적극적으로 전달하는 '푸시' 구독을 만듭니다.
에이전트가 올바른 형식으로 데이터를 수신하도록 단일 메시지 변환 (SMT) 을 사용합니다. SMT를 사용하면 구독자에게 전달되기 전에 Pub/Sub 내에서 메시지 데이터와 속성을 즉석에서 가볍게 수정할 수 있습니다.
변환은 파이프라인에서 다음과 같이 작동합니다.
- UDF:
setup디렉터리의transform.yaml파일에는 메시지를 처리하는 JavaScript 사용자 정의 함수 (UDF)가 포함되어 있습니다. - BigQuery 데이터 언래핑: BigQuery가 연속 쿼리를 통해 Pub/Sub로 데이터를 내보낼 때 JSON 페이로드를 외부 객체로 래핑합니다.
- ADK 형식 지정: UDF는 이중 인코딩을 언래핑하고 페이로드를 Agent Engine
streamQueryAPI에서 예상하는 엄격한 형식으로 다시 패키징합니다.
다음 명령어를 실행하여 UDF 변환이 적용된 구독을 만듭니다.
gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
--topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
--message-transforms-file=setup/transform.yaml \
--push-endpoint="$AGENT_ENDPOINT" \
--push-no-wrapper \
--push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
--ack-deadline=600
구독이 생성되었음을 확인하는 출력이 표시됩니다.
Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].
9. 이벤트 생성
마지막으로 generate_events.py를 실행하여 합성 '불가능한 이동 거리' 거래를 cymbal_bank.retail_transactions 테이블로 스트리밍하여 엔드 투 엔드 흐름을 테스트합니다.
python simulator/generate_events.py
이전 (미국이 거주지인 Karen Burton)에 로드한 고객 프로필 데이터를 사용하고 오스트레일리아 (AUS)에서 발생하는 새로운 $2,500 전자제품 거래를 시뮬레이션합니다.
이벤트 도착 확인: 연속 쿼리 윈도우 처리 및 ADK 처리를 위해 약 2분 정도 기다린 후 배포된 에이전트의 로그를 확인하여 트리거된 Pub/Sub 메시지를 처리했는지 확인합니다.

10. BigQuery에서 에이전트 성능 분석
BigQuery 콘솔로 이동하여 cymbal_bank 데이터 세트를 선택합니다. agent_events 테이블을 선택하고 미리보기를 클릭합니다.

출력은 에이전트가 '불가능한 이동 거리' 에스컬레이션을 성공적으로 분석했음을 확인합니다.
자율 에이전트는 백그라운드에서 지속적으로 실행되므로 관측 가능성이 중요합니다. 에이전트는 ADK 플러그인을 통해 실행 추적을 자동으로 기록하고 커스텀 도구를 통해 결정을 로깅합니다.
다음 쿼리를 실행하여 에이전트의 결정을 agent_events 테이블에 캡처된 지연 시간 및 토큰 사용량 측정항목과 조인합니다.
-- Create session-level metrics from detailed agent events
SELECT
MAX(d.timestamp) AS decision_time,
ANY_VALUE(d.user_id) AS user_id,
ANY_VALUE(d.merchant) AS merchant,
ANY_VALUE(d.decision) AS decision,
ANY_VALUE(d.summary) AS summary,
-- Calculate latency in seconds
TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
-- Aggregate total tokens from LLM calls
SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
-- Count total events logged to represent the agent's complex reasoning steps
COUNT(e.session_id) AS agent_reasoning_steps,
-- Count total tool calls
COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count
FROM
`cymbal_bank.agent_decisions` d
JOIN
`cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY
d.session_id
ORDER BY
decision_time DESC
다음과 비슷한 결과 테이블이 채워져 표시됩니다.

가능한 작업: 이 Codelab은 시각화를 위해 에이전트의 결정을 BigQuery에 로깅하는 것으로 끝나고 이벤트 생성기 스크립트는 비교적 간단하며 단일 사용자의 사기만 삽입했지만 에이전트 도구는 단순히 Python 함수라는 점을 기억하세요. 즉, 데모가 더 많은 사용 사례 또는 시나리오로 확장됨에 따라 에이전트는 모든 항목과 상호작용할 수 있습니다.
프로덕션 환경에서는 이 아키텍처를 쉽게 확장할 수 있습니다. 에이전트는 데이터를 로깅하는 대신 웹훅을 호출하여 Slack 또는 Teams 채널에 알리거나, PagerDuty 인시던트를 트리거하거나, 최종 평결을 Cloud Spanner와 같은 지연 시간이 짧은 데이터베이스에 쓰거나, 다운스트림 마이크로서비스에 새 Pub/Sub 메시지를 게시하여 손상된 신용카드를 자동으로 정지할 수 있습니다.
11. 정리
Google Cloud 계정에 지속적으로 비용이 청구되지 않도록 하려면 이 Codelab 중에 만든 리소스를 삭제합니다.
Codelab 저장소에는 Pub/Sub 배포, BigQuery 데이터 세트, BigQuery 예약 슬롯, Vertex Agent Engine 구성, Cloud Storage 스테이징 버킷, IAM 서비스 계정을 자동으로 삭제하는 정리 스크립트가 포함되어 있습니다.
BigQuery 연속 쿼리가 아직 실행 중인 경우 Google Cloud 콘솔의 BigQuery UI에서 중지합니다. 그런 다음 정리 스크립트를 실행합니다.
chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh
또는 이 Codelab을 위해서만 프로젝트를 만든 경우 전체 프로젝트를 삭제할 수도 있습니다.
12. 축하합니다
축하합니다. BigQuery, Pub/Sub, ADK를 사용하여 이벤트 기반 데이터 에이전트 파이프라인을 빌드했습니다.
학습한 내용
- BigQuery 연속 쿼리에서 Pub/Sub로 이상치를 내보내는 방법
- 변환된 Pub/Sub 메시지를 ADK 에이전트로 라우팅하는 방법
- Vertex AI Agent Engine에서 에이전트를 배포하고 상호작용하는 방법