Skalowanie agentów za pomocą CrewAI, LangGraph, A2A i ADK

1. Wprowadzenie

Z tego ćwiczenia dowiesz się, jak utworzyć system orkiestracji wieloagentowej za pomocą CrewAI, LangGraph, protokołu A2AADK (pakietu Agent Development Kit). Utworzysz system, w którym pokój kontrolny ADK deleguje planowanie do narzędzia LangGraph Planner, które wysyła zadania do zespołu wykonawczego CrewAI – wszystko to jest połączone za pomocą protokołu A2A, aby obsługiwać scenariusz uzupełniania zapasów w sklepie detalicznym.

Czym jest orkiestracja wielu agentów?

W systemie z wieloma agentami współpracuje ze sobą wielu wyspecjalizowanych agentów AI, aby wykonywać zadania, które byłyby zbyt złożone dla jednego agenta. Zamiast jednego monolitycznego agenta, który robi wszystko, dzielisz problem na role – planisty i wykonawcy – z których każda ma własne narzędzia i wiedzę.

Odzwierciedla to sposób działania organizacji ludzkich: menedżer deleguje strategię na analityków, a realizację na specjalistów. Korzyści:

  • Rozdzielenie zadań: każdy agent skupia się na tym, co robi najlepiej.
  • Elastyczność platformy: używaj najlepszej platformy do każdej roli (LangGraph do logiki planowania, CrewAI do wykonywania narzędzi).
  • Skalowalność: dodawanie wyspecjalizowanych agentów bez zmiany całego systemu.

Scenariusz

Gdy użytkownik wyśle prośbę o uzupełnienie zapasów, np. „Uzupełnij zapasy 1 telefonu Pixel 7 w biurze w Tokio”, system:

  1. LangGraph Planner analizuje żądanie i wyodrębnia produkt oraz jego ilość.
  2. Planer deleguje wykonanie do zespołu wykonawczego CrewAI.
  3. Agent Sourcing Specialist przeszukuje katalog produktów za pomocą narzędzi.
  4. Agent Procurement Officer sprawdza budżet i składa zamówienie za pomocą narzędzi.
  5. Wynik jest przesyłany z powrotem do planera, który generuje raport końcowy.
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

Stos technologiczny

Warstwa

Technologia

Rola

Planowanie

LangGraph

Automat stanowy, który analizuje intencje, kieruje żądania i generuje raporty

Wykonanie

CrewAI

Agenty oparte na rolach, które wywołują narzędzia sekwencyjnie

LLM

Gemini w Vertex AI

Umożliwia rozumowanie agenta i wybór narzędzi

Komunikacja między agentami

Protokół A2A

Most JSON-RPC 2.0, dzięki któremu agenci z różnych platform mogą się komunikować.

Aranżer najwyższego poziomu

ADK (BaseAgent)

Otrzymuje żądania, przekazuje je za pomocą interfejsu A2A i w razie niepowodzenia ponownie planuje.

Zobacz w działaniu: jeśli to możliwe, wypróbuj pełną wersję systemu produkcyjnego na stronie https://scale-control-room-761793285222.us-central1.run.app. Zawiera ona rozszerzenia, które tu utworzysz, takie jak panel w czasie rzeczywistym, protokół A2A i zabezpieczenia IAM.

Jakie zadania wykonasz

  • Określ niestandardowe narzędzia, których będą używać agenci.
  • Twórz wyspecjalizowanych agentów za pomocą CrewAI.
  • Utwórz planer maszyny stanowej za pomocą LangGraph.
  • Koordynuj przepływ informacji między planistą a ekipą wykonawczą.
  • Umieść planer na serwerze protokołu A2A, aby umożliwić komunikację między platformami.
  • Utwórz Pokój reżyserski ADK najwyższego poziomu, który deleguje zadania za pomocą protokołu A2A i w razie niepowodzenia ponownie planuje działania.

Czego potrzebujesz

  • przeglądarka, np. Chrome;
  • projekt Google Cloud z włączonymi płatnościami;

To ćwiczenie jest przeznaczone dla średnio zaawansowanych deweloperów, którzy znają Pythona i podstawowe pojęcia związane z LLM.

Szacowany czas trwania: 35 minut.

Szacowany koszt: zasoby utworzone w tym module powinny kosztować mniej niż 1 USD.

2. Zanim zaczniesz

Tworzenie projektu Google Cloud

  1. W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt w chmurze Google.
  2. Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.

Uruchamianie Cloud Shell

Cloud Shell to środowisko wiersza poleceń działające w Google Cloud, które zawiera niezbędne narzędzia.

  1. Kliknij Aktywuj Cloud Shell u góry konsoli Google Cloud.
  2. Po połączeniu z Cloud Shell sprawdź uwierzytelnianie:
    gcloud auth list
    
  3. Sprawdź, czy projekt jest skonfigurowany:
    gcloud config get project
    
  4. Jeśli projekt nie jest ustawiony zgodnie z oczekiwaniami, ustaw go:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Włącz interfejsy API

Aby włączyć interfejs Vertex AI API, uruchom to polecenie:

gcloud services enable aiplatform.googleapis.com

Uwaga: Cloud Shell automatycznie uwierzytelnia się na Twoim koncie Google Cloud. Jeśli wykonujesz to ćwiczenie poza Cloud Shell, musisz uruchomić gcloud auth application-default login, aby uwierzytelnić się w Vertex AI.

Konfigurowanie środowiska

W Cloud Shell utwórz nowy katalog dla swojego projektu i przejdź do niego:

mkdir scale-agents
cd scale-agents

Zainstaluj uv i użyj go do zainstalowania wymaganych pakietów:

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Ustaw zmienne środowiskowe dla Vertex AI:

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. Określanie narzędzi i agentów

W systemie z wieloma agentami potrzebne są narzędzia do interakcji ze światem oraz określone role, które wskazują, co należy robić.

Utwórz plik o nazwie scale_agents.py i dodaj do niego ten kod. Spowoduje to skonfigurowanie importów, narzędzi symulacyjnych i agentów CrewAI.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

Kluczowe pojęcia

  • @tooldekorator: CrewAI używa go do przekształcania zwykłych funkcji Pythona w narzędzia, które modele LLM mogą rozumieć i wywoływać. Wskazówki dotyczące typu funkcji i ciąg dokumentacyjny są używane do generowania schematu narzędzia, który może zrozumieć model LLM.
  • Rola, cel i historia: określają personę agenta i kierują rozumowaniem modelu LLM. Kontekst to nie tylko tekst dodatkowy – „Zawsze przeszukuj katalog” zachęca agenta do korzystania z narzędzi zamiast wymyślania odpowiedzi.
  • reasoning=False: wyłącza rozszerzone rozumowanie, dzięki czemu agent wykonuje standardową pętlę wywoływania narzędzi zamiast próbować odpowiadać bezpośrednio.
  • allow_delegation=False: sprawia, że każdy agent skupia się na przypisanych mu narzędziach, zamiast przekazywać pracę innym agentom.

Dlaczego 2 agenty zamiast 1? Każdy agent ma inne narzędzia i wykonuje inne zadania. Specjalista ds. źródeł wyszukuje tylko produkty, a pracownik ds. zamówień zajmuje się tylko budżetami i zamówieniami. To rozdzielenie obowiązków oznacza, że każdy agent ma skoncentrowany prompt i mały, odpowiedni zestaw narzędzi, co prowadzi do bardziej niezawodnego działania LLM niż w przypadku jednego agenta, który musi sobie radzić ze wszystkim.

4. Określanie zadań i ekipy

Teraz zdefiniujmy, co mają robić te agenty, tworząc zadania i łącząc je w zespół.

Na końcu tego samego pliku scale_agents.py dodaj ten kod:

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

Kluczowe pojęcia

  • Task Context: context=[sourcing_task] informuje CrewAI, że zadanie związane z zamówieniami wymaga wyniku zadania związanego z pozyskiwaniem, aby można było je wykonać. Przed podjęciem decyzji o zamówieniu pracownik ds. zamówień może sprawdzić, co znalazł specjalista ds. źródeł.
  • Process.sequential: zadania są wykonywane w kolejności, w jakiej są wymienione. Jest to ważne, ponieważ zadanie zakupu zależy od wyników zadania pozyskiwania – nie możesz złożyć zamówienia, zanim nie dowiesz się, który produkt kupić.
  • memory=False / planning=False: wyłącza wbudowane funkcje pamięci i planowania CrewAI, aby w tej wersji demonstracyjnej zapewnić prostotę i przewidywalność działania.

5. Tworzenie Planera LangGraph

Zespół wykonawczy zajmuje się „jak” – wyszukiwaniem produktów, sprawdzaniem budżetów i składaniem zamówień. Ale kto decyduje o tym, „co” ma być wyświetlane? To agent planujący, który został utworzony za pomocą LangGraph.

LangGraph modeluje przepływy pracy jako automat stanowy – graf węzłów (funkcji) połączonych krawędziami (przejściami). Stan przepływa przez wykres, a każdy węzeł odczytuje i zapisuje dane w stanie udostępnionym. Jest to naturalne rozwiązanie w przypadku przepływów pracy związanych z planowaniem, w których potrzebujesz jasnego, deterministycznego przepływu sterowania: analizy prośby, delegowania zadań do zespołu i generowania raportu.

Na końcu tego samego pliku scale_agents.py dodaj ten kod:

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

Kluczowe pojęcia

  • StateGraph: definiuje automat stanów. PlanState to stan z określonym typem, który jest gromadzony w miarę przetwarzania żądania przez poszczególne węzły.
  • Węzły: funkcje, które przyjmują bieżący stan i zwracają jego aktualizacje. Każdy węzeł ma jedno zadanie: analyze_alert wyodrębnia intencję, delegate_to_executor uruchamia zespół, generate_report podsumowuje wynik.
  • Krawędzie: określają przepływ między węzłami. W tym ćwiczeniu używamy prostego przepływu liniowego (analyze → delegate → report). W pełnych warsztatach rozszerzamy go o routing warunkowy – na przykład kierowanie żądań destrukcyjnych do ścieżki bezpieczeństwa zamiast do wykonawcy.

Dlaczego warto używać LangGraph do planowania? CrewAI świetnie sprawdza się w przypadku agentów wywołujących narzędzia, ale planer potrzebuje deterministycznego przepływu sterowania – „jeśli działanie jest destrukcyjne, przejdź do ścieżki bezpieczeństwa; w przeciwnym razie przekaż zadanie”. Model automatu stanowego LangGraph sprawia, że routing jest jawny i można go testować, a CrewAI obsługuje swobodne wykonywanie narzędzi poniżej.

6. Uruchomienie narzędzia Planowanie i ekipa

Przetestujmy teraz razem planistę LangGraph i zespół CrewAI.

W terminalu Cloud Shell uruchom skrypt:

uv run python scale_agents.py

Powinny się wyświetlić dane wyjściowe wskazujące wykonywane czynności:

  1. Analizowanie alertu: uruchamia się węzeł LangGraph.
  2. Delegowanie do ekipy: węzeł LangGraph wywołuje ekipę CrewAI.
  3. Wykonanie CrewAI: zobaczysz, jak specjalista ds. pozyskiwania wyszukuje produkt, a pracownik ds. zamówień sprawdza budżet i tworzy zamówienie.
  4. Raport końcowy: podsumowanie wyników zostanie wydrukowane na końcu.

Przykładowe dane wyjściowe (skrócone):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

Uwaga: w danych wyjściowych mogą pojawić się komunikaty [CrewAIEventsBus] Warning: Event pairing mismatch. Są to ostrzeżenia kosmetyczne pochodzące z wewnętrznego śledzenia zdarzeń CrewAI, które można bezpiecznie zignorować.

Uwaga: CrewAI może wyświetlić komunikat o wyłączonym śledzeniu. To informacja, którą można bezpiecznie zignorować.

Uwaga: w symulowanym systemie OMS obowiązuje limit budżetu w wysokości 100 USD. Aby ścieżka podstawowa zakończyła się sukcesem, utrzymuj małe ilości (poniżej ok. 2 sztuk). Na przykład 1 telefon Pixel 7 za 50 zł przejdzie weryfikację budżetu, ale 3 urządzenia za 150 zł zostaną odrzucone jako „Przekraczające budżet”.

7. Umieszczanie Planera w serwerze A2A

Planer LangGraph działa, ale jest uwięziony w procesie Pythona. Aby można było wywoływać go przez innych agentów (potencjalnie napisanych w różnych frameworkach lub działających na różnych maszynach), umieszczamy go na serwerze A2A (Agent-to-Agent).

A2A to protokół oparty na JSON-RPC 2.0, który standaryzuje sposób komunikacji agentów. Kluczowe pojęcia:

Pomysł

Cel

Karta agenta

Metadane JSON opisujące możliwości agenta (udostępniane pod adresem /.well-known/agent-card.json).

message/send

Metoda JSON-RPC do wysyłania zadania do agenta

Zadanie

Jednostka pracy ze stanem (przesłano → w trakcie → ukończono/niepowodzenie)

Artefakty

Wyniki pośrednie i końcowe dołączone do zadania

Utwórz nowy plik a2a_planner.py:

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

Kluczowe pojęcia

  • Karta agenta: wyświetlana pod adresem /.well-known/agent-card.json – każdy agent może dowiedzieć się, co robi ten serwer, pobierając ten adres URL. Zawiera listę umiejętności agenta, obsługiwanych typów treści i możliwości.
  • AgentExecutor.execute() Jedyna metoda, którą wdrażasz. Odbiera przychodzące żądanie, uruchamia logikę agenta (w tym przypadku planera LangGraph) i odsyła wyniki w postaci artefaktów.
  • TaskUpdater: zarządza cyklem życia zadania – add_artifact() wysyła dane wyjściowe pośrednie lub końcowe, complete() oznacza zadanie jako ukończone. Biblioteka A2A obsługuje wszystkie elementy RPC JSON.

Przetestuj serwer A2A, uruchamiając go w terminalu:

uv run python a2a_planner.py

Otwórz kolejną kartę Cloud Shell (kliknij + obok bieżącej karty) i sprawdź, czy karta agenta jest wyświetlana:

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

Powinien pojawić się kod JSON karty agenta. W pierwszym terminalu pozostaw uruchomiony serwer A2A, aby przejść do następnego kroku.

8. Tworzenie Pokoju reżyserskiego ADK

Na szczycie stosu znajduje się pokój reżyserski, który został utworzony za pomocą ADK (pakietu Agent Development Kit od Google). Otrzymuje żądanie użytkownika, przekazuje je do planisty za pomocą A2A, ocenia wynik i – co najważniejsze – w przypadku niepowodzenia ponownie planuje (CUJ 2).

ADK udostępnia elementy podstawowe agenta, takie jak BaseAgent, LlmAgentInMemoryRunner. Klasę BaseAgent rozszerzamy, aby pisać niestandardową logikę administracji – wywołania A2A, klasyfikację raportów i dynamiczne ponowne planowanie za pomocą subagenta LlmAgent.

Utwórz nowy plik control_room.py:

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

Kluczowe pojęcia

  • BaseAgent: element ADK dla agentów niestandardowych. Tworzysz podklasę i zastępujesz funkcję _run_async_impl, aby napisać dowolną logikę orkiestracji asynchronicznej – w tym przypadku pętlę wywołania A2A + klasyfikacja + ponowne planowanie.
  • Wywołanie A2A JSON-RPC: Control Room wysyła standardowe żądanie message/send do serwera A2A planisty za pomocą httpx i analizuje odpowiedź JSON-RPC, aby wyodrębnić raport końcowy.
  • _classify_report(): prosta klasyfikacja oparta na słowach kluczowych, która na podstawie tekstu raportu określa powodzenie, błąd z możliwością ponowienia lub błąd końcowy. To uruchamia pętlę ponownego planowania.
  • Wywołanie subagenta: aby ponownie zaplanować działanie, pokój kontrolny tworzy LlmAgent i uruchamia go, tworząc podrzędny InvocationContext i wywołując replanner.run_async(child_ctx). Umożliwia to dynamiczne uruchamianie agentów LLM w ramach niestandardowej logiki orkiestracji.
  • InMemoryRunner: uruchamia agenta lokalnie z pamięcią sesji. W środowisku produkcyjnym użyjesz adk deploy, aby wdrożyć w Vertex AI Agent Engine.

9. Uruchomienie pełnego stosu

Przetestujmy teraz kompletny 3-warstwowy system: Pokój reżyserski ADK → A2A → Planer LangGraph → zespół CrewAI.

Użyj drugiej karty Cloud Shell, którą otwarto wcześniej (lub kliknij +, aby otworzyć nową), i uruchom Control Room. Ważne: każda karta Cloud Shell ma własną sesję powłoki. Musisz ponownie ustawić zmienne projektu i środowiska:

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

Powinien się wyświetlić pełny przepływ orkiestracji:

  1. Pokój reżyserski odbiera żądanie i wysyła message/send wywołanie JSON-RPC do serwera A2A.
  2. Serwer A2A odbiera żądanie i wywołuje planer LangGraph.
  3. Planer LangGraph analizuje żądanie i przekazuje je do zespołu CrewAI.
  4. Zespół CrewAI zarządza agentami ds. pozyskiwania i zamówień.
  5. Wynik jest przesyłany z powrotem do Pokoju reżyserskiego.

Najważniejsze ścieżki użytkownika

Spróbuj zmodyfikować ciąg promptcontrol_room.py, aby przetestować te scenariusze:

główna ścieżka użytkownika

Prompt

Co się dzieje

1. Happy Path

Restock 1 Pixel 7 phones for the Tokyo office

Wyszukiwanie -> sprawdzenie budżetu -> zamówienie (SUKCES). Działa kompleksowo.

2. Ponowne planowanie

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

Planer zwraca komunikat „Nie udało się: nieznany element”. Centrum sterowania wykrywa to i wywołuje LlmAgent ponownie planer, aby rozszerzyć wyszukiwanie. Obie próby się nie powiodą (zakodowany na stałe planer rozpoznaje tylko „Pixel 7”), ale zobaczysz w działaniu pełny mechanizm ponownego planowania.

Aby przetestować CUJ 2 (ponowne planowanie), zmień wartość promptcontrol_room.py na:

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

Zaprogramowany planer nie rozpozna tego elementu i zwróci komunikat „Nie udało się: nieznany element”. Centrum sterowania wykryje błąd, dynamicznie utworzy LlmAgent ponownie planujący i spróbuje ponownie z szerszym celem. Ponieważ planer rozpoznaje tylko „Pixel 7”, ponowna próba też się nie powiedzie, ale zobaczysz cały proces ponownego planowania. Wynik końcowy to FAILED after 2 attempts: ....

10. Czyszczenie danych

Aby uniknąć obciążenia konta Google Cloud bieżącymi opłatami, możesz usunąć zasoby utworzone podczas tego ćwiczenia. Możesz po prostu usunąć utworzony katalog:

cd ..
rm -rf scale-agents

11. Gratulacje

Gratulacje! Udało Ci się utworzyć system orkiestracji wieloagentowej za pomocą CrewAI, LangGraph, protokołu A2A i ADK.

Czego się dowiedziałeś(-aś)

  • Jak definiować narzędzia dla agentów za pomocą dekoratora @tool w CrewAI.
  • Jak tworzyć wyspecjalizowanych agentów z różnymi rolami, narzędziami i celami.
  • Jak połączyć agentów w sekwencyjną grupę z zależnościami między zadaniami.
  • Jak zbudować planer oparty na automacie stanowym za pomocą LangGraph, który deleguje zadania do zespołu.
  • Jak udostępnić planer jako usługę A2A za pomocą AgentCardAgentExecutor.
  • Jak utworzyć niestandardowy pakiet ADK BaseAgent, który deleguje zadania za pomocą protokołu A2A i w przypadku niepowodzenia ponownie planuje działanie, wywołując subagenta LlmAgent.
  • Dlaczego rozdzielenie planowania, realizacji i orkiestracji w ramach różnych struktur zapewnia modułowość i odporność.

Dalsze działania

Pełne warsztaty rozszerzają ten system o:

  • Panel w czasie rzeczywistym – strumieniowanie SSE do wizualizacji postępów wielu agentów.
  • Identity Shield – zabezpieczenia oparte na IAM, które blokują destrukcyjne działania na poziomie infrastruktury, a nie promptu.
  • Vertex AI Agent Engine – wdrażaj agenta pakietu ADK w zarządzanej infrastrukturze w chmurze za pomocą adk deploy

Dokumentacja