1. Giriş
Bu codelab'de CrewAI, LangGraph, A2A protokolü ve ADK (Agent Development Kit) kullanarak çoklu ajanlı bir orkestrasyon sistemi oluşturmayı öğreneceksiniz. Bir ADK kontrol odasının, planlamayı LangGraph planlayıcısına devrettiği ve perakende envanterini yeniden stoklama senaryosunu yönetmek için görevleri A2A aracılığıyla bağlı bir CrewAI yürütme ekibine gönderdiği bir sistem oluşturacaksınız.
Çoklu aracı düzenleme nedir?
Çoklu aracı sisteminde, tek bir aracı için çok karmaşık olacak görevleri tamamlamak üzere birden fazla uzmanlaşmış yapay zeka aracısı işbirliği yapar. Her şeyi yapan tek bir monolitik aracı yerine, sorunu her biri kendi araçlarına ve uzmanlığına sahip olan roller (planlayıcı ve uygulayıcı) şeklinde ayrıştırırsınız.
Bu, insan kuruluşlarının çalışma şeklini yansıtır: Bir yönetici, stratejiyi analistlere, uygulamayı ise uzmanlara devreder. Mobil uygulamaları hedeflemenin avantajları şunlardır:
- İlgi alanlarının ayrılması: Her aracı, en iyi yaptığı işe odaklanır.
- Çerçeve esnekliği: Her rol için en iyi çerçeveyi kullanın (planlama mantığı için LangGraph, araç yürütme için CrewAI)
- Ölçeklenebilirlik: Tüm sistemi değiştirmeden uzmanlaşmış aracıları ekleyin.
Senaryo
Kullanıcı, "Tokyo ofisi için 1 Pixel 7 telefonu stoklayın" gibi bir stok yenileme isteği gönderdiğinde sistem:
- LangGraph Planner, isteği analiz edip öğeyi ve miktarı çıkarır.
- Planlayıcı, yürütme görevini CrewAI Execution Crew'a devreder.
- Bir kaynak bulma uzmanı, araçları kullanarak ürün kataloğunda arama yapar.
- Bir satın alma görevlisi, bütçeyi doğrular ve araçları kullanarak satın alma siparişi verir.
- Sonuç, planlayıcıya geri gönderilir ve planlayıcı nihai bir rapor oluşturur.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Teknoloji yığını
Katman | Teknoloji | Rol |
Planlama | LangGraph | Amacı analiz eden, istekleri yönlendiren ve raporlar oluşturan durum makinesi |
Yürütme | CrewAI | Araçları sırayla çağıran role dayalı aracılar |
LLM | Vertex AI'da Gemini | Temsilcinin akıl yürütme ve araç seçme süreçlerini destekler. |
Temsilciler arası iletişim | A2A Protokolü | Farklı çerçevelerden gelen aracıların iletişim kurabilmesi için JSON-RPC 2.0 köprüsü |
Üst düzey düzenleyici | ADK (BaseAgent) | İstekleri alır, A2A üzerinden temsil eder, başarısızlık durumunda yeniden planlar |
Uygulamada görün: Mümkünse https://scale-control-room-761793285222.us-central1.run.app adresinde tam üretim sistemini deneyin. Bu sistem, burada oluşturacağınız sistemi gerçek zamanlı bir kontrol paneli, A2A protokolü ve IAM güvenliği ile genişletir.
Yapacaklarınız
- Temsilcilerin kullanması için özel araçlar tanımlayın.
- CrewAI ile uzmanlaşmış temsilciler oluşturun.
- LangGraph ile durum makinesi planlayıcı oluşturun.
- Planlayıcı ile yürütme ekibi arasındaki akışı düzenleyin.
- Çerçeveler arası iletişim için planlayıcıyı A2A Protokolü sunucusuna yerleştirin.
- A2A aracılığıyla yetki veren ve hata durumunda yeniden planlama yapan üst düzey bir ADK Kontrol Odası oluşturun.
İhtiyacınız olanlar
- Chrome gibi bir web tarayıcısı
- Faturalandırmanın etkin olduğu bir Google Cloud projesi
Bu codelab, Python ve temel LLM kavramları hakkında bilgi sahibi olan orta düzeydeki geliştiriciler içindir.
Tahmini süre: 35 dakika.
Maliyet tahmini: Bu codelab'de oluşturulan kaynakların maliyeti 1 ABD dolarından az olmalıdır.
2. Başlamadan önce
Google Cloud projesi oluşturma
- Google Cloud Console'daki proje seçici sayfasında bir Google Cloud projesi seçin veya oluşturun.
- Cloud projeniz için faturalandırmanın etkinleştirildiğinden emin olun. Bir projede faturalandırmanın etkin olup olmadığını kontrol etmeyi öğrenin.
Cloud Shell'i Başlatma
Cloud Shell, Google Cloud'da çalışan ve gerekli araçların önceden yüklendiği bir komut satırı ortamıdır.
- Google Cloud Console'un üst kısmından Cloud Shell'i etkinleştir'i tıklayın.
- Cloud Shell'e bağlandıktan sonra kimlik doğrulamanızı onaylayın:
gcloud auth list - Projenizin yapılandırıldığını onaylayın:
gcloud config get project - Projeniz beklendiği gibi ayarlanmamışsa şu şekilde ayarlayın:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
API'leri etkinleştir
Vertex AI API'yi etkinleştirmek için şu komutu çalıştırın:
gcloud services enable aiplatform.googleapis.com
Not: Cloud Shell, Google Cloud hesabınızla otomatik olarak kimlik doğrulaması yapar. Bu codelab'i Cloud Shell dışında çalıştırıyorsanız Vertex AI ile kimlik doğrulaması yapmak için gcloud auth application-default login komutunu çalıştırmanız gerekir.
Ortamınızı ayarlama
Cloud Shell'de projeniz için yeni bir dizin oluşturun ve bu dizine gidin:
mkdir scale-agents
cd scale-agents
uv aracını yükleyin ve gerekli paketleri yüklemek için bu aracı kullanın:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Vertex AI için ortam değişkenlerini ayarlayın:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Araçları ve Temsilcileri Tanımlama
Çoklu aracı sisteminde, aracıların dünyayla etkileşim kurmak için araçlara ve ne yapacaklarını bilmek için belirli rollere ihtiyacı vardır.
scale_agents.py adlı bir dosya oluşturun ve aşağıdaki kodu ekleyin. Bu işlem, içe aktarmaları, sahte araçları ve CrewAI aracılarını ayarlar.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Temel Kavramlar
@tooldecorator: CrewAI, bunu normal Python işlevlerini LLM'lerin anlayabileceği ve çağırabileceği araçlara dönüştürmek için kullanır. İşlevin tür ipuçları ve docstring'i, LLM'nin anlayabileceği bir araç şeması oluşturmak için kullanılır.- Rol, Amaç ve Arka Plan Hikayesi: Bunlar, temsilcinin karakterini tanımlar ve LLM akıl yürütmesini yönlendirir. Arka plan bilgisi yalnızca bir açıklama metni değildir. "Her zaman kataloga bakarsın" ifadesi, aracıyı halüsinasyonlar üretmek yerine araçlarını kullanmaya teşvik eder.
reasoning=False: Ajanın doğrudan yanıt vermeye çalışmak yerine standart araç çağırma döngüsünü izlemesi için genişletilmiş akıl yürütmeyi devre dışı bırakır.allow_delegation=False: Her temsilcinin işi diğer temsilcilere aktarmak yerine kendi atanan araçlarına odaklanmasını sağlar.
Neden bir yerine iki aracı? Her temsilcinin farklı araçları ve farklı bir işi vardır. Tedarik uzmanı yalnızca ürün arar, satın alma görevlisi ise yalnızca bütçeleri ve siparişleri yönetir. Bu ilgi alanlarının ayrılması, her aracının odaklanmış bir isteme ve küçük, alakalı bir araç setine sahip olduğu anlamına gelir. Bu da her şeyi tek başına yöneten bir araca kıyasla daha güvenilir bir LLM davranışı sağlar.
4. Görevleri ve Ekibi Tanımlama
Şimdi Görevler oluşturup bunları Ekip'e bağlayarak bu aracıların ne yapması gerektiğini tanımlayalım.
Aynı scale_agents.py dosyasının sonuna aşağıdaki kodu ekleyin:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Temel Kavramlar
- Görev Bağlamı:
context=[sourcing_task]CrewAI'ya, tedarik görevinin devam edebilmesi için kaynak bulma görevinin çıktısının gerektiğini söyler. Satın alma görevlisi, ne sipariş edeceğine karar vermeden önce Kaynak Bulma Uzmanı'nın bulduklarını görebilir. - Process.sequential: Görevler, listelendikleri sırayla yürütülür. Bu önemlidir. Çünkü satın alma görevi, tedarik görevindeki sonuçlara bağlıdır. Hangi ürünü satın alacağınızı bilmeden sipariş veremezsiniz.
memory=False/planning=False: Bu demoda yürütme işleminin basit ve öngörülebilir olması için CrewAI'ın yerleşik bellek ve planlama özelliklerini devre dışı bırakır.
5. LangGraph Planlayıcı'yı oluşturma
Yürütme ekibi, "nasıl" sorusunun cevabını verir: ürün arama, bütçe kontrolü, sipariş verme. Peki "ne"ye kim karar veriyor? Bu, LangGraph ile oluşturulan Planlama Ajanı'dır.
LangGraph, iş akışlarını durum makinesi olarak modeller. Bu, kenarlarla (geçişler) bağlanmış düğümlerden (işlevler) oluşan bir grafiktir. Durum, grafikte akar. Her düğüm, paylaşılan durumu okur ve bu duruma yazar. Bu, net ve deterministik bir kontrol akışına ihtiyacınız olan iş akışlarını planlamak için idealdir: isteği analiz etme, ekibe görev atama, rapor oluşturma.
Aynı scale_agents.py dosyasının sonuna aşağıdaki kodu ekleyin:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Temel Kavramlar
- StateGraph: Durum makinesini tanımlar.
PlanState, her düğüm isteği işledikçe biriken türü belirtir. - Düğümler: Mevcut durumu alan ve bu durumda güncellemeler döndüren işlevler. Her düğümün tek bir sorumluluğu vardır:
analyze_alertamacı çıkarır,delegate_to_executorekibi çalıştırır,generate_reportsonucu özetler. - Kenarlar: Düğümler arasındaki akışı tanımlar. Bu codelab'de basit bir doğrusal akış (
analyze → delegate → report) kullanıyoruz. Tam atölye çalışmasında bu akış, koşullu yönlendirme ile genişletilir. Örneğin, yıkıcı istekler yürütücü yerine bir güvenlik yoluna yönlendirilir.
Planlayıcı için neden LangGraph? CrewAI, araç çağırma aracıları için harika bir seçenektir ancak planlayıcının deterministik kontrol akışına ihtiyacı vardır: "Yıkıcıysa güvenlik yoluna git, aksi takdirde yetki ver." LangGraph'ın durum makinesi modeli bu yönlendirmeyi açık ve test edilebilir hale getirirken CrewAI, aşağıdaki serbest biçimli araç yürütme işlemini gerçekleştirir.
6. Run the Planner and Crew
Şimdi LangGraph planlayıcıyı ve CrewAI ekibini birlikte test edelim.
Cloud Shell terminalinizde komut dosyasını çalıştırın:
uv run python scale_agents.py
Yapılan adımları gösteren bir çıkış görmeniz gerekir:
- Uyarıyı Analiz Etme: LangGraph düğümü çalışır.
- Crew'e görev verme: LangGraph düğümü, CrewAI ekibini çağırır.
- CrewAI Execution (CrewAI Yürütme): Kaynak Uzmanı'nın ürünü aradığını, Satın Alma Görevlisi'nin ise bütçeyi kontrol edip satın alma siparişi oluşturduğunu göreceksiniz.
- Son Rapor: Özetlenen sonuç en sonda yazdırılır.
Örnek çıkış (kısaltılmış):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Not: Çıkışta [CrewAIEventsBus] Warning: Event pairing mismatch mesajlarını görebilirsiniz. Bunlar, CrewAI'nın dahili etkinlik izlemesinden kaynaklanan ve güvenle yoksayabileceğiniz kozmetik uyarılardır.
Not: CrewAI, izlemenin devre dışı bırakıldığıyla ilgili bir mesaj gösterebilir. Bu mesaj bilgilendirme amaçlıdır ve güvenle yoksayılabilir.
Not: Sahte OMS'nin 100 ABD doları bütçe sınırı vardır. İşlemin başarılı olması için miktarları düşük tutun (yaklaşık 2 birimin altında). Örneğin, 50 TL'lik 1 Pixel 7 Telefon bütçe kontrolünden geçer ancak 150 TL'lik 3 birim "Bütçeyi Aşıyor" olarak reddedilir.
7. Planlayıcıyı A2A sunucusuna sarmalama
LangGraph planlayıcı çalışıyor ancak bir Python sürecine hapsolmuş durumda. Farklı çerçevelerde yazılmış veya farklı makinelerde çalışan diğer ajanlar tarafından çağrılabilir hale getirmek için bunu bir A2A (Agent-to-Agent) sunucusuna sarmalarız.
A2A, aracıların nasıl iletişim kurduğunu standartlaştıran, JSON-RPC 2.0 tabanlı bir protokoldür. Temel kavramlar:
Kavram | Amaç |
Agent Card | Aracının özelliklerini açıklayan JSON meta verileri ( |
| Aracıya görev göndermek için kullanılan JSON-RPC yöntemi |
Görev | Durumu olan bir iş birimi (gönderildi → çalışıyor → tamamlandı/başarısız oldu) |
Artifacts | Bir göreve eklenen ara ve nihai çıktılar |
Yeni dosya oluşturma a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Temel Kavramlar
- Temsilci Kartı:
/.well-known/agent-card.jsonkonumunda sunulur. Herhangi bir temsilci, bu URL'yi getirerek sunucunun ne yaptığını öğrenebilir. Bu listede, aracının becerileri, desteklenen içerik türleri ve özellikleri yer alır. AgentExecutor.execute(): Uyguladığınız tek yöntem. Gelen isteği alır, aracı mantığınızı (burada LangGraph planlayıcı) çalıştırır ve sonuçları yapıt olarak geri gönderir.TaskUpdater: Görev yaşam döngüsünü yönetir.add_artifact()ara/son çıktıları gönderir,complete()görevi tamamlandı olarak işaretler. A2A kitaplığı, tüm JSON-RPC bağlantılarını yönetir.
Bir terminalde başlatarak A2A sunucusunu test edin:
uv run python a2a_planner.py
Başka bir Cloud Shell sekmesi açın (geçerli sekmenin yanındaki + işaretini tıklayın) ve Agent Card'ın sunulduğunu doğrulayın:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Aracı kartı JSON'unu görmeniz gerekir. Sonraki adım için A2A sunucusunu ilk terminalde çalışır durumda tutun.
8. ADK Kontrol Odası'nı oluşturma
Yığının en üstünde, ADK (Google'ın Agent Development Kit'i) ile oluşturulmuş Control Room yer alır. Kullanıcının isteğini alır, A2A aracılığıyla planlayıcıya devreder, sonucu değerlendirir ve en önemlisi başarısızlık durumunda yeniden planlamayı gerçekleştirir (CUJ 2).
ADK, BaseAgent, LlmAgent ve InMemoryRunner gibi aracı temel öğeleri sağlar. Özel düzenleme mantığı (A2A çağrıları, rapor sınıflandırması ve LlmAgent alt aracısıyla dinamik yeniden planlama) yazmak için BaseAgent alt sınıfını oluştururuz.
Yeni dosya oluşturma control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Temel Kavramlar
BaseAgent: Özel temsilciler için ADK temel öğesi. Alt sınıfını oluşturup_run_async_implişlevini geçersiz kılarak rastgele eş zamansız düzenleme mantığı yazarsınız. Burada A2A çağrısı + sınıflandırıcı + yeniden planlama döngüsü kullanılır.- A2A JSON-RPC çağrısı: Kontrol Odası,
httpxkullanarak planlayıcının A2A sunucusuna standart birmessage/sendisteği gönderir ve nihai raporu çıkarmak için JSON-RPC yanıtını ayrıştırır. _classify_report(): Rapor metninden başarı, yeniden denenebilir hata veya nihai hata durumunu belirleyen basit anahtar kelime tabanlı sınıflandırma. Bu, yeniden planlama döngüsünü tetikler.- Alt aracı çağırma: Kontrol Odası, yeniden planlama yapmak için bir
LlmAgentoluşturur ve altInvocationContextoluşturupreplanner.run_async(child_ctx)çağırarak bunu çalıştırır. Bu sayede, özel düzenleme mantığı içinde LLM aracılarını dinamik olarak başlatabilirsiniz. InMemoryRunner: Aracıyı, bellek içi oturum deposuyla yerel olarak çalıştırır. Üretimde, Vertex AI Agent Engine'e dağıtmak içinadk deploykullanırsınız.
9. Full Stack'i çalıştırma
Şimdi de üç katmanlı sistemi tamamen test edelim: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
Daha önce açtığınız ikinci Cloud Shell sekmesini kullanın (veya yeni bir sekme için +'yı tıklayın) ve Control Room'u çalıştırın. Önemli: Her Cloud Shell sekmesinin kendi kabuk oturumu vardır. Proje ve ortam değişkenlerini tekrar ayarlamanız gerekir:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Düzenleme akışının tamamını görmelisiniz:
- Kontrol Odası, isteği alır ve A2A sunucusuna bir
message/sendJSON-RPC çağrısı gönderir. - A2A sunucusu isteği alır ve LangGraph planlayıcıyı çağırır.
- LangGraph planlayıcı, isteği analiz eder ve CrewAI ekibine devreder.
- CrewAI ekibi, Kaynak Bulma ve Satın Alma aracılarının çalışmasını sağlar.
- Sonuç, Kontrol Odası'na kadar geri gider.
Önemli Kullanıcı Deneyimleri (CUJ'ler)
Aşağıdaki senaryoları denemek için control_room.py içindeki prompt dizesini değiştirmeyi deneyin:
CUJ | İstem | Ne olur? |
1. İdeal Yol (Happy Path) |
| Arama -> bütçe kontrolü -> satın alma siparişi (BAŞARILI). Uçtan uca çalışır. |
2. Yeniden planlama |
| Planlayıcı "Başarısız: Bilinmeyen öğe" yanıtını döndürüyor. Kontrol Odası bunu algılar ve aramayı genişletmek için |
CUJ 2 (Yeniden planlama)'yi test etmek için control_room.py içindeki prompt değerini şu şekilde değiştirin:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Sabit kodlanmış planlayıcı bu öğeyi tanımaz ve "Başarısız: Bilinmeyen öğe" yanıtını verir. Kontrol Odası, hatayı algılar, dinamik olarak bir LlmAgent yeniden planlayıcı oluşturur ve daha geniş bir hedefle yeniden dener. Planlayıcı yalnızca "Pixel 7"yi tanıdığı için yeniden deneme de başarısız olur ancak yeniden planlama döngüsünün tamamını görürsünüz. Son çıktı FAILED after 2 attempts: ... olacaktır.
10. Temizleme
Google Cloud hesabınızın sürekli olarak ücretlendirilmesini önlemek için bu codelab sırasında oluşturulan kaynakları silebilirsiniz. Oluşturduğunuz dizini kolayca kaldırabilirsiniz:
cd ..
rm -rf scale-agents
11. Tebrikler
Tebrikler! CrewAI, LangGraph, A2A Protokolü ve ADK'yı kullanarak çoklu ajanlı bir düzenleme sistemi oluşturdunuz.
Öğrendikleriniz
- CrewAI'nın
@tooldekoratörünü kullanarak temsilciler için araçları tanımlama - Farklı rollere, araçlara ve hedeflere sahip uzmanlaşmış aracıları nasıl oluşturacağınızı öğrenin.
- Görev bağımlılıkları olan sıralı bir ekipte aracıları nasıl bağlayabilirsiniz?
- LangGraph ile, ekibe görev atayan bir durum makinesi planlayıcı oluşturma
- Planlayıcıyı
AgentCardveAgentExecutorile A2A hizmeti olarak kullanıma sunma - A2A üzerinden yetki veren ve
LlmAgentalt aracısını çağırarak başarısızlık durumunda yeniden planlama yapan özel bir ADKBaseAgentoluşturma - Planlama, yürütme ve düzenlemeyi çerçeveler arasında ayırmanın size neden modülerlik ve esneklik sağladığı.
Daha fazlası
Tam kapsamlı atölye çalışması, bu sistemi aşağıdakilerle genişletir:
- Gerçek zamanlı kontrol paneli: Çoklu aracı ilerlemesini görselleştirmek için SSE akışı
- Identity Shield: Yıkıcı işlemleri istem düzeyinde değil, altyapı düzeyinde engelleyen IAM tabanlı güvenlik
- Vertex AI Agent Engine: ADK aracısını
adk deployile yönetilen bulut altyapısına dağıtın.