Crea un orchestratore della supply chain con ADK, AlloyDB e Vertex AI Memory Bank

1. Panoramica

In questo codelab creerai un agente Supply Chain Orchestrator. Questa applicazione consente agli utenti di analizzare l'inventario, monitorare la logistica e gestire i rischi della catena di fornitura utilizzando il linguaggio naturale.

Sfrutteremo Agent Development Kit (ADK) di Google per creare un'architettura multi-agente che mantenga il contesto, ricordi le preferenze dell'utente tramite Vertex AI Memory Bank e interagisca con un set di dati di grandi dimensioni archiviato in AlloyDB tramite MCP Toolbox.

Cosa creerai

91e8e53556ac1966.jpeg

Un'applicazione Python Flask costituita da:

Agente orchestratore globale:l'agente principale che gestisce il flusso e la delega della conversazione.

Agenti specializzati:un "InventorySpecialist" e un "LogisticsManager" per attività specifiche del dominio.

Integrazione della memoria: memoria di sessione a breve termine e memoria a lungo termine utilizzando Vertex AI Memory Bank.

Interfaccia utente narrativa:un'interfaccia web che visualizza il processo di ragionamento dell'agente (contesto della traccia).

Obiettivi didattici

  • Come utilizzare Google ADK per creare agenti e subagenti specializzati.
  • Come integrare Vertex AI Memory Bank per la memoria a lungo termine dell'agente.
  • Come utilizzare MCP Toolbox per connettere gli agenti agli strumenti per i dati di AlloyDB.
  • Come implementare i callback ADK per tracciare e visualizzare il ragionamento dell'agente.
  • Come eseguire il deployment della soluzione utilizzando Cloud Run o eseguirla localmente.

L'architettura

Lo stack tecnologico

  1. AlloyDB per PostgreSQL:funge da database operativo ad alte prestazioni che contiene oltre 50.000 record della supply chain. Supporta la ricerca e il recupero vettoriale.
  2. MCP Toolbox for Databases:funge da "maestro dell'orchestrazione", esponendo i dati di AlloyDB come strumenti eseguibili che gli agenti possono chiamare.
  3. Agent Development Kit (ADK): il framework utilizzato per definire gli agenti, le istruzioni e gli strumenti.
  4. Vertex AI Memory Bank:fornisce una memoria a lungo termine, consentendo all'agente di ricordare le preferenze dell'utente e le interazioni passate nelle varie sessioni.
  5. Vertex AI Session Service: gestisce il contesto della conversazione a breve termine.

The Flow

  1. Query dell'utente:l'utente pone una domanda (ad es. "Controlla le scorte di gelato premium").
  2. Controllo della memoria:Orchestrator controlla la banca della memoria per trovare informazioni passate pertinenti (ad es. "L'utente è un responsabile regionale per l'EMEA").
  3. Delega:l'orchestratore delega l'attività a InventorySpecialist.
  4. Esecuzione dello strumento: lo specialista utilizza gli strumenti forniti da MCP Toolbox per eseguire query su AlloyDB.
  5. Risposta:l'agente elabora i dati e restituisce una tabella formattata in Markdown.
  6. Archiviazione della memoria:le interazioni significative vengono salvate nel Memory Bank.

Requisiti

  • Un browser, ad esempio Chrome o Firefox.
  • Un progetto Google Cloud con la fatturazione abilitata.
  • Conoscenza di base di SQL e Python.

2. Prima di iniziare

Crea un progetto

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
  2. Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
  1. Utilizzerai Cloud Shell, un ambiente a riga di comando in esecuzione in Google Cloud. Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.

Immagine del pulsante Attiva Cloud Shell

  1. Una volta eseguita la connessione a Cloud Shell, verifica di essere già autenticato e che il progetto sia impostato sul tuo ID progetto utilizzando il seguente comando:
gcloud auth list
  1. Esegui questo comando in Cloud Shell per verificare che il comando gcloud conosca il tuo progetto.
gcloud config list project
  1. Se il progetto non è impostato, utilizza il seguente comando per impostarlo:
gcloud config set project <YOUR_PROJECT_ID>
  1. Abilita le API richieste: segui il link e abilita le API.

In alternativa, puoi utilizzare il comando gcloud. Consulta la documentazione per i comandi e l'utilizzo di gcloud.

Aspetti da considerare e risoluzione dei problemi

La sindrome del "progetto fantasma"

Hai eseguito gcloud config set project, ma in realtà stai esaminando un progetto diverso nell'interfaccia utente della console. Controlla l'ID progetto nel menu a discesa in alto a sinistra.

La barriera di fatturazione

Hai attivato il progetto, ma hai dimenticato l'account di fatturazione. AlloyDB è un motore ad alte prestazioni; non si avvia se il "serbatoio" (fatturazione) è vuoto.

Ritardo di propagazione dell'API

Hai fatto clic su "Abilita API", ma la riga di comando indica ancora Service Not Enabled. Attendi 60 secondi. Il cloud ha bisogno di un attimo per attivare i suoi neuroni.

Quota Quags

Se utilizzi un account di prova nuovo di zecca, potresti raggiungere una quota regionale per le istanze AlloyDB. Se us-central1 non va a buon fine, prova us-east1.

Service Agent"Nascosto"

A volte all'agente di servizio AlloyDB non viene concesso automaticamente il ruolo aiplatform.user. Se in un secondo momento le query SQL non riescono a comunicare con Gemini, questo è in genere il problema.

3. Configurazione del database

Al centro della nostra applicazione si trova AlloyDB per PostgreSQL. Abbiamo sfruttato le sue potenti funzionalità vettoriali e il motore colonnare integrato per generare incorporamenti per oltre 50.000 record SCM. Ciò consente l'analisi vettoriale quasi in tempo reale, consentendo ai nostri agenti di identificare anomalie di inventario o rischi logistici in set di dati di grandi dimensioni in pochi millisecondi.

In questo lab utilizzeremo AlloyDB come database per i dati di test. Utilizza i cluster per contenere tutte le risorse, come database e log. Ogni cluster ha un'istanza primaria che fornisce un punto di accesso ai dati. Le tabelle conterranno i dati effettivi.

Creiamo un cluster, un'istanza e una tabella AlloyDB in cui verrà caricato il set di dati di test.

  1. Fai clic sul pulsante o copia il link riportato di seguito nel browser in cui hai eseguito l'accesso all'utente della console Google Cloud.

In alternativa, puoi andare al terminale Cloud Shell dal progetto in cui hai riscattato l'account di fatturazione, clonare il repository GitHub e passare al progetto utilizzando i comandi riportati di seguito:

git clone https://github.com/AbiramiSukumaran/easy-alloydb-setup

cd easy-alloydb-setup
  1. Una volta completato questo passaggio, il repository verrà clonato nell'editor Cloud Shell locale e potrai eseguire il comando riportato di seguito dalla cartella del progetto (è importante assicurarsi di trovarsi nella directory del progetto):
sh run.sh
  1. Ora utilizza la UI (facendo clic sul link nel terminale o sul link "Anteprima sul web" nel terminale).
  2. Inserisci i tuoi dati per l'ID progetto, il cluster e i nomi delle istanze per iniziare.
  3. Prendi un caffè mentre scorrono i log e leggi qui come funziona dietro le quinte.

Aspetti da considerare e risoluzione dei problemi

Il problema della "pazienza"

I cluster di database sono un'infrastruttura pesante. Se aggiorni la pagina o termini la sessione Cloud Shell perché "sembra bloccata", potresti ritrovarti con un'istanza "fantasma" di cui è stato eseguito il provisioning parziale e impossibile da eliminare senza un intervento manuale.

Regione non corrispondente

Se hai abilitato le API in us-central1, ma provi a eseguire il provisioning del cluster in asia-south1, potresti riscontrare problemi di quota o ritardi nell'autorizzazione del service account. Attieniti a una regione per l'intero lab.

Cluster di zombie

Se in precedenza hai utilizzato lo stesso nome per un cluster e non lo hai eliminato, lo script potrebbe indicare che il nome del cluster esiste già. I nomi dei cluster devono essere univoci all'interno di un progetto.

Timeout di Cloud Shell

Se la pausa caffè dura 30 minuti, Cloud Shell potrebbe entrare in modalità di sospensione e disconnettere il processo sh run.sh. Mantieni la scheda attiva.

4. Provisioning dello schema

Una volta che il cluster e l'istanza AlloyDB sono in esecuzione, vai all'editor SQL di AlloyDB Studio per attivare le estensioni AI e eseguire il provisioning dello schema.

1e3ac974b18a8113.png

Potrebbe essere necessario attendere il completamento della creazione dell'istanza. Una volta creato, accedi ad AlloyDB utilizzando le credenziali che hai creato durante la creazione del cluster. Utilizza i seguenti dati per l'autenticazione a PostgreSQL:

  • Nome utente : "postgres"
  • Database : "postgres"
  • Password : "alloydb" (o qualsiasi altra password impostata al momento della creazione)

Una volta eseguita l'autenticazione in AlloyDB Studio, i comandi SQL vengono inseriti nell'editor. Puoi aggiungere più finestre dell'editor utilizzando il segno più a destra dell'ultima finestra.

28cb9a8b6aa0789f.png

Inserirai i comandi per AlloyDB nelle finestre dell'editor, utilizzando le opzioni Esegui, Formatta e Cancella in base alle esigenze.

Attivare le estensioni

Per creare questa app, utilizzeremo le estensioni pgvector e google_ml_integration. L'estensione pgvector consente di archiviare ed eseguire ricerche di vector embedding. L'estensione google_ml_integration fornisce funzioni che utilizzi per accedere agli endpoint di previsione Vertex AI per ottenere previsioni in SQL. Attiva queste estensioni eseguendo i seguenti DDL:

CREATE EXTENSION IF NOT EXISTS google_ml_integration CASCADE;
CREATE EXTENSION IF NOT EXISTS vector;

Creare una tabella

Puoi creare una tabella utilizzando l'istruzione DDL riportata di seguito in AlloyDB Studio:

DROP TABLE IF EXISTS shipments;
DROP TABLE IF EXISTS products;

-- 1. Product Inventory Table

CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
category VARCHAR(100),
stock_level INTEGER,
distribution_center VARCHAR(100),
region VARCHAR(50),
embedding vector(768),
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 2. Logistics & Shipments
CREATE TABLE shipments (
shipment_id SERIAL PRIMARY KEY,
product_id INTEGER REFERENCES products(id),
status VARCHAR(50), -- 'In Transit', 'Delayed', 'Delivered', 'Pending'
estimated_arrival TIMESTAMP,
route_efficiency_score DECIMAL(3, 2)
);

La colonna embedding consentirà di memorizzare i valori del vettore di alcuni campi di testo.

Importazione dati

Esegui il seguente insieme di istruzioni SQL per inserire in blocco 50.000 record nella tabella dei prodotti:

-- We use a CROSS JOIN pattern with realistic naming segments to create meaningful variety
DO $$
DECLARE
brand_names TEXT[] := ARRAY['Artisan', 'Nature', 'Elite', 'Pure', 'Global', 'Eco', 'Velocity', 'Heritage', 'Aura', 'Summit'];
product_types TEXT[] := ARRAY['Ice Cream', 'Body Wash', 'Laundry Detergent', 'Shampoo', 'Mayonnaise', 'Deodorant', 'Tea', 'Soup', 'Face Cream', 'Soap'];
variants TEXT[] := ARRAY['Classic', 'Gold', 'Premium', 'Eco-Friendly', 'Organic', 'Night-Repair', 'Extra-Fresh', 'Zero-Sugar', 'Sensitive', 'Maximum-Strength'];
regions TEXT[] := ARRAY['EMEA', 'APAC', 'LATAM', 'NAMER'];
dcs TEXT[] := ARRAY['London-Hub', 'Mumbai-Central', 'Sao-Paulo-Logistics', 'Singapore-Port', 'Rotterdam-Gate', 'New-York-DC'];
BEGIN
INSERT INTO products (name, category, stock_level, distribution_center, region)
SELECT
b || ' ' || v || ' ' || t as name,
CASE
WHEN t IN ('Ice Cream', 'Mayonnaise', 'Tea', 'Soup') THEN 'Food & Refreshment'
WHEN t IN ('Body Wash', 'Shampoo', 'Deodorant', 'Face Cream', 'Soap') THEN 'Personal Care'
ELSE 'Home Care'
END as category,
floor(random() * 20000 + 100)::int as stock_level,
dcs[floor(random() * 6 + 1)] as distribution_center,
regions[floor(random() * 4 + 1)] as region
FROM
unnest(brand_names) b,
unnest(variants) v,
unnest(product_types) t,
generate_series(1, 50); -- 10 * 10 * 10 * 50 = 50,000 records
END $$;

Inseriamo record specifici della demo per garantire risposte prevedibili per le domande in stile executive

-- These ensure you have predictable answers for specific "Executive" questions
INSERT INTO products (name, category, stock_level, distribution_center, region) VALUES
('Magnum Ultra Gold Limited Edition', 'Food & Refreshment', 45, 'Rotterdam-Gate', 'EMEA'),
('Dove Pro-Health Deep Moisture', 'Personal Care', 12000, 'Mumbai-Central', 'APAC'),
('Hellmanns Real Organic Mayonnaise', 'Food & Refreshment', 8000, 'London-Hub', 'EMEA');

Inserimento dei dati delle spedizioni

-- Shipments Generation (More shipments than products)
INSERT INTO shipments (product_id, status, estimated_arrival, route_efficiency_score)
SELECT
id,
CASE
WHEN random() > 0.8 THEN 'Delayed'
WHEN random() > 0.4 THEN 'In Transit'
ELSE 'Delivered'
END,
NOW() + (random() * 10 || ' days')::interval,
(random() * 0.5 + 0.5)::decimal(3,2)
FROM products
WHERE random() > 0.3; -- Create shipments for ~70% of products


-- Add duplicate shipments for some products to show complex logistics
INSERT INTO shipments (product_id, status, estimated_arrival, route_efficiency_score)
SELECT id, 'In Transit', NOW() + INTERVAL '12 days', 0.88
FROM products
LIMIT 5000;

Concedi autorizzazione

Esegui l'istruzione riportata di seguito per concedere l'esecuzione della funzione "embedding":

GRANT EXECUTE ON FUNCTION embedding TO postgres;

Concedi il ruolo Utente Vertex AI al service account AlloyDB

Dalla console Google Cloud IAM, concedi al service account AlloyDB (simile a service-<<PROJECT_NUMBER>>@gcp-sa-alloydb.iam.gserviceaccount.com) l'accesso al ruolo "Utente Vertex AI". PROJECT_NUMBER conterrà il numero del tuo progetto.

In alternativa, puoi eseguire il comando riportato di seguito dal terminale Cloud Shell:

PROJECT_ID=$(gcloud config get-value project)


gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:service-$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")@gcp-sa-alloydb.iam.gserviceaccount.com" \
--role="roles/aiplatform.user"

Genera incorporamenti

Ora generiamo gli incorporamenti vettoriali per campi di testo significativi specifici:

WITH
 rows_to_update AS (
 SELECT
   id
 FROM
   products
 WHERE
   embedding IS NULL
 LIMIT
   5000 )
UPDATE
 products
SET
 embedding = ai.embedding('text-embedding-005', name || ' ' || category || ' ' || distribution_center || ' ' || region)::vector
FROM
 rows_to_update
WHERE
 products.id = rows_to_update.id
 AND embedding IS null;

In questa istruzione precedente abbiamo impostato il limite su 5000, quindi assicurati di eseguirla ripetutamente finché non ci sono righe nella tabella con l'incorporamento della colonna come NULL.

Aspetti da considerare e risoluzione dei problemi

Il ciclo "Amnesia della password"

Se hai utilizzato la configurazione "One Click" e non ricordi la password, vai alla pagina delle informazioni di base dell'istanza nella console e fai clic su "Modifica" per reimpostare la password postgres.

Errore "Estensione non trovata"

Se CREATE EXTENSION non va a buon fine, spesso è perché l'istanza è ancora nello stato "Manutenzione" o "Aggiornamento" dal provisioning iniziale. Controlla se il passaggio di creazione dell'istanza è completato e attendi qualche secondo, se necessario.

Il divario di propagazione IAM

Hai eseguito il comando IAM gcloud, ma il comando SQL CALL continua a non riuscire a causa di un errore di autorizzazione. La propagazione delle modifiche IAM può richiedere un po' di tempo tramite l'infrastruttura di Google. Fai un respiro.

Mancata corrispondenza delle dimensioni del vettore

La tabella items è impostata su VECTOR(768). Se in un secondo momento provi a utilizzare un modello diverso (ad esempio un modello a 1536 dimensioni), gli inserti verranno espansi. Attieniti a text-embedding-005.

Errore di battitura nell'ID progetto

Nella chiamata create_model, se lasci le parentesi « » o digiti in modo errato l'ID progetto, la registrazione del modello sembrerà riuscita, ma non andrà a buon fine durante la prima query effettiva. Controlla la stringa.

5. Configurazione di strumenti e toolbox

MCP Toolbox for Databases è un server MCP open source per i database. Ti consente di sviluppare strumenti in modo più semplice, rapido e sicuro gestendo le complessità come il raggruppamento delle connessioni, l'autenticazione e altro. Toolbox ti aiuta a creare strumenti di AI generativa che consentono agli agenti di accedere ai dati del tuo database.

Utilizziamo Model Context Protocol (MCP) Toolbox for Databases come "direttore d'orchestra". Funge da middleware standardizzato tra i nostri agenti e AlloyDB. Definendo una configurazione tools.yaml, la toolbox espone automaticamente operazioni di database complesse come strumenti puliti ed eseguibili come search_products_by_context o check_inventory_levels. In questo modo si elimina la necessità di raggruppamento manuale delle connessioni o di SQL boilerplate nella logica dell'agente.

Installazione del server Toolbox

Dal terminale Cloud Shell, crea una cartella per salvare il nuovo file YAML degli strumenti e il file binario della toolbox:

mkdir scm-agent-toolbox

cd scm-agent-toolbox

Dall'interno della nuova cartella, esegui il seguente insieme di comandi:

# see releases page for other versions
export VERSION=0.27.0
curl -L -o toolbox https://storage.googleapis.com/genai-toolbox/v$VERSION/linux/amd64/toolbox
chmod +x toolbox

Poi crea il file tools.yaml all'interno della nuova cartella navigando nell'editor di Cloud Shell e copia i contenuti di questo file del repository nel file tools.yaml.

sources:
    supply_chain_db:
        kind: "alloydb-postgres"
        project: "YOUR_PROJECT_ID"
        region: "us-central1"
        cluster: "YOUR_CLUSTER"
        instance: "YOUR_INSTANCE"
        database: "postgres"
        user: "postgres"
        password: "YOUR_PASSWORD"

tools:
  search_products_by_context:
    kind: postgres-sql
    source: supply_chain_db
    description: Find products in the inventory using natural language search and vector embeddings.
    parameters:
      - name: search_text
        type: string
        description: Description of the product or category the user is looking for.
    statement: |
     SELECT name, category, stock_level, distribution_center, region
      FROM products
      ORDER BY embedding <=> ai.embedding('text-embedding-005', $1)::vector
      LIMIT 5;

  check_inventory_levels:
    kind: postgres-sql
    source: supply_chain_db
    description: Get precise stock levels for a specific product name.
    parameters:
      - name: product_name
        type: string
        description: The exact or partial name of the product.
    statement: |
     SELECT name, stock_level, distribution_center, last_updated
      FROM products
      WHERE name ILIKE '%' || $1 || '%'
      ORDER BY stock_level DESC;

  track_shipment_status:
    kind: postgres-sql
    source: supply_chain_db
    description: Retrieve real-time logistics and shipping status for a specific region or product.
    parameters:
      - name: region
        type: string
        description: The geographical region to filter shipments (e.g., EMEA, APAC).
    statement: |
     SELECT p.name, s.status, s.estimated_arrival, s.route_efficiency_score
      FROM shipments s
      JOIN products p ON s.product_id = p.id
      WHERE p.region = $1
      ORDER BY s.estimated_arrival ASC;

  analyze_supply_chain_risk:
    kind: postgres-sql
    source: supply_chain_db
    description: Rerank and filter shipments based on risk profiles and efficiency scores using Google ML reranker.
    parameters:
      - name: risk_context
        type: string
        description: The business context for risk analysis (e.g., 'heatwave impact' or 'port strike').
    statement: |
     WITH initial_ranking AS (
      SELECT s.shipment_id, p.name, s.status, p.distribution_center,
      ROW_NUMBER() OVER () AS ref_number
      FROM shipments s
      JOIN products p ON s.product_id = p.id
      WHERE s.status != 'Delivered'
      LIMIT 10
      ),
      reranked_results AS (
      SELECT index, score FROM
      ai.rank(
      model_id => 'semantic-ranker-default-003',
      search_string => $1,
      documents => (SELECT ARRAY_AGG(name || ' at ' || distribution_center ORDER BY ref_number) FROM initial_ranking)
      )
      )
      SELECT i.name, i.status, i.distribution_center, r.score
      FROM initial_ranking i, reranked_results r
      WHERE i.ref_number = r.index
      ORDER BY r.score DESC;

toolsets:
   supply_chain_toolset:
     - search_products_by_context
     - check_inventory_levels
     - track_shipment_status
     - analyze_supply_chain_risk

Ora testa il file tools.yaml nel server locale:

./toolbox --tools-file "tools.yaml"

In alternativa, puoi provarlo nell'interfaccia utente.

./toolbox --ui

Perfetto! Quando avrai verificato che tutto funziona, esegui il deployment in Cloud Run nel seguente modo.

Deployment di Cloud Run

  1. Imposta la variabile di ambiente PROJECT_ID:
export PROJECT_ID="my-project-id"
  1. Inizializza gcloud CLI:
gcloud init
gcloud config set project $PROJECT_ID
  1. Devi aver abilitato le seguenti API:
gcloud services enable run.googleapis.com \
                       cloudbuild.googleapis.com \
                       artifactregistry.googleapis.com \
                       iam.googleapis.com \
                       secretmanager.googleapis.com
  1. Crea un account di servizio backend se non ne hai già uno:
gcloud iam service-accounts create toolbox-identity
  1. Concedi le autorizzazioni per utilizzare Secret Manager:
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member serviceAccount:toolbox-identity@$PROJECT_ID.iam.gserviceaccount.com \
    --role roles/secretmanager.secretAccessor
  1. Concedi al service account autorizzazioni aggiuntive specifiche per la nostra origine AlloyDB (ruoli roles/alloydb.client e roles/serviceusage.serviceUsageConsumer)
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member serviceAccount:toolbox-identity@$PROJECT_ID.iam.gserviceaccount.com \
    --role roles/alloydb.client


gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member serviceAccount:toolbox-identity@$PROJECT_ID.iam.gserviceaccount.com \
    --role serviceusage.serviceUsageConsumer
  1. Carica tools.yaml come secret:
gcloud secrets create tools-scm-agent --data-file=tools.yaml
  1. Se hai già un secret e vuoi aggiornare la versione del secret, esegui il comando riportato di seguito:
gcloud secrets versions add tools-scm-agent --data-file=tools.yaml
  1. Imposta una variabile di ambiente sull'immagine container che vuoi utilizzare per Cloud Run:
export IMAGE=us-central1-docker.pkg.dev/database-toolbox/toolbox/toolbox:latest
  1. Esegui il deployment di Toolbox su Cloud Run utilizzando questo comando:

Se hai attivato l'accesso pubblico nella tua istanza AlloyDB (non consigliato), segui il comando riportato di seguito per il deployment su Cloud Run:

gcloud run deploy toolbox-scm-agent \
    --image $IMAGE \
    --service-account toolbox-identity \
    --region us-central1 \
    --set-secrets "/app/tools.yaml=tools-scm-agent:latest" \
    --args="--tools-file=/app/tools.yaml","--address=0.0.0.0","--port=8080" \
    --allow-unauthenticated

Se utilizzi una rete VPC, utilizza il comando riportato di seguito:

gcloud run deploy toolbox-scm-agent \
    --image $IMAGE \
    --service-account toolbox-identity \
    --region us-central1 \
    --set-secrets "/app/tools.yaml=tools-scm-agent:latest" \
    --args="--tools-file=/app/tools.yaml","--address=0.0.0.0","--port=8080" \
    # TODO(dev): update the following to match your VPC details
    --network <<YOUR_NETWORK_NAME>> \
    --subnet <<YOUR_SUBNET_NAME>> \
    --allow-unauthenticated

6. Configurazione dell'agente

Utilizzando l'Agent Development Kit (ADK), abbiamo abbandonato i prompt monolitici a favore di un'architettura multi-agente specializzata:

  • InventorySpecialist: si concentra sulle metriche relative alle scorte di prodotti e al magazzino.
  • LogisticsManager: esperto di rotte di spedizione globali e analisi dei rischi.
  • GlobalOrchestrator: il "cervello" che utilizza il ragionamento per delegare le attività e sintetizzare i risultati.

Clona questo repository nel tuo progetto e analizziamolo.

Per clonare questo progetto, esegui questo comando dal terminale Cloud Shell (nella directory principale o da dove vuoi creare il progetto):

git clone https://github.com/AbiramiSukumaran/scm-memory-agent
  1. In questo modo dovrebbe essere creato il progetto, che puoi verificare nell'editor di Cloud Shell.

53a398aff6ba7d5b.png

  1. Assicurati di aggiornare il file .env con i valori del progetto e dell'istanza.

Procedura dettagliata del codice

Una rapida occhiata all'agente di orchestrazione

    Go to app.py and you should be able to see the following snippet:
orchestrator = adk.Agent(
    name="GlobalOrchestrator",
    model="gemini-2.5-flash",
    description="Global Supply Chain Orchestrator root agent.",
    instruction="""
    You are the Global Supply Chain Brain. You are responsible for products, inventory and logistics.
    You also have access to the memory tool, remember to include all the information that the tool can provide you with about the user before you respond.
    1. Understand intent and delegate to specialists. As the Global Orchestrator, you have access to the full conversation history with the user.
    When you transfer a query to a specialist agent, sub agent or tool, share the important facts and information from your memory to them so they can operate with the full context. 
    2. Ensure the final response is professional and uses Markdown tables for data.
    3. If a specialist provides a long list, ensure only the top 10 items are shown initially.
    4. Conclude with a brief, high-level executive summary of what the data implies.
    """,
    tools=[adk.tools.preload_memory_tool.PreloadMemoryTool()],
    sub_agents=[inventory_agent, logistics_agent],
    
    #after_agent_callback=auto_save_session_to_memory_callback,
)

Questo snippet è la definizione della radice, ovvero l'agente orchestratore che riceve la conversazione o la richiesta dall'utente e la indirizza all'agente secondario o all'utente corrispondente gli strumenti corrispondenti in base all'attività.

  1. Diamo un'occhiata all'agente di inventario
inventory_agent = adk.Agent(
    name="InventorySpecialist",
    model="gemini-2.5-flash",
    description="Specialist in product stock and warehouse data.",
    instruction="""
    Analyze inventory levels.
    1. Use 'search_products_by_context' or 'check_inventory_levels'.
    2. ALWAYS format results as a clean Markdown table.
    3. If there are many results, display only the TOP 10 most relevant ones.
    4. At the end, state: 'There are additional records available. Would you like to see more?'
    """,
    tools=tools
)

Questo subagente in particolare è specializzato in attività di inventario come la ricerca contestuale di prodotti e il controllo dei livelli di inventario.

  1. Poi c'è il subagente logistico:
logistics_agent = adk.Agent(
    name="LogisticsManager",
    model="gemini-2.5-flash",
    description="Expert in global shipping routes and logistics tracking.",
    instruction="""
    Check shipment statuses.
    1. Use 'track_shipment_status' or 'analyze_supply_chain_risk'.
    2. ALWAYS format results as a clean Markdown table.
    3. Limit initial output to the top 10 shipments.
    4. Ask if the user needs the full manifest if more results exist.
    """,
    tools=tools
)

Questo subagente in particolare è specializzato in attività logistiche come il monitoraggio delle spedizioni e l'analisi dei rischi nella catena di fornitura.

  1. Tutti e tre gli agenti di cui abbiamo parlato finora utilizzano strumenti e gli strumenti vengono referenziati tramite il nostro server Toolbox, che abbiamo già implementato nella sezione precedente. Fai riferimento allo snippet di seguito:
from toolbox_core import ToolboxSyncClient

TOOLBOX_SERVER = os.environ["TOOLBOX_SERVER"]
TOOLBOX_TOOLSET = os.environ["TOOLBOX_TOOLSET"]

# --- ADK TOOLBOX CONFIGURATION ---
toolbox = ToolboxSyncClient(TOOLBOX_SERVER)
tools = toolbox.load_toolset(TOOLBOX_TOOLSET)

Questo subagente in particolare è specializzato in attività logistiche come il monitoraggio delle spedizioni e l'analisi dei rischi nella catena di fornitura.

7. Motore agente

Nella prima esecuzione, crea Agent Engine

import vertexai

GOOGLE_CLOUD_PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
GOOGLE_CLOUD_LOCATION = os.environ["GOOGLE_CLOUD_LOCATION"]

client = vertexai.Client(
  project=GOOGLE_CLOUD_PROJECT,
  location=GOOGLE_CLOUD_LOCATION
)

agent_engine = client.agent_engines.create()
  1. Per l'esecuzione successiva, aggiorna Agent Engine con la configurazione di Memory Bank:
agent_engine = client.agent_engines.update(
    name=APP_NAME,
    config={
        "context_spec": {
            "memory_bank_config": {
                "generation_config": {
                    "model": f"projects/{PROJECT_ID}/locations/{GOOGLE_CLOUD_LOCATION}/publishers/google/models/gemini-2.5-flash"
                }
            }
        }
    })

8. Contesto, esecuzione e memoria

La gestione del contesto è suddivisa in due livelli distinti per garantire che l'agente si senta un partner continuo anziché un bot stateless:

Memoria a breve termine (sessioni): gestita tramite VertexAiSessionService, tiene traccia della cronologia immediata degli eventi (messaggi utente, risposte dello strumento) all'interno di una singola interazione.

Memoria a lungo termine (banca della memoria): basata sulla banca della memoria di Vertex AI tramite adk.memorybankservice. Questo livello estrae informazioni "significative", come la preferenza di un utente per corrieri specifici o ritardi ricorrenti del magazzino, e le mantiene nelle varie sessioni.

Inizializza la sessione per la memoria della sessione nell'ambito della conversazione

Questa parte dello snippet crea la sessione per l'app corrente per l'utente corrente.

from google.adk.sessions import VertexAiSessionService

...

session_service = VertexAiSessionService(
    project=PROJECT_ID,
    location=GOOGLE_CLOUD_LOCATION,
)

...

# Initialize the session *outside* of the route handler to avoid repeated creation
session = None
session_lock = threading.Lock()

async def initialize_session():
    global session
    try:
        session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID)
        print(f"Session {session.id} created successfully.")  # Add a log
    except Exception as e:
        print(f"Error creating session: {e}")
        session = None  # Ensure session is None in case of error

# Create the session on app startup
asyncio.run(initialize_session())

Inizializza Vertex AI Memory Bank per la memoria a lungo termine

Questa parte dello snippet crea un'istanza dell'oggetto del servizio Vertex AI Memory Bank per l'agente.

from google.adk.memory import InMemoryMemoryService
from google.adk.memory import VertexAiMemoryBankService

...

try:
    memory_bank_service = adk.memory.VertexAiMemoryBankService(
        agent_engine_id=AGENT_ENGINE_ID,
        project=PROJECT_ID,
        location=GOOGLE_CLOUD_LOCATION,
    )
    #in_memory_service = InMemoryMemoryService()
    print("Memory Bank Service initialized successfully.")
except Exception as e:
    print(f"Error initializing Memory Bank Service: {e}")
    memory_bank_service = None

runner = adk.Runner(
    agent=orchestrator,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_bank_service,
)

...

Che cosa viene configurato?

In questa parte dello snippet stiamo configurando il servizio Vertex AI Memory Bank per la memoria a lungo termine, che memorizza in modo contestuale la sessione per l'app specifica per l'utente specifico come memoria all'interno di Vertex AI Memory Bank.

Che cosa viene eseguito nell'ambito dell'esecuzione dell'agente?

   async def run_and_collect():
        final_text = ""
        try:
            async for event in runner.run_async(
                new_message=content,
                user_id=user_id,
                session_id=session_id
            ):
                if hasattr(event, 'author') and event.author:
                    if not any(log['agent'] == event.author for log in execution_logs):
                        execution_logs.append({
                            "agent": event.author,
                            "action": "Analyzing data requirements...",
                            "type": "orchestration_event"
                        })
                if hasattr(event, 'text') and event.text:
                    final_text = event.text
                elif hasattr(event, 'content') and hasattr(event.content, 'parts'):
                    for part in event.content.parts:
                        if hasattr(part, 'text') and part.text:
                            final_text = part.text
        except Exception as e:
            print(f"Error during runner.run_async: {e}")
            raise  # Re-raise the exception to signal failure
        finally:
            gc.collect()
            return final_text

Elabora i contenuti inseriti dall'utente nell'oggetto new_message con l'ID utente e l'ID sessione nell'ambito. A questo punto, l'agente prende il controllo, la sua risposta viene elaborata e restituita.

Che cosa viene memorizzato nella memoria a lungo termine?

Il dettaglio della sessione nell'ambito dell'app e dell'utente viene estratto nella variabile di sessione.

Questa sessione viene quindi aggiunta come memoria per l'utente corrente per l'app corrente dell'oggetto Vertex AI Memory Bank utilizzando il metodo "add_session_to_memory".

session = asyncio.run(session_service.get_session(app_name=APP_NAME, user_id=USER_ID, session_id=session.id))

if memory_bank_service and session:  # Check memory service AND session
                try:
                    #asyncio.run(in_memory_service.add_session_to_memory(session))
                    asyncio.run(memory_bank_service.add_session_to_memory(session))
                    '''
                    client.agent_engines.memories.generate(
                        scope={"app_name": APP_NAME, "user_id": USER_ID},
                        name=APP_NAME,
                        direct_contents_source={
                            "events": [
                                {"content": content}
                            ]
                        },
                        config={"wait_for_completion": True},
                    )   
                    '''

                    print("Successfully added session to memory.******")
                    print(session.id)

                except Exception as e:
                    print(f"Error adding session to memory: {e}")

Recupero della memoria

Per poterla trasmettere come parte del contesto all'orchestratore e ad altri agenti, se applicabile, dobbiamo recuperare la memoria a lungo termine memorizzata utilizzando il nome dell'app e il nome utente come ambito (poiché è l'ambito per cui abbiamo memorizzato i ricordi).

    results = client.agent_engines.memories.retrieve(
    name=APP_NAME,
    scope={"app_name": APP_NAME, "user_id": USER_ID}
    )
    # RetrieveMemories returns a pager. You can use `list` to retrieve all pages' memories.
    list(results)
    print(list(results))

In che modo il ricordo recuperato viene caricato come parte del contesto?

Utilizziamo il seguente attributo nella definizione dell'agente Orchestrator che consente all'agente principale di precaricare il contesto dalla banca di memoria. Oltre agli strumenti a cui accediamo dal server della cassetta degli attrezzi per i subagenti.

tools=[adk.tools.preload_memory_tool.PreloadMemoryTool()],

Contesto del callback

In una supply chain aziendale, non puoi avere una "scatola nera". Utilizziamo CallbackContext di ADK per creare un Narrative Engine. Agganciandoci all'esecuzione dell'agente, acquisisce ogni processo di pensiero e chiamata di strumenti, trasmettendoli a una barra laterale della UI.

  • Evento di traccia: "GlobalOrchestrator is analyzing data requirements..." (GlobalOrchestrator sta analizzando i requisiti dei dati…)
  • Trace Event: "Delegating to InventorySpecialist for stock levels..." (Delega a InventorySpecialist per i livelli di stock...)
  • Trace Event: "Recupero dei pattern storici di ritardo dei fornitori da Memory Bank…"

Questo audit trail è prezioso per il debug e garantisce che gli operatori umani possano fidarsi delle decisioni autonome dell'agente.

from google.adk.agents.callback_context import CallbackContext

...

# --- ADK CALLBACKS (Narrative Engine) ---
execution_logs = []

async def trace_callback(context: CallbackContext):
    """
    Captures agent and tool invocation flow for the UI narrative.
    """
    agent_name = context.agent.name
    event = {
        "agent": agent_name,
        "action": "Processing request steps...",
        "type": "orchestration_event"
    }
    execution_logs.append(event)
    return None

...

È tutto!!! Abbiamo clonato correttamente il progetto e esaminato i dettagli dell'agente, della memoria e del contesto.

Puoi testarlo accedendo alla cartella del progetto del repository clonato ed eseguendo i seguenti comandi:

>> pip install -r requirements.txt

>> python app.py

In questo modo l'agente dovrebbe avviarsi localmente e dovresti essere in grado di testarlo.

9. Eseguiamo il deployment in Cloud Run

  1. Esegui il deployment su Cloud Run eseguendo questo comando dal terminale Cloud Shell in cui il progetto è clonato e assicurati di trovarti nella cartella principale del progetto.

Esegui questo comando nel terminale Cloud Shell:

gcloud run deploy supply-chain-agent --source . --platform managed   --region us-central1 --allow-unauthenticated --set-env-vars GOOGLE_CLOUD_PROJECT=<<YOUR_PROJECT>>,GOOGLE_CLOUD_LOCATION=us-central1,GOOGLE_GENAI_USE_VERTEXAI=TRUE,REASONING_ENGINE_APP_NAME=<<YOUR_APP_ENGINE_URL>>,TOOLBOX_SERVER=<<YOUR_TOOLBOX_SERVER>>,TOOLBOX_TOOLSET=supply_chain_toolset,AGENT_ENGINE_ID=<<YOUR_AGENT_ENGINE_ID>>

Sostituisci i valori dei segnaposto <<YOUR_PROJECT>>, <<YOUR_APP_ENGINE_URL>>, <<YOUR_TOOLBOX_SERVER>> e <<YOUR_AGENT_ENGINE_ID>>

Al termine del comando, verrà visualizzato un URL del servizio. Copialo.

  1. Concedi il ruolo Client AlloyDB al service account Cloud Run.In questo modo, la tua applicazione serverless può eseguire il tunneling in modo sicuro nel database.

Esegui questo comando nel terminale Cloud Shell:

# 1. Get your Project ID and Project Number
PROJECT_ID=$(gcloud config get-value project)
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")

# 2. Grant the AlloyDB Client role
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role="roles/alloydb.client"

Ora utilizza l'URL del servizio (l'endpoint Cloud Run che hai copiato in precedenza) e testa l'app.

Nota:se riscontri un problema con il servizio e viene indicata la memoria come motivo, prova ad aumentare il limite di memoria allocata a 1 GiB per eseguire il test.

3e4d36ed99b39325.png

d6b337f79a1f1d82.png

5e781a193a4aa903.png

10. Esegui la pulizia

Una volta completato questo lab, non dimenticare di eliminare il cluster e l'istanza AlloyDB.

Il cluster e le relative istanze verranno puliti.

11. Complimenti

Combinando la velocità di AlloyDB, l'efficienza di orchestrazione di MCP Toolbox e la"memoria istituzionale" di Vertex AI Memory Bank, abbiamo creato un sistema di supply chain in continua evoluzione. Non si limita a rispondere alle domande, ma ricorda che il tuo magazzino di Singapore ha sempre problemi di ritardi legati ai monsoni e suggerisce in modo proattivo di riassegnare le spedizioni prima ancora che tu lo chieda.