CrewAI, LangGraph, A2A, ADK를 사용한 에이전트 확장

1. 소개

이 Codelab에서는 CrewAI, LangGraph, A2A 프로토콜, ADK (에이전트 개발 키트)를 사용하여 멀티 에이전트 오케스트레이션 시스템을 빌드하는 방법을 알아봅니다. ADK 관제실이 LangGraph 플래너에 계획을 위임하고, LangGraph 플래너가 CrewAI 실행팀에 작업을 디스패치하는 시스템을 만듭니다. 이 모든 것은 A2A를 통해 연결되어 소매업체 재고 보충 시나리오를 처리합니다.

멀티 에이전트 조정이란 무엇인가요?

멀티 에이전트 시스템에서는 여러 전문 AI 에이전트가 협력하여 단일 에이전트가 처리하기에는 너무 복잡한 작업을 수행합니다. 모든 작업을 수행하는 하나의 모놀리식 에이전트 대신 각자 고유한 도구와 전문 지식을 갖춘 계획자와 실행자라는 역할로 문제를 분해합니다.

이는 인간 조직의 작동 방식과 유사합니다. 관리자는 분석가에게 전략을 위임하고 전문가에게 실행을 위임합니다. 장점은 다음과 같습니다.

  • 관심사 분리: 각 에이전트가 가장 잘하는 일에 집중
  • 프레임워크 유연성: 각 역할에 가장 적합한 프레임워크 사용 (계획 로직에는 LangGraph, 도구 실행에는 CrewAI)
  • 확장성: 전체 시스템을 변경하지 않고 전문 상담사 추가

시나리오

사용자가 "도쿄 사무실에 Pixel 7 휴대전화 1대를 재입고해 줘"와 같은 재입고 요청을 보내면 시스템은 다음 작업을 실행합니다.

  1. LangGraph Planner는 요청을 분석하고 항목과 수량을 추출합니다.
  2. 플래너가 CrewAI 실행 크루에 실행을 위임합니다.
  3. 소싱 전문가 상담사가 도구를 사용하여 제품 카탈로그를 검색합니다.
  4. 조달 담당자 에이전트가 예산을 검증하고 도구를 사용하여 구매 주문을 합니다.
  5. 결과는 다시 계획자에게 전달되어 최종 보고서를 생성합니다.
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

기술 스택

레이어

기술

역할

계획

LangGraph

의도를 분석하고, 요청을 라우팅하고, 보고서를 생성하는 상태 머신

실행

CrewAI

도구를 순차적으로 호출하는 역할 기반 에이전트

LLM

Vertex AI의 Gemini

에이전트 추론 및 도구 선택 지원

상담사 간 커뮤니케이션

A2A 프로토콜

서로 다른 프레임워크의 에이전트가 통신할 수 있도록 JSON-RPC 2.0 브리지

최상위 조정자

ADK (BaseAgent)

요청을 수신하고, A2A를 통해 위임하고, 실패 시 다시 계획

실제 작동 확인: 가능한 경우 https://scale-control-room-761793285222.us-central1.run.app에서 전체 프로덕션 시스템을 사용해 보세요. 여기에서 빌드하는 항목을 실시간 대시보드, A2A 프로토콜, IAM 보안으로 확장할 수 있습니다.

실습할 내용

  • 에이전트가 사용할 맞춤 도구를 정의합니다.
  • CrewAI로 전문 에이전트를 빌드합니다.
  • LangGraph를 사용하여 상태 머신 플래너를 만듭니다.
  • 계획자와 실행팀 간의 흐름을 오케스트레이션합니다.
  • 프레임워크 간 통신을 위해 A2A 프로토콜 서버에 플래너를 래핑합니다.
  • A2A를 통해 위임하고 실패 시 다시 계획하는 최상위 ADK Control Room을 빌드합니다.

필요한 항목

  • 웹브라우저(예: Chrome)
  • 결제가 사용 설정된 Google Cloud 프로젝트

이 Codelab은 Python과 기본 LLM 개념에 익숙한 중급 개발자를 대상으로 합니다.

예상 소요 시간: 35분

비용 추정치: 이 Codelab에서 생성된 리소스의 비용은 1달러 미만입니다.

2. 시작하기 전에

Google Cloud 프로젝트 만들기

  1. Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
  2. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

Cloud Shell 시작

Cloud Shell은 Google Cloud에서 실행되는 명령줄 환경으로, 필요한 도구가 미리 로드되어 제공됩니다.

  1. Google Cloud 콘솔 상단에서 Cloud Shell 활성화를 클릭합니다.
  2. Cloud Shell에 연결되면 인증을 확인합니다.
    gcloud auth list
    
  3. 프로젝트가 구성되었는지 확인합니다.
    gcloud config get project
    
  4. 프로젝트가 예상대로 설정되지 않은 경우 설정합니다.
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

API 사용 설정

다음 명령어를 실행하여 Vertex AI API를 사용 설정합니다.

gcloud services enable aiplatform.googleapis.com

참고: Cloud Shell은 Google Cloud 계정으로 자동 인증됩니다. Cloud Shell 외부에서 이 Codelab을 실행하는 경우 gcloud auth application-default login을 실행하여 Vertex AI로 인증해야 합니다.

환경 설정

Cloud Shell에서 프로젝트의 새 디렉터리를 만들고 해당 디렉터리로 이동합니다.

mkdir scale-agents
cd scale-agents

uv을 설치하고 이를 사용하여 필요한 패키지를 설치합니다.

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Vertex AI의 환경 변수를 설정합니다.

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. 도구 및 에이전트 정의

멀티 에이전트 시스템에서 에이전트는 세상과 상호작용하는 도구와 무엇을 해야 하는지 아는 특정 역할이 필요합니다.

scale_agents.py 파일을 만들고 다음 코드를 추가합니다. 이렇게 하면 가져오기, 모의 도구, CrewAI 에이전트가 설정됩니다.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

주요 개념

  • @tool 데코레이터: CrewAI는 이를 사용하여 일반 Python 함수를 LLM이 이해하고 호출할 수 있는 도구로 변환합니다. 함수의 유형 힌트와 docstring은 LLM이 이해할 수 있는 도구 스키마를 생성하는 데 사용됩니다.
  • 역할, 목표, 배경: 에이전트의 페르소나를 정의하고 LLM 추론을 안내합니다. 배경은 단순히 재미를 위한 텍스트가 아닙니다. '항상 카탈로그를 검색해'라는 문구는 에이전트가 대답을 환각하는 대신 도구를 사용하도록 유도합니다.
  • reasoning=False: 에이전트가 직접 대답하려고 하지 않고 표준 도구 호출 루프를 따르도록 확장된 추론을 사용 중지합니다.
  • allow_delegation=False: 각 상담사가 다른 상담사에게 작업을 전달하는 대신 할당된 도구에 집중하도록 합니다.

에이전트가 하나가 아닌 두 개인 이유는 무엇인가요? 상담사마다 도구와 업무가 다릅니다. 소싱 전문가는 제품만 검색하고 조달 담당자는 예산과 주문만 처리합니다. 이러한 관심사 분리를 통해 각 에이전트는 집중된 프롬프트와 작고 관련성 있는 도구 세트를 갖게 되므로 모든 것을 처리하는 단일 에이전트보다 LLM 동작의 신뢰성이 높아집니다.

4. 작업 및 작업자 정의

이제 작업을 만들고 이를 크루에 연결하여 이러한 에이전트가 해야 할 일을 정의해 보겠습니다.

동일한 scale_agents.py 파일 끝에 다음 코드를 추가합니다.

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

주요 개념

  • 작업 컨텍스트: context=[sourcing_task]는 조달 작업을 진행하려면 소싱 작업의 출력이 필요하다고 CrewAI에 알려줍니다. 조달 담당자는 소싱 전문가가 찾은 내용을 확인한 후 주문할 항목을 결정할 수 있습니다.
  • Process.sequential: 작업이 나열된 순서대로 실행됩니다. 조달 작업은 소싱 작업의 결과에 따라 달라지므로 어떤 제품을 구매해야 하는지 알기 전에는 주문할 수 없습니다.
  • memory=False / planning=False: 이 데모에서 실행을 간단하고 예측 가능하게 유지하기 위해 CrewAI의 내장 메모리 및 계획 기능을 사용 중지합니다.

5. LangGraph 플래너 만들기

실행팀은 제품 검색, 예산 확인, 주문과 같은 '방법'을 처리합니다. 하지만 '무엇'을 결정하는 사람은 누구인가요? LangGraph로 빌드된 계획 에이전트입니다.

LangGraph는 워크플로를 상태 머신, 즉 에지 (전환)로 연결된 노드 (함수) 그래프로 모델링합니다. 상태는 그래프를 통해 흐르며 각 노드는 공유 상태에서 읽고 공유 상태에 씁니다. 명확하고 결정적인 제어 흐름이 필요한 워크플로를 계획하는 데 적합합니다. 요청을 분석하고, 작업자에게 위임하고, 보고서를 생성합니다.

동일한 scale_agents.py 파일 끝에 다음 코드를 추가합니다.

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

주요 개념

  • StateGraph: 상태 머신을 정의합니다. PlanState는 각 노드가 요청을 처리할 때 누적되는 유형이 지정된 상태입니다.
  • 노드: 현재 상태를 가져와 업데이트를 반환하는 함수입니다. 각 노드에는 단일 책임이 있습니다. analyze_alert는 의도를 추출하고, delegate_to_executor는 크루를 실행하고, generate_report는 결과를 요약합니다.
  • 에지: 노드 간의 흐름을 정의합니다. 이 Codelab에서는 간단한 선형 흐름 (analyze → delegate → report)을 사용합니다. 전체 워크숍에서는 파괴적인 요청을 실행기 대신 보안 경로로 라우팅하는 등 조건부 라우팅을 사용하여 이를 확장합니다.

플래너에 LangGraph를 사용하는 이유는 무엇인가요? CrewAI는 도구 호출 에이전트에 적합하지만 플래너에는 결정적 제어 흐름이 필요합니다. '파괴적인 경우 보안 경로로 이동하고 그렇지 않으면 위임합니다.' LangGraph의 상태 머신 모델은 이 라우팅을 명시적이고 테스트 가능하게 만들고, CrewAI는 아래의 자유 형식 도구 실행을 처리합니다.

6. 플래너 및 돌격대 실행

이제 LangGraph 플래너와 CrewAI 크루를 함께 테스트해 보겠습니다.

Cloud Shell 터미널에서 스크립트를 실행합니다.

uv run python scale_agents.py

다음과 같이 수행되는 단계를 나타내는 출력이 표시됩니다.

  1. 알림 분석: LangGraph 노드가 실행됩니다.
  2. Crew에 위임: LangGraph 노드가 CrewAI 크루를 호출합니다.
  3. CrewAI 실행: 소싱 전문가가 제품을 검색하고 조달 담당자가 예산을 확인하고 구매 주문을 생성하는 것을 확인할 수 있습니다.
  4. 최종 보고서: 요약된 결과가 마지막에 출력됩니다.

출력 예 (약어):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

참고: 출력에 [CrewAIEventsBus] Warning: Event pairing mismatch 메시지가 표시될 수 있습니다. CrewAI의 내부 이벤트 추적에서 발생하는 외관상의 경고이며 안전하게 무시해도 됩니다.

참고: CrewAI에 추적이 사용 중지되었다는 메시지가 표시될 수 있습니다. 이는 정보 제공용이며 무시해도 안전합니다.

참고: 모의 OMS에는 $100 예산 한도가 있습니다. 성공적인 해피 패스를 위해 수량을 작게 (약 2개 미만) 유지하세요. 예를 들어 Pixel 7 휴대전화 1대의 가격이 50달러인 경우 예산 확인을 통과하지만 3대의 가격이 150달러인 경우 '예산 초과'로 거부됩니다.

7. 플래너를 A2A 서버에 래핑

LangGraph 플래너는 작동하지만 Python 프로세스 내에 갇혀 있습니다. 다른 프레임워크로 작성되었거나 다른 머신에서 실행될 수 있는 다른 에이전트가 호출할 수 있도록 A2A (에이전트 간) 서버로 래핑합니다.

A2A는 에이전트의 통신 방식을 표준화하는 JSON-RPC 2.0 기반 프로토콜입니다. 주요 개념

개념

목적

에이전트 카드

에이전트의 기능을 설명하는 JSON 메타데이터 (/.well-known/agent-card.json에서 제공됨)

message/send

에이전트에 작업을 전송하는 JSON-RPC 메서드

태스크

상태가 있는 작업 단위 (제출됨 → 작업 중 → 완료/실패)

아티팩트

작업에 연결된 중간 및 최종 출력

a2a_planner.py라는 새 파일을 만듭니다.

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

주요 개념

  • 에이전트 카드: /.well-known/agent-card.json에서 제공됩니다. 에이전트는 이 URL을 가져와서 이 서버가 하는 일을 알아낼 수 있습니다. 여기에는 상담사의 기술, 지원되는 콘텐츠 유형, 기능이 나열됩니다.
  • AgentExecutor.execute(): 구현하는 유일한 메서드입니다. 들어오는 요청을 수신하고, 에이전트 로직 (여기서는 LangGraph 플래너)을 실행하고, 결과를 아티팩트로 다시 전송합니다.
  • TaskUpdater: 작업 수명 주기를 관리합니다. add_artifact()는 중간/최종 출력을 전송하고 complete()는 작업을 완료됨으로 표시합니다. A2A 라이브러리는 모든 JSON-RPC 배관을 처리합니다.

터미널에서 A2A 서버를 시작하여 테스트합니다.

uv run python a2a_planner.py

다른 Cloud Shell 탭을 열고 (현재 탭 옆에 있는 + 클릭) 에이전트 카드가 제공되는지 확인합니다.

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

에이전트 카드 JSON이 표시됩니다. 다음 단계를 위해 첫 번째 터미널에서 A2A 서버를 계속 실행합니다.

8. ADK 관제실 빌드

스택의 최상단은 ADK (Google의 에이전트 개발 키트)로 빌드된 Control Room입니다. 사용자의 요청을 수신하고, A2A를 통해 플래너에 위임하고, 결과를 평가하고, 중요한 실패 시 재계획을 처리합니다 (CUJ 2).

ADK는 BaseAgent, LlmAgent, InMemoryRunner와 같은 에이전트 프리미티브를 제공합니다. BaseAgent을 서브클래스화하여 맞춤 조정 로직(A2A 호출, 보고서 분류, LlmAgent 하위 에이전트를 사용한 동적 재계획)을 작성합니다.

control_room.py라는 새 파일을 만듭니다.

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

주요 개념

  • BaseAgent: 맞춤 에이전트를 위한 ADK 기본 요소입니다. 이를 서브클래싱하고 _run_async_impl를 재정의하여 임의의 비동기 오케스트레이션 로직(여기서는 A2A 호출 + 분류 + 재계획 루프)을 작성합니다.
  • A2A JSON-RPC 호출: Control Room은 httpx를 사용하여 플래너의 A2A 서버에 표준 message/send 요청을 전송하고 JSON-RPC 응답을 파싱하여 최종 보고서를 추출합니다.
  • _classify_report(): 보고서 텍스트에서 성공, 재시도 가능한 실패 또는 최종 실패를 결정하는 간단한 키워드 기반 분류입니다. 이를 통해 다시 계획 루프가 실행됩니다.
  • 하위 에이전트 호출: 다시 계획하기 위해 컨트롤 룸은 LlmAgent를 만들고 하위 InvocationContext를 구성하고 replanner.run_async(child_ctx)를 호출하여 실행합니다. 이를 통해 맞춤 조정 로직 내에서 LLM 에이전트를 동적으로 가동할 수 있습니다.
  • InMemoryRunner: 인메모리 세션 저장소로 에이전트를 로컬에서 실행합니다. 프로덕션에서는 adk deploy를 사용하여 Vertex AI Agent Engine에 배포합니다.

9. 전체 스택 실행

이제 ADK 관제실 → A2A → LangGraph 플래너 → CrewAI 크루의 완전한 3계층 시스템을 테스트해 보겠습니다.

이전에 연 두 번째 Cloud Shell 탭을 사용하거나 +를 클릭하여 새 탭을 열고 Control Room을 실행합니다. 중요: 각 Cloud Shell 탭에는 자체 셸 세션이 있습니다. 프로젝트 및 환경 변수를 다시 설정해야 합니다.

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

전체 오케스트레이션 흐름이 표시됩니다.

  1. Control Room이 요청을 수신하고 A2A 서버에 message/send JSON-RPC 호출을 전송합니다.
  2. A2A 서버가 요청을 수신하고 LangGraph 플래너를 호출합니다.
  3. LangGraph 플래너가 요청을 분석하고 CrewAI 팀에 위임합니다.
  4. CrewAI 크루가 소싱 및 조달 에이전트를 실행합니다.
  5. 결과는 관제실로 다시 전달됩니다.

중요한 사용자 여정 (CUJ)

control_room.py에서 prompt 문자열을 수정하여 다음 시나리오를 실험해 보세요.

CUJ

프롬프트

결과

1. 해피 패스

Restock 1 Pixel 7 phones for the Tokyo office

검색 -> 예산 확인 -> 구매 주문 (성공) 엔드 투 엔드로 작동합니다.

2. 다시 계획하기

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

계획 도구에서 '실패: 알 수 없는 항목'이 반환됩니다. 관제실에서 이를 감지하고 LlmAgent 재계획기를 호출하여 검색 범위를 넓힙니다. 두 시도 모두 실패하지만 (하드 코딩된 플래너는 'Pixel 7'만 인식함) 다시 계획하는 전체 메커니즘이 작동하는 것을 확인할 수 있습니다.

CUJ 2 (다시 계획)를 테스트하려면 control_room.pyprompt를 다음으로 변경하세요.

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

하드코딩된 플래너는 이 항목을 인식하지 못하고 '실패: 알 수 없는 항목'을 반환합니다. Control Room에서 실패를 감지하고, LlmAgent 재계획 도구를 동적으로 생성하고, 더 광범위한 목표로 다시 시도합니다. 계획 도구는 'Pixel 7'만 인식하므로 재시도도 실패하지만 전체 재계획 루프가 작동하는 것을 확인할 수 있습니다. 최종 출력은 FAILED after 2 attempts: ...입니다.

10. 삭제

Google Cloud 계정에 지속적으로 비용이 청구되지 않도록 하려면 이 Codelab에서 생성한 리소스를 삭제하면 됩니다. 생성한 디렉터리를 삭제하면 됩니다.

cd ..
rm -rf scale-agents

11. 마무리

축하합니다. CrewAI, LangGraph, A2A 프로토콜, ADK를 사용하여 멀티 에이전트 오케스트레이션 시스템을 성공적으로 빌드했습니다.

학습한 내용

  • CrewAI의 @tool 데코레이터를 사용하여 에이전트의 도구를 정의하는 방법
  • 역할, 도구, 목표가 서로 다른 전문 에이전트를 만드는 방법
  • 작업 종속 항목이 있는 순차적 크루에 에이전트를 연결하는 방법
  • 크루에 위임하는 LangGraph로 상태 머신 플래너를 빌드하는 방법
  • AgentCardAgentExecutor를 사용하여 플래너를 A2A 서비스로 노출하는 방법
  • A2A를 통해 위임하고 LlmAgent 하위 에이전트를 호출하여 실패 시 다시 계획하는 맞춤 ADK BaseAgent를 빌드하는 방법
  • 프레임워크 전반에서 계획, 실행, 오케스트레이션을 분리하면 모듈성과 복원력이 향상되는 이유

더 알아보기

전체 워크숍에서는 다음을 사용하여 이 시스템을 확장합니다.

  • 실시간 대시보드: 다중 에이전트 진행 상황을 시각화하기 위한 SSE 스트리밍
  • ID Shield: 프롬프트 수준이 아닌 인프라 수준에서 파괴적인 작업을 차단하는 IAM 기반 보안
  • Vertex AI Agent Engine: adk deploy를 사용하여 관리형 클라우드 인프라에 ADK 에이전트 배포

참조 문서