Tworzenie agenta danych opartego na zdarzeniach za pomocą BigQuery i ADK

1. Wprowadzenie

W tym ćwiczeniu utworzysz architekturę opartą na zdarzeniach, która łączy ciągłe zapytania BigQuery, Pub/Sub i agenta do wykrywania oszustw utworzonego za pomocą pakietu Agent Development Kit (ADK) i hostowanego w Vertex AI Agent Engine.

Architektura agenta danych opartego na zdarzeniach

Skonfigurujesz potok, w którym zapytanie ciągłe wykrywa anomalie (np. „niemożliwe podróże”) w transakcjach detalicznych w czasie rzeczywistym, eksportuje te podejrzane zdarzenia do tematu Pub/Sub, który następnie wywołuje agenta pakietu ADK, aby ocenić każdą anomalię i na nią zareagować.

Jakie zadania wykonasz

  • Przygotowywanie środowiska BigQuery z przykładowymi danymi transakcji
  • Tworzenie zapytania ciągłego BigQuery do wykrywania anomalii w czasie rzeczywistym
  • Konfigurowanie tematu i subskrypcji Pub/Sub z przekształceniami pojedynczych wiadomości (SMT)
  • Pobieranie, konfigurowanie i wdrażanie agenta ADK w Vertex AI Agent Engine
  • Przesyłaj strumieniowo dane transakcji, aby sprawdzić, czy agent otrzymuje i przetwarza eskalacje.

Czego potrzebujesz

  • przeglądarka, np. Chrome;
  • projekt Google Cloud z włączonymi płatnościami;
  • Dostęp do Google Cloud Shell

To ćwiczenie jest przeznaczone dla średnio zaawansowanych programistów, którzy znają BigQuery i podstawy Pythona.

Zasoby utworzone w tym laboratorium powinny kosztować mniej niż 2 PLN.

Szacowany czas trwania: ukończenie tego laboratorium zajmie około 60 minut.

2. Zanim zaczniesz

Tworzenie projektu Google Cloud

  1. W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt w chmurze Google.
  2. Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.

Uruchamianie Cloud Shell

Cloud Shell to środowisko wiersza poleceń działające w Google Cloud, które zawiera niezbędne narzędzia.

  1. Kliknij Aktywuj Cloud Shell u góry konsoli Google Cloud.
  2. Po połączeniu z Cloud Shell sprawdź uwierzytelnianie:
    gcloud auth list
    
  3. Sprawdź, czy projekt jest skonfigurowany:
    gcloud config get project
    
  4. Jeśli projekt nie jest ustawiony zgodnie z oczekiwaniami, ustaw go:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Ustawianie identyfikatora projektu

Uruchom to polecenie, aby pobrać identyfikator aktywnego projektu Google Cloud i zapisać go jako zmienną środowiskową, której będziesz używać podczas naszych ćwiczeń z programowania:

export PROJECT_ID=$(gcloud config get-value project)

Pobierz kod

Uruchom to polecenie, aby sklonować repozytorium i pobrać tylko folder docelowy event_driven_agents_demo, który zawiera agenta ADK i skrypty konfiguracyjne:

git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/devrel-demos.git temp-repo && cd temp-repo && git sparse-checkout set data-analytics/event_driven_agents_demo && cd .. && mv temp-repo/data-analytics/event_driven_agents_demo . && rm -rf temp-repo

Przejdź do katalogu event_driven_agents_demo:

cd event_driven_agents_demo

Jeśli otworzysz edytor Cloud Shell, powinna być widoczna struktura sklonowanego repozytorium:

Folder z agentem ADK i skryptami konfiguracji

3. Przygotowywanie środowiska

Przygotuj środowisko Google Cloud za pomocą skryptu konfiguracyjnego dostępnego w repozytorium. Ten skrypt:

  • Udostępnia zasobnik Cloud Storage na potrzeby pakietu Agent Development Kit (ADK).
  • Tworzy CONTINUOUS rezerwację BigQuery Enterprise na potrzeby przetwarzania zapytań.
  • Konfiguruje zbiór danych BigQuery i wczytuje początkowe customer_profiles dane.
  • Konfiguruje uprawnienia i przypisuje niezbędne role do konta usługi agenta ADK.

Uruchom skrypt w Cloud Shell:

chmod +x setup/setup_env.sh
./setup/setup_env.sh

4. Sprawdzanie agenta ADK

Teraz wdrożysz kod agenta ADK w Vertex AI Agent Engine. Dzięki temu agent zostanie wdrożony i będzie gotowy do obsługi eskalacji, zanim zaczniesz przesyłać dane strumieniowo.

cd agent

Informacje o kodzie agenta pakietu ADK (Agent Development Kit)

Podstawowa logika agenta jest zdefiniowana w adk_agent_app/agent.py.

Tworzymy agenta, który używa modelu Gemini 2.5 Flash do samodzielnego analizowania nietypowych alertów. Agent analizuje ładunek alertu, pobiera historię klienta z BigQuery i weryfikuje szczegóły sprzedawcy za pomocą wyszukiwania w internecie, a następnie klasyfikuje transakcję jako FALSE_POSITIVE (prawidłową transakcję) lub ESCALATION_NEEDED.

# Excerpt from agent/adk_agent_app/agent.py
investigation_agent = Agent(
    model="gemini-2.5-flash",
    name="Fraud_Investigation_Agent",
    description="Expert fraud analyst agent that autonomously investigates alerts...",
    instruction=(
        "You are an expert fraud investigator for Cymbal Bank. "
        "Your goal is to investigate financial transaction alerts, "
        "determine if they are fraudulent, and take appropriate action. "
        "Calculate risk, assess the logic_signals provided in the input, "
        "query the database for past transactions, and search the merchant..."
    ),
    tools=[
        bigquery_toolset,
        google_search,
    ],
)

Agent jest wyposażony w 2 różne narzędzia:

  1. BigQueryToolset: umożliwia agentowi samodzielne wysyłanie zapytań do zbioru danych cymbal_bank w celu wyszukania dodatkowej historii transakcji.
  2. google_search: umożliwia agentowi przeszukiwanie internetu w celu sprawdzenia reputacji sprzedawcy i potwierdzenia jego wiarygodności.

5. Wdrażanie agenta ADK

Aby zainstalować wymagane pakiety Pythona (google-cloud-aiplatform, google-adk itp.) do wdrożenia agenta, wykonaj to polecenie:

pip install -r requirements.txt

Uruchom to polecenie, aby dynamicznie wygenerować plik .env zawierający Twój identyfikator projektu. Będzie on używany podczas wdrażania agenta:

cat <<EOF > .env
PROJECT_ID=$PROJECT_ID
LOCATION=us-central1
STAGING_BUCKET=gs://$PROJECT_ID-adk-staging
SERVICE_ACCOUNT=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com
BIGQUERY_DATASET=cymbal_bank
GOOGLE_GENAI_USE_VERTEXAI=1
EOF

Teraz uruchom to polecenie, aby wdrożyć agenta w Vertex AI Agent Engine:

python deploy_agent_script.py

Uwaga: funkcja deploy_agent_script.py inicjuje funkcję BigQueryAgentAnalyticsPlugin, która automatycznie rejestruje dane śledzenia i użycie narzędzia agenta w tabeli agent_events w BigQuery.

Ich wykonanie może potrwać kilka minut. Dane wyjściowe powinny być podobne do tych:

Deploying Agent...
Deployed Resource Name: projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...

================================================================================
Pub/Sub Push Endpoint URL:
https://<REGION>-aiplatform.googleapis.com/v1/projects/<YOUR_PROJECT_ID>/locations/<REGION>/reasoningEngines/...:streamQuery
================================================================================

Aby zapisać adres URL wdrożonego punktu końcowego agenta w pliku lokalnym o nazwie agent_endpoint.txt, uruchom to polecenie:

export AGENT_ENDPOINT=$(cat agent_endpoint.txt)

Użyjemy tego adresu URL później podczas tworzenia subskrypcji push Pub/Sub.

6. Testowanie agenta ADK

Przed wygenerowaniem wydarzeń transmisji na żywo sprawdź, czy agent ADK w Agent Engine prawidłowo obsługuje ręczne przekazywanie zgłoszeń.

  1. W konsoli Google Cloud otwórz stronę Vertex AI Agent Engine.
  2. Kliknij nazwę wdrożonego agenta (Cymbal Bank Fraud Assitant).
  3. Aby bezpośrednio wejść w interakcję z agentem, otwórz kartę Playground.
  4. W interfejsie czatu wklej ten symulowany ładunek zdarzenia JSON, który naśladuje to, co agent otrzyma z Pub/Sub, i naciśnij Enter:
    {
      "window_end": "2026-03-15T10:00:00Z",
      "user_id": "user_39175",
      "customer_name": "Jonathan Mckinney",
      "tx_count": 1,
      "total_window_spend": 15.0,
      "highest_value_merchant": "Google One Subscription",
      "highest_value_mcc": "5732",
      "contains_international_tx": false,
      "contains_untrusted_device_tx": false,
      "final_risk_score": 2,
      "logic_signals": {
        "is_impossible_travel": false,
        "has_security_mismatch": false,
        "is_high_velocity": false
      }
    }
    

Sprawdź, czy agent ocenia transakcję i odpowiada z FALSE POSITIVE oceną w oknie Playground:

Platforma testowa Vertex AI Agent Engine

7. Konfigurowanie ciągłego zapytania BigQuery do przesyłania eskalacji strumieniowo do Pub/Sub

Agent ADK jest już wdrożony i gotowy do odbierania zdarzeń. Wróćmy do katalogu głównego i zbudujmy resztę potoku:

cd ../../event_driven_agents_demo

1. Tworzenie tematu Pub/Sub

Uruchom to polecenie, aby utworzyć temat Pub/Sub. Ten temat będzie otrzymywać anomalie wyeksportowane z zapytania ciągłego BigQuery:

gcloud pubsub topics create cymbal-bank-escalations-topic

W następnym kroku utworzymy subskrypcję tego tematu.

2. Uruchamianie zapytania ciągłego BigQuery

Po wdrożeniu agenta i przygotowaniu tematu Pub/Sub rozpocznij zapytanie ciągłe, aby monitorować strumień retail_transactions w czasie rzeczywistym. To zapytanie wykrywa anomalie „Impossible Travel” i eksportuje alerty do Pub/Sub.

Aby uruchomić zapytanie, wpisz to polecenie:

sed -i "s/YOUR_PROJECT_ID/$PROJECT_ID/g" setup/continuous_query.sql

bq query \
  --use_legacy_sql=false \
  --continuous=true \
  --sync=false \
  --connection_property=service_account=adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com \
  "$(cat setup/continuous_query.sql)"

W terminalu powinny pojawić się dane wyjściowe wskazujące, że zapytanie ciągłe zostało uruchomione:

Successfully started query your-project-id:bqjob_r66189572226875ed_0000019d000000_1

8. Tworzenie subskrypcji push

Teraz, gdy agent jest wdrożony i zapytanie ciągłe jest uruchomione, utworzysz subskrypcję „Push”, aby aktywnie przekazywać wszystkie nowe wiadomości o anomaliach z tematu bezpośrednio na adres URL webhooka agenta.

Aby mieć pewność, że agent otrzyma dane w prawidłowym formacie, użyjemy transformacji pojedynczej wiadomości (SMT). Przekształcenia pojedynczych wiadomości umożliwiają wprowadzanie prostych zmian w danych i atrybutach wiadomości bezpośrednio w Pub/Sub w czasie rzeczywistym, zanim zostaną one dostarczone do subskrybenta.

Przekształcenie w naszym potoku wygląda tak:

  • Funkcja UDF: plik transform.yaml w katalogu setup zawiera funkcję zdefiniowaną przez użytkownika JavaScript (UDF), która będzie przetwarzać wiadomości.
  • Rozpakowywanie danych BigQuery: gdy BigQuery eksportuje dane do Pub/Sub za pomocą zapytania ciągłego, opakowuje ładunek JSON w obiekt zewnętrzny.
  • Formatowanie na potrzeby ADK: funkcja UDF usuwa podwójne kodowanie i przekształca ładunek w format oczekiwany przez interfejs API streamQuery Agent Engine.

Aby utworzyć subskrypcję z zastosowanym przekształceniem UDF, uruchom to polecenie:

gcloud pubsub subscriptions create cymbal-bank-escalations-sub \
  --topic=projects/$PROJECT_ID/topics/cymbal-bank-escalations-topic \
  --message-transforms-file=setup/transform.yaml \
  --push-endpoint="$AGENT_ENDPOINT" \
  --push-no-wrapper \
  --push-auth-service-account="adk-agent-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --ack-deadline=600

Powinny się wyświetlić dane wyjściowe potwierdzające utworzenie subskrypcji:

Created subscription [projects/your-project-id/subscriptions/cymbal-bank-escalations-sub].

9. Generowanie zdarzeń

Na koniec przetestuj cały proces, uruchamiając polecenie generate_events.py, aby przesłać syntetyczną transakcję „Impossible Travel” do tabeli cymbal_bank.retail_transactions:

python simulator/generate_events.py

Wykorzystuje ona dane profilu klienta, które zostały wcześniej wczytane (Karen Burton, której kraj zamieszkania to Stany Zjednoczone), i symuluje nową transakcję zakupu elektroniki o wartości 2500 USD, która ma miejsce w Australii (AUS).

Sprawdź, czy zdarzenie dotarło: poczekaj około 2 minut na okienkowanie zapytania ciągłego i przetwarzanie pakietu ADK, a potem sprawdź dzienniki wdrożonego agenta, aby potwierdzić, że przetworzył on wywołaną wiadomość Pub/Sub.

Logi usługi Agent Engine

10. Analizowanie wydajności agenta w BigQuery

Otwórz konsolę BigQuery i wybierz zbiór danych cymbal_bank. Wybierz tabelę agent_events i kliknij Podgląd:

Podgląd zdarzeń agenta BigQuery

Dane wyjściowe potwierdzają, że agent pomyślnie przeanalizował eskalację „Impossible Travel”.

Agenty autonomiczne działają nieprzerwanie w tle, dlatego obserwacja jest kluczowa. Agent automatycznie rejestruje ślady wykonania za pomocą wtyczki ADK i loguje decyzje za pomocą narzędzia niestandardowego.

Uruchom to zapytanie, aby połączyć decyzje agenta z danymi o opóźnieniu i wykorzystaniu tokenów zarejestrowanymi w tabeli agent_events:

-- Create session-level metrics from detailed agent events
SELECT 
  MAX(d.timestamp) AS decision_time,
  ANY_VALUE(d.user_id) AS user_id,
  ANY_VALUE(d.merchant) AS merchant,
  ANY_VALUE(d.decision) AS decision,
  ANY_VALUE(d.summary) AS summary,
  -- Calculate latency in seconds
  TIMESTAMP_DIFF(MAX(e.timestamp), MIN(e.timestamp), SECOND) AS execution_latency_sec,
  -- Aggregate total tokens from LLM calls
  SUM(CAST(JSON_EXTRACT_SCALAR(e.content, '$.usage.total') AS INT64)) AS total_tokens_used,
  -- Count total events logged to represent the agent's complex reasoning steps
  COUNT(e.session_id) AS agent_reasoning_steps,
  -- Count total tool calls
  COUNTIF(e.event_type = 'TOOL_COMPLETED') AS total_tool_count  
FROM 
  `cymbal_bank.agent_decisions` d
JOIN 
  `cymbal_bank.agent_events` e ON d.session_id = e.session_id
GROUP BY 
  d.session_id
ORDER BY 
  decision_time DESC

Powinna się wyświetlić wypełniona tabela wyników podobna do tej:

Wyniki analizy agenta BigQuery

Możliwości: ten CodeLab kończy się rejestrowaniem decyzji agenta w BigQuery na potrzeby wizualizacji. Skrypt generatora zdarzeń był stosunkowo prosty i wstawiał oszustwa tylko od jednego użytkownika. Pamiętaj jednak, że narzędzia agenta to po prostu funkcje Pythona. Oznacza to, że w miarę jak wersja demonstracyjna będzie obejmować więcej przypadków użycia lub scenariuszy, Twój agent będzie mógł wchodzić w interakcję z dowolnymi elementami.

W środowisku produkcyjnym możesz łatwo rozbudować tę architekturę. Zamiast tylko rejestrować dane, agent może wywołać webhooka, aby wysłać alert do kanału Slack lub Teams, wywołać incydent PagerDuty, zapisać ostateczny wynik w bazie danych o niskim opóźnieniu, takiej jak Cloud Spanner, lub opublikować nową wiadomość Pub/Sub w mikrousłudze podrzędnej, aby automatycznie zablokować przejętą kartę kredytową.

11. Czyszczenie danych

Aby uniknąć obciążenia konta Google Cloud bieżącymi opłatami, usuń zasoby utworzone podczas tego ćwiczenia.

Repozytorium ćwiczenia zawiera skrypt czyszczący, który automatycznie usuwa wdrożenie Pub/Sub, zbiór danych BigQuery, miejsce rezerwacji BigQuery, konfigurację Vertex Agent Engine, tymczasowy zasobnik Cloud Storage i konta usługi IAM.

Jeśli zapytanie ciągłe BigQuery nadal działa, zatrzymaj je w interfejsie BigQuery w konsoli Google Cloud. Następnie uruchom skrypt czyszczący:

chmod +x setup/cleanup_env.sh
./setup/cleanup_env.sh

Możesz też usunąć cały projekt, jeśli został utworzony wyłącznie na potrzeby tego laboratorium.

12. Gratulacje

Gratulacje! Utworzono potok agenta danych o zdarzeniach przy użyciu BigQuery, Pub/Sub i ADK.

Czego się dowiedziałeś(-aś)

  • Eksportowanie anomalii z ciągłego zapytania BigQuery do Pub/Sub
  • Jak kierować przekształcone wiadomości Pub/Sub do agenta ADK
  • Jak wdrożyć agenta w Vertex AI Agent Engine i wchodzić z nim w interakcje

Dokumentacja