ขยายขนาด Agent ด้วย CrewAI, LangGraph, A2A และ ADK

1. บทนำ

ใน Codelab นี้ คุณจะได้เรียนรู้วิธีสร้างระบบการประสานงานแบบหลาย Agent โดยใช้ CrewAI, LangGraph, โปรโตคอล A2A และ ADK (Agent Development Kit) คุณจะสร้างระบบที่ห้องควบคุม ADK มอบหมายการวางแผนให้กับ LangGraph Planner ซึ่งจะส่งงานไปยังทีมปฏิบัติการ CrewAI โดยทั้งหมดเชื่อมต่อผ่าน A2A เพื่อจัดการสถานการณ์การเติมสินค้าคงคลังของร้านค้าปลีก

การจัดการเป็นกลุ่มแบบหลาย Agent คืออะไร

ในระบบหลายเอเจนต์ AI Agent เฉพาะทางหลายตัวจะทำงานร่วมกันเพื่อทำงานที่ซับซ้อนเกินกว่าที่ Agent ตัวเดียวจะทำได้ แทนที่จะใช้ Agent แบบ Monolithic ตัวเดียวที่ทำทุกอย่าง คุณจะแบ่งปัญหาออกเป็นบทบาทต่างๆ ได้แก่ ผู้วางแผนและผู้ปฏิบัติงาน โดยแต่ละบทบาทจะมีเครื่องมือและความเชี่ยวชาญของตนเอง

ซึ่งคล้ายกับการทำงานขององค์กรที่ประกอบด้วยบุคคล โดยผู้จัดการจะมอบหมายกลยุทธ์แก่นักวิเคราะห์และมอบหมายการดำเนินการแก่ผู้เชี่ยวชาญ สิทธิประโยชน์มีดังนี้

  • การแยกความกังวล: เอเจนต์แต่ละตัวจะมุ่งเน้นสิ่งที่ทำได้ดีที่สุด
  • ความยืดหยุ่นของเฟรมเวิร์ก: ใช้เฟรมเวิร์กที่ดีที่สุดสำหรับแต่ละบทบาท (LangGraph สำหรับตรรกะการวางแผน, CrewAI สำหรับการดำเนินการเครื่องมือ)
  • ความสามารถในการปรับขนาด: เพิ่มเอเจนต์เฉพาะทางโดยไม่ต้องเปลี่ยนทั้งระบบ

สถานการณ์

เมื่อผู้ใช้ส่งคำขอให้เติมสต็อก เช่น "เติมสต็อกโทรศัพท์ Pixel 7 จำนวน 1 เครื่องสำหรับสำนักงานโตเกียว" ระบบจะดำเนินการดังนี้

  1. LangGraph Planner จะวิเคราะห์คำขอและแยกสินค้าและจำนวน
  2. เครื่องมือวางแผนจะมอบหมายการดำเนินการให้กับทีมดำเนินการของ CrewAI
  3. Agent Sourcing Specialist จะค้นหาแคตตาล็อกสินค้าโดยใช้เครื่องมือ
  4. เอเจนต์เจ้าหน้าที่จัดซื้อจะตรวจสอบงบประมาณและออกใบสั่งซื้อโดยใช้เครื่องมือ
  5. ผลลัพธ์จะไหลกลับไปยังเครื่องมือวางแผน ซึ่งจะสร้างรายงานขั้นสุดท้าย
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

ชุดซอฟต์แวร์โครงสร้างพื้นฐาน

เลเยอร์

เทคโนโลยี

บทบาท

การวางแผน

LangGraph

เครื่องสถานะที่วิเคราะห์เจตนา กำหนดเส้นทางคำขอ สร้างรายงาน

การดำเนินการ

CrewAI

เอเจนต์ตามบทบาทที่เรียกใช้เครื่องมือตามลำดับ

LLM

Gemini บน Vertex AI

เพิ่มประสิทธิภาพการให้เหตุผลของ Agent และการเลือกเครื่องมือ

การสื่อสารระหว่างเอเจนต์

โปรโตคอล A2A

บริดจ์ JSON-RPC 2.0 เพื่อให้ Agent จากเฟรมเวิร์กต่างๆ สื่อสารกันได้

Orchestrator ระดับบนสุด

ADK (BaseAgent)

รับคำขอ มอบสิทธิ์ผ่าน A2A วางแผนใหม่เมื่อเกิดข้อผิดพลาด

ดูการทำงาน: หากมี ให้ลองใช้ระบบเวอร์ชันที่ใช้งานจริงเต็มรูปแบบที่ https://scale-control-room-761793285222.us-central1.run.app ซึ่งจะขยายสิ่งที่คุณสร้างที่นี่ด้วยแดชบอร์ดแบบเรียลไทม์ โปรโตคอล A2A และความปลอดภัย IAM

สิ่งที่คุณต้องดำเนินการ

  • กำหนดเครื่องมือที่กำหนดเองเพื่อให้ตัวแทนใช้
  • สร้างเอเจนต์เฉพาะทางด้วย CrewAI
  • สร้างเครื่องวางแผนสถานะด้วย LangGraph
  • ประสานงานขั้นตอนระหว่างผู้วางแผนและทีมปฏิบัติการ
  • ห่อหุ้ม Planner ในเซิร์ฟเวอร์ A2A Protocol เพื่อการสื่อสารข้ามเฟรมเวิร์ก
  • สร้างห้องควบคุม ADK ระดับบนสุดที่มอบสิทธิ์ผ่าน A2A และวางแผนใหม่เมื่อเกิดข้อผิดพลาด

สิ่งที่คุณต้องมี

  • เว็บเบราว์เซอร์ เช่น Chrome
  • โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน

Codelab นี้เหมาะสำหรับนักพัฒนาซอฟต์แวร์ระดับกลางที่คุ้นเคยกับ Python และแนวคิดพื้นฐานของ LLM

ระยะเวลาโดยประมาณ: 35 นาที

ค่าใช้จ่ายโดยประมาณ: ทรัพยากรที่สร้างในโค้ดแล็บนี้ควรมีค่าใช้จ่ายน้อยกว่า $1

2. ก่อนเริ่มต้น

สร้างโปรเจ็กต์ Google Cloud

  1. ในคอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud
  2. ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ที่อยู่ในระบบคลาวด์แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่

เริ่มต้น Cloud Shell

Cloud Shell คือสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานใน Google Cloud ซึ่งโหลดเครื่องมือที่จำเป็นไว้ล่วงหน้า

  1. คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของคอนโซล Google Cloud
  2. เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ยืนยันการตรวจสอบสิทธิ์โดยทำดังนี้
    gcloud auth list
    
  3. ตรวจสอบว่าได้กำหนดค่าโปรเจ็กต์แล้ว
    gcloud config get project
    
  4. หากไม่ได้ตั้งค่าโปรเจ็กต์ตามที่คาดไว้ ให้ตั้งค่าดังนี้
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

เปิดใช้ API

เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Vertex AI API

gcloud services enable aiplatform.googleapis.com

หมายเหตุ: Cloud Shell จะตรวจสอบสิทธิ์กับบัญชี Google Cloud โดยอัตโนมัติ หากเรียกใช้ Codelab นี้ภายนอก Cloud Shell คุณจะต้องเรียกใช้ gcloud auth application-default login เพื่อตรวจสอบสิทธิ์กับ Vertex AI

ตั้งค่าสภาพแวดล้อม

ใน Cloud Shell ให้สร้างไดเรกทอรีใหม่สำหรับโปรเจ็กต์และไปที่ไดเรกทอรีนั้น

mkdir scale-agents
cd scale-agents

ติดตั้ง uv แล้วใช้เพื่อติดตั้งแพ็กเกจที่จำเป็น

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

ตั้งค่าตัวแปรสภาพแวดล้อมสำหรับ Vertex AI ดังนี้

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. กำหนดเครื่องมือและเอเจนต์

ในระบบหลายเอเจนต์ เอเจนต์ต้องมีเครื่องมือในการโต้ตอบกับโลกภายนอก และมีบทบาทที่เฉพาะเจาะจงเพื่อทราบสิ่งที่ต้องทำ

สร้างไฟล์ชื่อ scale_agents.py แล้วเพิ่มโค้ดต่อไปนี้ ซึ่งจะตั้งค่าการนำเข้า เครื่องมือจำลอง และเอเจนต์ CrewAI

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

หัวข้อสำคัญ

  • @tool Decorator: CrewAI ใช้ Decorator นี้เพื่อเปลี่ยนฟังก์ชัน Python ปกติให้เป็นเครื่องมือที่ LLM เข้าใจและเรียกใช้ได้ ระบบจะใช้คำแนะนำประเภทและสตริงเอกสารของฟังก์ชันเพื่อสร้างสคีมาเครื่องมือที่ LLM เข้าใจได้
  • บทบาท เป้าหมาย และเรื่องราวก่อนหน้า: สิ่งเหล่านี้จะกำหนดลักษณะตัวตนของ Agent และเป็นแนวทางในการให้เหตุผลของ LLM เรื่องราวเบื้องหลังไม่ใช่แค่ข้อความเสริม แต่ "คุณค้นหาแคตตาล็อกเสมอ" จะกระตุ้นให้เอเจนต์ใช้เครื่องมือของตนเองแทนการสร้างคำตอบที่ไม่ถูกต้อง
  • reasoning=False: ปิดใช้การให้เหตุผลเพิ่มเติมเพื่อให้เอเจนต์ทำตามลูปการเรียกใช้เครื่องมือมาตรฐานแทนที่จะพยายามตอบคำถามโดยตรง
  • allow_delegation=False: ช่วยให้ตัวแทนแต่ละรายมุ่งเน้นที่เครื่องมือที่ได้รับมอบหมายแทนที่จะส่งต่องานให้ตัวแทนรายอื่น

ทำไมต้องมีเอเจนต์ 2 คนแทนที่จะเป็นคนเดียว ตัวแทนแต่ละคนมีเครื่องมือและงานที่แตกต่างกัน ผู้เชี่ยวชาญด้านการจัดหาสินค้าจะค้นหาสินค้าเท่านั้น ส่วนเจ้าหน้าที่จัดซื้อจะจัดการงบประมาณและคำสั่งซื้อเท่านั้น การแยกความกังวลนี้หมายความว่าเอเจนต์แต่ละรายจะมีพรอมต์ที่มุ่งเน้นและชุดเครื่องมือขนาดเล็กที่เกี่ยวข้อง ซึ่งจะทำให้ LLM ทำงานได้อย่างน่าเชื่อถือมากกว่าเอเจนต์เดียวที่จัดการทุกอย่าง

4. กำหนดงานและลูกเรือ

ตอนนี้มากำหนดสิ่งที่เอเจนต์เหล่านี้ต้องทำโดยการสร้างงานและเชื่อมโยงงานเหล่านั้นเข้ากับทีม

เพิ่มโค้ดต่อไปนี้ที่ส่วนท้ายของไฟล์ scale_agents.py เดียวกัน

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

หัวข้อสำคัญ

  • บริบทของงาน: context=[sourcing_task] บอก CrewAI ว่างานจัดซื้อต้องใช้เอาต์พุตของงานจัดหาเพื่อดำเนินการต่อ เจ้าหน้าที่จัดซื้อจะดูสิ่งที่ผู้เชี่ยวชาญด้านการจัดหาพบก่อนตัดสินใจว่าจะสั่งซื้ออะไร
  • Process.sequential: ระบบจะดำเนินการกับงานตามลำดับที่แสดง ขั้นตอนนี้มีความสำคัญเนื่องจากงานจัดซื้อขึ้นอยู่กับผลลัพธ์ของงานจัดหา คุณจะสั่งซื้อไม่ได้จนกว่าจะทราบว่าควรซื้อผลิตภัณฑ์ใด
  • memory=False / planning=False: ปิดใช้ฟีเจอร์หน่วยความจำและการวางแผนในตัวของ CrewAI เพื่อให้การดำเนินการเรียบง่ายและคาดการณ์ได้สำหรับการสาธิตนี้

5. สร้าง LangGraph Planner

ทีมดำเนินการจะจัดการ "วิธี" ซึ่งก็คือการค้นหาสินค้า การตรวจสอบงบประมาณ และการสั่งซื้อ แต่ใครเป็นคนตัดสินใจว่า "อะไร" ซึ่งก็คือ Planning Agent ที่สร้างด้วย LangGraph

LangGraph สร้างเวิร์กโฟลว์เป็นเครื่องสถานะ ซึ่งเป็นกราฟของโหนด (ฟังก์ชัน) ที่เชื่อมต่อกันด้วยขอบ (การเปลี่ยนสถานะ) สถานะจะไหลผ่านกราฟ โดยแต่ละโหนดจะอ่านและเขียนไปยังสถานะที่แชร์ ซึ่งเหมาะอย่างยิ่งสำหรับเวิร์กโฟลว์การวางแผนที่คุณต้องการโฟลว์การควบคุมที่ชัดเจนและแน่นอน เช่น วิเคราะห์คำขอ มอบหมายให้ทีมงานสร้างรายงาน

เพิ่มโค้ดต่อไปนี้ที่ส่วนท้ายของไฟล์ scale_agents.py เดียวกัน

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

หัวข้อสำคัญ

  • StateGraph: กำหนดเครื่องสถานะ PlanState คือสถานะที่พิมพ์ซึ่งสะสมเมื่อแต่ละโหนดประมวลผลคำขอ
  • โหนด: ฟังก์ชันที่ใช้สถานะปัจจุบันและแสดงผลการอัปเดต แต่ละโหนดมีหน้าที่เดียวคือ analyze_alert แยกความตั้งใจ delegate_to_executor เรียกใช้ทีม และ generate_report สรุปผลลัพธ์
  • ขอบ: กำหนดโฟลว์ระหว่างโหนด ใน Codelab นี้ เราใช้โฟลว์เชิงเส้นอย่างง่าย (analyze → delegate → report) เวิร์กช็อปฉบับเต็มจะขยายความด้วยการกำหนดเส้นทางแบบมีเงื่อนไข เช่น การกำหนดเส้นทางคำขอที่ทำลายล้างไปยังเส้นทางความปลอดภัยแทนที่จะเป็นตัวดำเนินการ

เหตุใดจึงควรใช้ LangGraph สำหรับ Planner CrewAI เหมาะสำหรับเอเจนต์ที่เรียกใช้เครื่องมือ แต่ตัววางแผนต้องการโฟลว์การควบคุมที่แน่นอน ซึ่งก็คือ "หากเป็นคำสั่งที่ทำลายล้าง ให้ไปที่เส้นทางความปลอดภัย ไม่เช่นนั้นให้มอบหมาย" โมเดลเครื่องสถานะของ LangGraph ทำให้การกำหนดเส้นทางนี้ชัดเจนและทดสอบได้ ขณะที่ CrewAI จะจัดการการดำเนินการเครื่องมือแบบอิสระด้านล่าง

6. Run the Planner and Crew

ตอนนี้มาทดสอบเครื่องวางแผน LangGraph และทีม CrewAI ด้วยกัน

เรียกใช้สคริปต์ในเทอร์มินัล Cloud Shell โดยใช้คำสั่งต่อไปนี้

uv run python scale_agents.py

คุณควรเห็นเอาต์พุตที่ระบุขั้นตอนที่ดำเนินการดังนี้

  1. การวิเคราะห์การแจ้งเตือน: โหนด LangGraph ทำงาน
  2. การมอบหมายให้ Crew: โหนด LangGraph จะเรียกใช้ CrewAI crew
  3. การดำเนินการของ CrewAI: คุณจะเห็นผู้เชี่ยวชาญด้านการจัดหาสินค้าค้นหาผลิตภัณฑ์ และเจ้าหน้าที่จัดซื้อตรวจสอบงบประมาณและสร้างใบสั่งซื้อ
  4. รายงานสุดท้าย: ผลสรุปจะพิมพ์ที่ส่วนท้าย

ตัวอย่างเอาต์พุต (ย่อ)

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

หมายเหตุ: คุณอาจเห็นข้อความ [CrewAIEventsBus] Warning: Event pairing mismatch ในเอาต์พุต คำเตือนเหล่านี้เป็นคำเตือนด้านความสวยงามจากการติดตามเหตุการณ์ภายในของ CrewAI และคุณไม่ต้องสนใจคำเตือนเหล่านี้

หมายเหตุ: CrewAI อาจแสดงข้อความเกี่ยวกับการปิดใช้การติดตาม นี่เป็นเพียงข้อมูลและคุณไม่จำเป็นต้องสนใจ

หมายเหตุ: OMS จำลองมีขีดจำกัดงบประมาณที่$100 รักษาสินค้าคงคลังให้มีจำนวนน้อย (ต่ำกว่า 2 หน่วย) เพื่อให้เส้นทางที่ราบรื่นสำเร็จ เช่น โทรศัพท์ Pixel 7 จำนวน 1 เครื่องที่ราคา $50 จะผ่านการตรวจสอบงบประมาณ แต่โทรศัพท์ 3 เครื่องที่ราคา $150 จะถูกปฏิเสธเนื่องจาก "เกินงบประมาณ"

7. ห่อหุ้ม Planner ในเซิร์ฟเวอร์ A2A

ตัววางแผน LangGraph ทำงานได้ แต่ติดอยู่ภายในกระบวนการ Python เราจึงห่อหุ้มโมเดลไว้ในเซิร์ฟเวอร์ A2A (Agent-to-Agent) เพื่อให้ตัวแทนอื่นๆ เรียกใช้ได้ ซึ่งอาจเขียนในเฟรมเวิร์กอื่นหรือทำงานในเครื่องอื่น

A2A คือโปรโตคอลที่อิงตาม JSON-RPC 2.0 ซึ่งกำหนดมาตรฐานวิธีที่เอเจนต์สื่อสาร แนวคิดหลัก

แนวคิด

วัตถุประสงค์

Agent Card

ข้อมูลเมตา JSON ที่อธิบายความสามารถของเอเจนต์ (แสดงที่ /.well-known/agent-card.json)

message/send

เมธอด JSON-RPC สำหรับส่งงานไปยังเอเจนต์

งาน

หน่วยของงานที่มีสถานะ (ส่งแล้ว → กำลังดำเนินการ → เสร็จสมบูรณ์/ล้มเหลว)

อาร์ติแฟกต์

ผลลัพธ์ระดับกลางและขั้นสุดท้ายที่แนบมากับงาน

สร้างไฟล์ใหม่ a2a_planner.py โดยทำดังนี้

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

หัวข้อสำคัญ

  • การ์ดตัวแทน: แสดงที่ /.well-known/agent-card.json – ตัวแทนใดก็ได้สามารถค้นพบสิ่งที่เซิร์ฟเวอร์นี้ทำได้โดยการดึงข้อมูล URL นั้น โดยจะแสดงทักษะของเอเจนต์ ประเภทเนื้อหาที่รองรับ และความสามารถ
  • AgentExecutor.execute(): วิธีเดียวที่คุณใช้ โดยจะรับคำขอขาเข้า เรียกใช้ตรรกะของเอเจนต์ (ในที่นี้คือ LangGraph Planner) และส่งผลลัพธ์กลับมาเป็นอาร์ติแฟกต์
  • TaskUpdater: จัดการวงจรของงาน - add_artifact() ส่งเอาต์พุตขั้นกลาง/สุดท้าย complete() ทำเครื่องหมายงานว่าเสร็จแล้ว ไลบรารี A2A จะจัดการการเชื่อมต่อ JSON-RPC ทั้งหมด

ทดสอบเซิร์ฟเวอร์ A2A โดยเริ่มเซิร์ฟเวอร์ในเทอร์มินัล

uv run python a2a_planner.py

เปิดแท็บ Cloud Shell อีกแท็บ (คลิก + ข้างแท็บปัจจุบัน) แล้วตรวจสอบว่ามีการแสดงการ์ด Agent หรือไม่

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

คุณควรเห็น JSON ของการ์ดเอเจนต์ เปิดเซิร์ฟเวอร์ A2A ในเทอร์มินัลแรกไว้สำหรับขั้นตอนถัดไป

8. สร้างห้องควบคุม ADK

ส่วนบนสุดของสแต็กคือห้องควบคุมที่สร้างขึ้นด้วย ADK (Agent Development Kit ของ Google) โดยจะรับคำขอของผู้ใช้ ส่งต่อให้ Planner ผ่าน A2A ประเมินผลลัพธ์ และที่สำคัญคือจัดการการวางแผนใหม่เมื่อเกิดข้อผิดพลาด (CUJ 2)

ADK มีองค์ประกอบพื้นฐานของ Agent เช่น BaseAgent, LlmAgent และ InMemoryRunner เราใช้คลาสย่อย BaseAgent เพื่อเขียนตรรกะการจัดการเป็นกลุ่มที่กำหนดเอง ได้แก่ การเรียก A2A การจัดประเภทรายงาน และการวางแผนใหม่แบบไดนามิกด้วย Agent ย่อย LlmAgent

สร้างไฟล์ใหม่ control_room.py โดยทำดังนี้

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

หัวข้อสำคัญ

  • BaseAgent: องค์ประกอบพื้นฐานของ ADK สำหรับเอเจนต์ที่กำหนดเอง คุณสามารถสร้างคลาสย่อยและแทนที่ _run_async_impl เพื่อเขียนตรรกะการจัดการเป็นกลุ่มแบบไม่พร้อมกันที่กำหนดเองได้ ซึ่งในที่นี้คือการเรียก A2A + ตัวแยกประเภท + วนซ้ำการวางแผนซ้ำ
  • การเรียกใช้ A2A JSON-RPC: Control Room จะส่งคำขอ message/send มาตรฐานไปยังเซิร์ฟเวอร์ A2A ของผู้วางแผนโดยใช้ httpx และแยกวิเคราะห์การตอบกลับ JSON-RPC เพื่อดึงรายงานสุดท้าย
  • _classify_report(): การจัดประเภทตามคีย์เวิร์ดอย่างง่ายที่กำหนดความสำเร็จ ความล้มเหลวที่ลองใหม่ได้ หรือความล้มเหลวที่สิ้นสุดจากข้อความรายงาน ซึ่งจะขับเคลื่อนลูปการวางแผนใหม่
  • การเรียกใช้ Agent ย่อย: หากต้องการวางแผนใหม่ Control Room จะสร้าง LlmAgent และเรียกใช้โดยสร้าง InvocationContext ย่อยและเรียกใช้ replanner.run_async(child_ctx) ซึ่งช่วยให้คุณสร้างเอเจนต์ LLM แบบไดนามิกภายในตรรกะการจัดระเบียบที่กำหนดเองได้
  • InMemoryRunner: เรียกใช้เอเจนต์ในเครื่องด้วยที่เก็บเซสชันในหน่วยความจำ ในเวอร์ชันที่ใช้งานจริง คุณจะใช้ adk deploy เพื่อติดตั้งใช้งาน Vertex AI Agent Engine

9. เรียกใช้ Full Stack

ตอนนี้มาทดสอบระบบ 3 เลเยอร์ที่สมบูรณ์กันเลย: ห้องควบคุม ADK → A2A → LangGraph Planner → ทีม CrewAI

ใช้แท็บ Cloud Shell ที่ 2 ที่คุณเปิดไว้ก่อนหน้านี้ (หรือคลิก + เพื่อเปิดแท็บใหม่) แล้วเรียกใช้ Control Room สำคัญ: แท็บ Cloud Shell แต่ละแท็บจะมีเซสชันเชลล์ของตัวเอง คุณต้องตั้งค่าตัวแปรโปรเจ็กต์และตัวแปรสภาพแวดล้อมอีกครั้ง

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

คุณควรเห็นโฟลว์การจัดสรรทั้งหมดดังนี้

  1. Control Room จะรับคำขอและส่งmessage/sendการเรียก JSON-RPC ไปยังเซิร์ฟเวอร์ A2A
  2. เซิร์ฟเวอร์ A2A จะรับคำขอและเรียกใช้เครื่องวางแผน LangGraph
  3. เครื่องมือวางแผน LangGraph จะวิเคราะห์คำขอและมอบหมายให้ทีม CrewAI
  4. ทีม CrewAI จะเรียกใช้เอเจนต์การจัดหาและการจัดซื้อ
  5. ผลลัพธ์จะไหลกลับไปที่ห้องควบคุม

เส้นทางของผู้ใช้ที่สำคัญ (CUJ)

ลองแก้ไขpromptสตริงในcontrol_room.pyเพื่อทดสอบสถานการณ์ต่อไปนี้

CUJ

พรอมต์

สิ่งที่เกิดขึ้น

1. เส้นทางที่ไม่พบข้อผิดพลาด

Restock 1 Pixel 7 phones for the Tokyo office

Search -> budget check -> purchase order (SUCCESS) ทำงานตั้งแต่ต้นจนจบ

2. วางแผนใหม่

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

เครื่องมือวางแผนแสดงผล "ล้มเหลว: รายการที่ไม่รู้จัก" Control Room จะตรวจพบสิ่งนี้และเรียกใช้LlmAgentเครื่องมือวางแผนใหม่เพื่อขยายการค้นหา ทั้ง 2 ครั้งไม่สำเร็จ (เครื่องมือวางแผนที่ฮาร์ดโค้ดรู้จักเฉพาะ "Pixel 7") แต่คุณจะเห็นกลไกการวางแผนใหม่ทั้งหมดในการทำงาน

หากต้องการทดสอบ CUJ 2 (การวางแผนใหม่) ให้เปลี่ยน prompt ใน control_room.py เป็น

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

เครื่องมือวางแผนที่ฮาร์ดโค้ดจะไม่รู้จักรายการนี้และจะแสดงข้อความ "ล้มเหลว: ไม่รู้จักรายการ" Control Room จะตรวจพบความล้มเหลว สร้างLlmAgentเครื่องมือวางแผนใหม่แบบไดนามิก และลองอีกครั้งโดยมีวัตถุประสงค์ที่กว้างขึ้น เนื่องจากเครื่องวางแผนรู้จักเฉพาะ "Pixel 7" การลองอีกครั้งจึงไม่สำเร็จเช่นกัน แต่คุณจะเห็นวงจรการวางแผนซ้ำทั้งหมดในการทำงาน เอาต์พุตสุดท้ายจะเป็น FAILED after 2 attempts: ...

10. ล้างข้อมูล

หากต้องการหลีกเลี่ยงการเรียกเก็บเงินอย่างต่อเนื่องในบัญชี Google Cloud คุณสามารถลบทรัพยากรที่สร้างขึ้นในระหว่าง Codelab นี้ได้ คุณสามารถนำไดเรกทอรีที่สร้างออกได้โดยทำดังนี้

cd ..
rm -rf scale-agents

11. ขอแสดงความยินดี

ยินดีด้วย คุณสร้างระบบการประสานงานแบบหลาย Agent โดยใช้ CrewAI, LangGraph, โปรโตคอล A2A และ ADK ได้สำเร็จแล้ว

สิ่งที่คุณได้เรียนรู้

  • วิธีกำหนดเครื่องมือสำหรับเอเจนต์โดยใช้ตัวตกแต่ง @tool ของ CrewAI
  • วิธีสร้างเอเจนต์เฉพาะทางที่มีบทบาท เครื่องมือ และเป้าหมายที่แตกต่างกัน
  • วิธีเชื่อมโยงเอเจนต์เข้ากับทีมตามลำดับที่มีการขึ้นต่อกันของงาน
  • วิธีสร้างเครื่องมือวางแผน State Machine ด้วย LangGraph ที่มอบหมายให้ Crew
  • วิธีเปิดเผยเครื่องมือวางแผนเป็นบริการ A2A ด้วย AgentCard และ AgentExecutor
  • วิธีสร้าง ADK ที่กำหนดเองBaseAgentซึ่งมอบสิทธิ์ผ่าน A2A และวางแผนใหม่เมื่อไม่สำเร็จโดยการเรียกใช้LlmAgent Agent ย่อย
  • เหตุใดการแยกการวางแผน การดำเนินการ และการจัดการเป็นกลุ่มในเฟรมเวิร์กต่างๆ จึงช่วยให้คุณมีความสามารถในการปรับเปลี่ยนและมีความยืดหยุ่น

ไปต่อ

เวิร์กช็อปแบบเต็มจะขยายระบบนี้ด้วยสิ่งต่อไปนี้

  • แดชบอร์ดแบบเรียลไทม์ - การสตรีม SSE เพื่อแสดงภาพความคืบหน้าของหลายเอเจนต์
  • Identity Shield - ความปลอดภัยที่อิงตาม IAM ซึ่งบล็อกการดำเนินการที่เป็นอันตรายในระดับโครงสร้างพื้นฐาน ไม่ใช่ระดับพรอมต์
  • Vertex AI Agent Engine - นำ Agent ของ ADK ไปใช้งานในโครงสร้างพื้นฐานระบบคลาวด์ที่มีการจัดการด้วย adk deploy

เอกสารอ้างอิง