Way Back Home - Event-Driven Architecture with Google ADK, A2A, and Kafka

1. The Mission

Hikaye

Keşfedilmemiş bir sektörün sessizliğinde sürükleniyorsunuz. Büyük bir Güneş Darbesi, geminizi bir yarıktan geçirerek sizi evrenin hiçbir yıldız haritasında bulunmayan bir köşesinde mahsur bıraktı.

Günlerce süren zorlu onarım çalışmalarının ardından nihayet ayaklarınızın altında motorların uğultusunu hissediyorsunuz. Roketiniz düzeltildi. Hatta Ana Geminin uzun menzilli bir bağlantısını bile güvence altına almayı başardınız. Kalkış için izniniz var. Eve gitmeye hazırsınız.

Ancak flash sürücüyü kullanmaya hazırlanırken statik gürültüden bir tehlike sinyali duyulur. Sensörleriniz yardım sinyali alır. Beş sivil, X-42 gezegeninin yüzeyinde mahsur kalmıştır. Tek kaçış umutları, yörüngedeki ana gemilerine tehlike sinyali göndermek için senkronize edilmesi gereken 15 antik kapsüle bağlıdır.

Ancak kapsüller, ana navigasyon bilgisayarı hasar görmüş bir uydu istasyonu tarafından kontrol ediliyor. Kapsüller amaçsızca sürükleniyor. Uyduya arka kapı bağlantısı kurmayı başardık ancak yukarı bağlantı, yıldızlararası ciddi parazitlerden etkileniyor ve bu durum, istek-yanıt döngülerinde büyük gecikmelere neden oluyor.

Hedef

İstek/yanıt modeli çok yavaş olduğundan, telemetriyi gürültüden geçirerek aktarmak için sunucu tarafından gönderilen etkinlikler (SSE) ile etkinliğe dayalı bir mimari (EDA) dağıtmamız gerekir.

Misyon

Pod'ları belirli sinyal artırıcı şekillere (daire, yıldız, çizgi) zorlamak için gereken karmaşık vektör matematiğini hesaplayabilen özel bir Agent oluşturmanız gerekir. Bu aracıyı uydunun yeni mimarisine bağlamanız gerekir.

Ne oluşturacaksınız?

Genel Bakış

  • 15 kapsülden oluşan bir filoyu anlık olarak görselleştirmek ve kontrol etmek için React tabanlı bir Heads-Up Display (HUD).
  • Google Agent Development Kit'i (ADK) kullanan bir üretken yapay zeka aracısı. Bu araç, doğal dil komutlarına göre pod'lar için karmaşık geometrik şekilleri hesaplar.
  • Merkezi hub olarak işlev gören, Server-Sent Events (SSE) aracılığıyla ön uçla iletişim kuran Python tabanlı bir Uydu İstasyonu arka ucu.
  • Apache Kafka'yı kullanarak yapay zeka aracısının uydu kontrol sisteminden ayrılmasını sağlayan ve esnek, eşzamansız iletişime olanak tanıyan bir etkinliğe dayalı mimari.

Öğrenecekleriniz

Teknoloji / Kavram

Açıklama

Google ADK (Agent Development Kit)

Bu çerçeveyi, Gemini modelleri tarafından desteklenen özel bir yapay zeka aracısı oluşturmak, test etmek ve yapılandırmak için kullanacaksınız.

Etkinliğe Dayalı Mimari (EDA)

Bileşenlerin etkinlikler aracılığıyla eşzamansız olarak iletişim kurduğu, uygulamanın daha esnek ve ölçeklenebilir olmasını sağlayan ayrılmış bir sistem oluşturmanın ilkelerini öğreneceksiniz.

Apache Kafka

Kafka'yı, farklı mikro hizmetler arasındaki komut ve veri akışını yönetmek için dağıtılmış bir etkinlik akışı platformu olarak ayarlayıp kullanacaksınız.

Sunucu Tarafından Gönderilen Etkinlikler (SSE)

SSE'yi, sunucudan React ön ucuna anlık telemetri verileri göndermek için FastAPI arka ucunda uygulayacak ve kullanıcı arayüzünü sürekli güncel tutacaksınız.

A2A (Agent-to-Agent) Protokolü

Aracınızı A2A sunucusuna nasıl yerleştireceğinizi öğrenecek ve böylece daha büyük bir aracı ekosisteminde standartlaştırılmış iletişim ve birlikte çalışabilirlik sağlayacaksınız.

FastAPI

Bu yüksek performanslı Python web çerçevesini kullanarak temel arka uç hizmeti olan Uydu İstasyonu'nu oluşturacaksınız.

Tepki

Dinamik ve etkileşimli bir kullanıcı arayüzü oluşturmak için SSE akışına abone olan modern bir ön uç uygulamasıyla çalışacaksınız.

Sistem Kontrolü'nde üretken yapay zeka

Büyük bir dil modelinin (LLM) yalnızca sohbet etmenin dışında, belirli ve veriye dayalı görevleri (ör. koordinat oluşturma) gerçekleştirmesi için nasıl istem oluşturulabileceğini öğreneceksiniz.

2. Ortamınızı ayarlama

Cloud Shell'e erişme

👉Google Cloud Console'un üst kısmında Cloud Shell'i etkinleştir'i tıklayın (Cloud Shell bölmesinin üst kısmındaki terminal şeklindeki simge). cloud-shell.png

👉 "Open Editor" (Düzenleyiciyi aç) düğmesini tıklayın (kalemli açık bir klasöre benzer). Bu işlem, pencerede Cloud Shell Kod Düzenleyici'yi açar. Sol tarafta bir dosya gezgini görürsünüz. open-editor.png

👉Bulut IDE'sinde terminali açın.

03-05-new-terminal.png

👉💻 Terminalde, aşağıdaki komutu kullanarak kimliğinizin doğrulandığını ve projenin proje kimliğinize ayarlandığını doğrulayın:

gcloud auth list

Hesabınız (ACTIVE) olarak listelenir.

Ön koşullar

ℹ️ 0. Düzey İsteğe Bağlıdır (Ancak Önerilir)

Bu görevi 0. seviyede tamamlayabilirsiniz ancak önce tamamlamak daha sürükleyici bir deneyim sunar. Böylece ilerledikçe işaretinizin küresel haritada yandığını görebilirsiniz.

Proje ortamını ayarlama

Terminalinize geri dönerek etkin projeyi ayarlayıp gerekli Google Cloud hizmetlerini (Cloud Run, Vertex AI vb.) etkinleştirerek yapılandırmayı tamamlayın.

👉💻 Terminalinizde proje kimliğini ayarlayın:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 Gerekli Hizmetleri Etkinleştirme:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

Bağımlılıkları yükleme

👉💻 5. seviyeye gidin ve gerekli Python paketlerini yükleyin:

cd $HOME/way-back-home/level_5
uv sync

Temel bağımlılıklar şunlardır:

Paket

Amaç

fastapi

Uydu İstasyonu ve SSE akışı için yüksek performanslı web çerçevesi

uvicorn

FastAPI uygulamasını çalıştırmak için ASGI sunucusu gerekir.

google-adk

Formation Agent'ı oluşturmak için kullanılan Agent Development Kit

a2a-sdk

Standartlaştırılmış iletişim için aracıdan aracıya protokol kitaplığı

aiokafka

Etkinlik döngüsü için eşzamansız Kafka istemcisi

google-genai

Gemini modellerine erişmek için yerel istemci

numpy

Simülasyon için vektör matematiği ve koordinat hesaplamaları

websockets

Gerçek zamanlı çift yönlü iletişim desteği

python-dotenv

Ortam değişkenlerini ve yapılandırma sırlarını yönetir.

sse-starlette

Sunucu tarafından gönderilen etkinliklerin (SSE) verimli şekilde işlenmesi

requests

Harici API çağrıları için basit HTTP kitaplığı

Kurulumu Doğrulama

Koda geçmeden önce tüm sistemlerin sorunsuz çalıştığından emin olalım. Google Cloud projenizi, API'lerinizi ve Python bağımlılıklarınızı denetlemek için doğrulama komut dosyasını çalıştırın.

👉💻 Doğrulama komut dosyasını çalıştırın:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 Bir dizi yeşil onay işareti (✅) görmelisiniz.

  • Kırmızı çarpılar (❌) görürseniz çıktıda önerilen düzeltme komutlarını uygulayın (ör. gcloud services enable ... veya pip install ...).
  • Not: .env için sarı uyarı şu an kabul edilebilir. Bu dosyayı sonraki adımda oluşturacağız.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. LLM ile Kapsül Konumlarını Biçimlendirme

Kurtarma operasyonumuzun "beynini" oluşturmamız gerekiyor. Bu, Google ADK (Agent Development Kit) kullanılarak oluşturulmuş bir temsilci olacaktır. Tek amacı, özel bir geometrik navigasyon aracı olarak işlev görmektir. Standart LLM'ler sohbet etmeyi severken uzayın derinliklerinde diyalog değil, veri gerekir. Bu temsilciyi "Star" gibi bir komut alıp 15 kapsülümüz için ham JSON koordinatları döndürecek şekilde programlayacağız.

Temsilci

Aracıyı iskeleleme

👉💻 Aracı dizininize gitmek ve ADK oluşturma sihirbazını başlatmak için aşağıdaki komutları çalıştırın:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI, etkileşimli bir kurulum sihirbazı başlatır. Aracınızı yapılandırmak için aşağıdaki yanıtları kullanın:

  1. Model seçin: 1. Seçenek'i (Gemini Flash) belirleyin.
    • Not: Belirli sürüm değişiklik gösterebilir. Hız için her zaman "Flash" varyantını seçin.
  2. Arka uç seçin: 2. Seçenek'i (Vertex AI) belirleyin.
  3. Google Cloud proje kimliğini girin: Varsayılanı (ortamınızdan algılanan) kabul etmek için Enter tuşuna basın.
  4. Google Cloud Bölgesi'ni girin: Varsayılanı (us-central1) kabul etmek için Enter tuşuna basın.

👀 Terminal etkileşiminiz aşağıdaki gibi görünmelidir:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

Agent created başarı mesajını görmeniz gerekir. Bu işlem, şimdi değiştireceğimiz iskelet kodu oluşturur.

👉✏️ Düzenleyicinizde yeni oluşturulan $HOME/way-back-home/level_5/agent/formation/agent.py dosyasına gidin ve dosyayı açın. Dosyanın tüm içeriğini aşağıdaki kodla değiştirin. Bu, aracının adını günceller ve katı operasyonel parametrelerini sağlar.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • Geometrik Hassasiyet: Sistem isteminde "Tuval Boyutu" ve "Güvenli Kenar Boşlukları" tanımlanarak aracının, kapsülleri ekranın dışına veya kullanıcı arayüzü öğelerinin altına yerleştirmemesi sağlanır.
  • JSON Zorunluluğu: LLM'ye"Biçimlendirme dışı soruları yanıtlamayı reddet " ve "Ön açıklama yok" talimatını vererek, aşağı akış kodumuzun (uydu) yanıtı ayrıştırmaya çalışırken kilitlenmemesini sağlıyoruz.
  • Ayrılmış Mantık: Bu aracı henüz Kafka hakkında bilgi sahibi değil. Yalnızca matematik yapmayı bilir. Bir sonraki adımda bu "Beyin"i bir Kafka sunucusuna yerleştireceğiz.

Temsilciyi Yerel Olarak Test Etme

Temsilciyi Kafka "sinir sistemine" bağlamadan önce doğru şekilde çalıştığından emin olmamız gerekir. Geçerli JSON koordinatları oluşturduğunu doğrulamak için doğrudan terminalde temsilcinizle etkileşimde bulunabilirsiniz.

👉💻 Aracınızla sohbet oturumu başlatmak için adk run komutunu kullanın.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. Giriş: Circle yazıp Enter tuşuna basın.
    • Başarı ölçütleri: Ham bir JSON listesi görmelisiniz (ör. [{"x": 400, "y": 200}, ...]). JSON'dan önce "Koordinatlar:" gibi bir Markdown metni olmadığından emin olun.
  2. Giriş: Line yazıp Enter tuşuna basın.
    • Başarı ölçütleri: Koordinatların yatay bir çizgi oluşturduğunu doğrulayın (y değerleri benzer olmalıdır).

Aracı çıktısının temiz JSON olduğunu onayladıktan sonra Kafka sunucusuna sarmaya hazırsınız.

👉💻 Çıkmak için Ctrl+C tuşuna basın.

4. Formation Agent için A2A sunucusu oluşturma

A2A'yı (Temsilciden Temsilciye) anlama

A2A (Agent-to-Agent) protokolü, yapay zeka aracıları arasında sorunsuz birlikte çalışmayı sağlamak için tasarlanmış açık bir standarttır. Bu çerçeve, temsilcilerin basit metin alışverişinin ötesine geçmesini sağlar. Temsilciler, görevleri devredebilir, karmaşık işlemleri koordine edebilir ve dağıtılmış bir ekosistemde ortak hedeflere ulaşmak için uyumlu bir birim olarak çalışabilir.

A2A

A2A aktarımlarını anlama: HTTP, gRPC ve Kafka

A2A protokolü, istemcilerin ve aracıların iletişim kurması için iki farklı yol sunar. Bu yolların her biri farklı mimari ihtiyaçlara hizmet eder. HTTP (JSON-RPC), tüm web ortamlarında evrensel olarak çalışan varsayılan ve her yerde bulunan standarttır. gRPC, verimli ve kesin olarak türü belirlenmiş iletişim için Protocol Buffers'ı kullanan yüksek performanslı seçeneğimizdir. Laboratuvarda Kafka aktarımı da sağlıyorum. Bu, sistemlerin ayrıştırılmasının öncelikli olduğu sağlam ve etkinlik odaklı mimariler için tasarlanmış özel bir uygulamadır.

Taşıma

Bu aktarımlar, verilerin akışını oldukça farklı şekilde yönetir. HTTP modelinde istemci bir JSON isteği gönderir ve bağlantıyı açık tutarak aracının görevini tamamlamasını ve sonucu tek seferde döndürmesini bekler. gRPC, ikili veriler ve HTTP/2 kullanarak bunu optimize eder. Böylece hem basit istek-yanıt döngülerine hem de aracının güncellemeleri (ör. "düşünce" veya "yapay içerik oluşturuldu") anında gönderdiği gerçek zamanlı akışa olanak tanır. Kafka uygulaması eşzamanlı olarak çalışır: İstemci, son derece dayanıklı bir "istek konusu"na istek yayınlar ve ayrı bir "yanıt konusu"nu dinler. Sunucu, iletiyi uygun olduğunda alır, işler ve sonucu geri gönderir. Bu nedenle, ikisi hiçbir zaman doğrudan iletişim kurmaz.

Seçim, hız, karmaşıklık ve kalıcılıkla ilgili özel gereksinimlerinize bağlıdır. HTTP'yi kullanmaya başlamak ve hatalarını ayıklamak en kolay yöntemdir. Bu nedenle, basit entegrasyonlar için mükemmeldir. gRPC ise düşük gecikme süresinin ve akış görev güncellemelerinin kritik olduğu, hizmetten hizmete dahili iletişim için üstün bir seçimdir. Ancak Kafka, istekleri diskte bir kuyrukta depoladığından dayanıklı bir seçenek olarak öne çıkıyor. Bu sayede, aracı sunucusu kilitlense veya yeniden başlatılsa bile görevleriniz devam ediyor. Bu da HTTP veya gRPC'nin sunamayacağı bir dayanıklılık ve ayrıştırma düzeyi sağlıyor.

Özel aktarım katmanı: Kafka

Kafka, operasyonun beynini (Formation Agent) fiziksel kontrollerden (Uydu İstasyonu) ayıran asenkron omurga görevi görür. Sistem, aracı karmaşık vektörleri hesaplarken senkron bağlantı için beklemek zorunda kalmak yerine aracı, sonuçlarını bir Kafka konusuna etkinlik olarak yayınlar. Bu, kalıcı bir arabellek görevi görerek uydunun talimatları kendi hızında kullanmasına olanak tanır ve önemli ağ gecikmesi veya geçici sistem kilitlenmesi olsa bile oluşum verilerinin asla kaybolmamasını sağlar.

Kafka'yı kullanarak yavaş ve doğrusal bir süreci, talimatların ve telemetrinin bağımsız olarak aktığı, görev HUD'sunu yoğun yapay zeka işleme sırasında bile duyarlı tutan esnek bir akış hattına dönüştürürsünüz.

Kafka

Kafka nedir?

Kafka, dağıtılmış bir etkinlik akışı platformudur. Etkinliğe Dayalı Mimaride (EDA):

  1. Üreticiler, "Konular"a mesaj yayınlar.
  2. Tüketiciler bu konulara abone olur ve mesaj geldiğinde tepki verir.

Kafka'yı neden kullanmalısınız?

Sistemlerinizin bağlantısını keser. Formation Agent, gönderenin kimliğini veya durumunu bilmesine gerek kalmadan gelen istekleri bekleyerek bağımsız bir şekilde çalışır. Bu, sorumluluğu ayırarak uydu çevrimdışı olsa bile iş akışının bozulmamasını sağlar. Kafka, uydu yeniden bağlanana kadar mesajları saklar.

Google Cloud Pub/Sub hakkında ne söylenebilir?

Bu işlem için Google Cloud Pub/Sub'ı kesinlikle kullanabilirsiniz. Pub/Sub, Google'ın sunucusuz mesajlaşma hizmetidir. Kafka, yüksek işleme hızı ve "yeniden oynatılabilir" akışlar için harika olsa da Pub/Sub genellikle kullanım kolaylığı nedeniyle tercih edilir. Bu laboratuvarda, sağlam ve kalıcı bir mesaj yolu simüle etmek için Kafka'yı kullanıyoruz.

Yerel Kafka Kümesini Başlatma

Aşağıdaki komut bloğunun tamamını kopyalayıp terminalinize yapıştırın. Bu komut, resmi Kafka görüntüsünü indirir ve arka planda başlatır.

👉💻 Terminalinizde şu komutları çalıştırın:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 docker ps komutuyla kapsayıcının çalıştığını kontrol edin.

docker ps

👀 mission-kafka kapsayıcısının çalıştığını ve 9092 bağlantı noktasının kullanıma sunulduğunu onaylayan bir çıkış görmeniz gerekir.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

Kafka konusu nedir?

Kafka konusunu, iletiler için özel bir kanal veya kategori olarak düşünebilirsiniz. Etkinlik kayıtlarının oluşturulma sırasına göre saklandığı bir günlük defteri gibidir. Üreticiler belirli konulara mesaj yazar, tüketiciler ise bu konulardan okur. Bu, göndereni alıcıdan ayırır. Üreticinin, verileri hangi tüketicinin okuyacağını bilmesi gerekmez. Yalnızca doğru "kanala" göndermesi yeterlidir. Görevimizde iki konu oluşturacağız: biri aracıya formasyon istekleri göndermek, diğeri ise aracının yanıtlarını uydu tarafından okunmak üzere yayınlamak için.

Kafka

👉💻 Çalışan Docker container'da gerekli konuları oluşturmak için aşağıdaki komutları çalıştırın.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 Kanallarınızın açık olduğunu onaylamak için liste komutunu çalıştırın:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 Oluşturduğunuz konuların adlarını görürsünüz.

a2a-formation-request
a2a-reply-satellite-dashboard

Kafka örneğiniz artık tamamen yapılandırılmış durumda ve görev açısından kritik verileri yönlendirmeye hazır.

Kafka A2A sunucusunu uygulama

Ajanlar Arası (A2A) Protokolü, bağımsız ajan sistemleri arasında birlikte çalışabilirlik için standartlaştırılmış bir çerçeve oluşturur. Farklı ekipler tarafından geliştirilen veya farklı altyapılarda çalışan aracıların, her bağlantı için özel entegrasyon mantığına gerek kalmadan birbirini keşfetmesine ve etkili bir şekilde birlikte çalışmasına olanak tanır.

Referans uygulama olan a2a-python, bu tür aracı uygulamalarını çalıştırmak için temel bir kitaplıktır. Tasarımının temel özelliklerinden biri genişletilebilirliktir. İletişim katmanını soyutlayarak geliştiricilerin HTTP gibi protokolleri başkalarıyla değiştirmesine olanak tanır.

A2A Flow

Bu projede, a2a-python-kafka adlı özel bir Kafka uygulaması kullanarak bu genişletilebilirlikten yararlanıyoruz. Bu uygulamayı, A2A standardının, aracı iletişimini farklı mimari ihtiyaçlara uyarlamanıza nasıl olanak tanıdığını göstermek için kullanacağız. Bu örnekte, eşzamanlı HTTP'nin yerini eşzamansız bir etkinlik veri yolu alıyor.

Formation Agent için A2A'yı etkinleştirme

Artık aracımızı bir A2A sunucusuna yerleştirerek şunları yapabilen birlikte çalışabilir bir hizmete dönüştüreceğiz:

  • Kafka konusundaki görevleri dinleyin.
  • Alınan görevleri işlenmek üzere temel ADK aracısına devreder.
  • Sonucu bir yanıt konusunda yayınlayın.

👉✏️ $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py bölümünde #REPLACE-CREATE-KAFKA-A2A-SERVER yerine aşağıdaki kodu girin:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

Bu kod, temel bileşenleri ayarlar:

  1. The Runner: Aracının çalışma süresini sağlar (bellek, kimlik bilgileri vb. işleme).
  2. Görev deposu: İsteklerin durumunu "Beklemede"den "Tamamlandı"ya geçerken izler.
  3. Aracı Yürütücü: Kafka'dan bir görev alır ve koordinatları hesaplaması için aracıya iletir.
  4. KafkaServerApp: Kafka aracısına fiziksel bağlantıyı yönetir.

A2A Kafka

Ortam değişkenlerini yapılandırma

ADK kurulumu, aracının klasöründe Google Vertex AI ayarlarınızın bulunduğu bir .env dosyası oluşturdu. Bunu proje köküne taşımamız ve Kafka kümemizin koordinatlarını eklememiz gerekiyor.

Dosyayı kopyalamak ve Kafka sunucusu adresini eklemek için aşağıdaki komutları çalıştırın:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

A2A Interstellar Loop'u doğrulama

Şimdi, Kafka kümesi üzerinden manuel bir sinyal gönderip aracının yanıtını izleyerek eşzamansız etkinlik döngüsünün doğru şekilde çalıştığından emin olacağız.

A2A Interstellar Loop&#39;u doğrulama

Bir etkinliğin tüm yaşam döngüsünü görmek için üç ayrı terminal kullanacağız.

Terminal A: Oluşum Aracısı (A2A Kafka Sunucusu)

👉💻 Bu terminal, Kafka'yı dinleyen ve geometrik matematik işlemleri için Gemini'ı kullanan Python sürecini çalıştırır.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

Şunu görene kadar bekleyin:

[INFO] Kafka Server App Started. Starting to consume requests...

Terminal B: Uydu Dinleyici (Tüketici)

👉💻 Bu terminalde yanıt konusunu dinleyeceğiz. Bu, uydunun talimatları beklemesini simüle eder.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

Bu terminal boşta görünür. Temsilcinin mesaj yayınlaması bekleniyor.

Terminal C: Komutanın Sinyali (Üretici)

👉💻 Şimdi, a2a-formation-request konusuna A2A biçimli işlenmemiş bir istek göndereceğiz. Temsilcinin yanıtı nereye göndereceğini bilmesi için belirli Kafka üstbilgilerini eklememiz gerekir.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

Sonucu Analiz Etme

👀 Döngü başarılı olursa B Terminali'ne geçin. Büyük bir JSON bloğu anında görünmelidir. E-posta, gönderdiğimiz başlıkla correlation_id:ping-manual-01 başlar. Ardından task nesnesi gelir. Bu JSON'daki parts bölümüne yakından bakarsanız Gemini'ın 15 pod'unuz için hesapladığı ham X ve Y koordinatlarını görürsünüz:

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

Temsilciyi alıcıdan başarıyla ayırdınız. Sistemimiz artık tamamen olaya dayalı olduğundan istek-yanıt gecikmesinin "yıldızlararası gürültüsü" artık önemli değil.

Devam etmeden önce, ağ bağlantı noktalarını boşaltmak için arka plan işlemlerini durdurun.

👉💻 Her terminalde (A, B ve C):

  • Çalışan işlemi sonlandırmak için Ctrl + C tuşuna basın.

5. The Satellite Station (A2A Kafka Client and SSE)

Bu adımda Uydu İstasyonu'nu oluşturuyoruz. Bu, Kafka kümesi ile pilotun görsel ekranı (React Frontend) arasındaki köprüdür. Bu sunucu hem Kafka istemcisi (aracıyla iletişim kurmak için) hem de SSE yayıncısı (tarayıcıyla iletişim kurmak için) olarak işlev görür.

Kafka istemcisi nedir?

Kafka kümesini bir radyo istasyonu olarak düşünün. Kafka istemcisi, radyo alıcısıdır. KafkaClientTransport, uygulamamızın şunları yapmasına olanak tanır:

  1. Mesaj üretme: "Görev" (ör. "Yıldız oluşumu") istemini istemciye gönderir.
  2. Yanıtı kullanma: Temsilciden koordinatları geri almak için belirli bir "Yanıt Konusu"nu dinleyin.

1. Bağlantıyı Başlatma

Sunucu başlatıldığında Kafka bağlantısının başlatılmasını ve kapatıldığında düzgün bir şekilde kapatılmasını sağlamak için FastAPI'nin lifespan etkinlik işleyicisini kullanırız.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py bölümünde #REPLACE-CONNECT-TO-KAFKA-CLUSTER yerine aşağıdaki kodu girin:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. Komut gönderme

Kontrol panelinde bir düğmeyi tıkladığınızda /formation bitiş noktası tetiklenir. Üretici olarak işlev görür, isteğinizi resmi bir A2A Message içine sarar ve temsilciye gönderir.

Formation

Key Logic:

  • Eşzamansız İletişim: kafka_transport.send_message isteği gönderir ve yeni koordinatların reply_topic'ye ulaşmasını bekler.
  • Yanıt ayrıştırma: Gemini, markdown bloklarında koordinatlar döndürebilir (ör. json ... ). Aşağıdaki kod, bunları temizler ve dizeyi bir Python nokta listesine dönüştürür.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py bölümünde #REPLACE-FORMATION-REQUEST yerine aşağıdaki kodu girin:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

Sunucu Tarafından Gönderilen Etkinlikler (SSE)

Standart API'ler "İstek-Yanıt" modelini kullanır. HUD'mız için pod konumlarının "Canlı Yayın"ı gerekir.

Neden SSE? WebSockets'in (çift yönlü ve daha karmaşık) aksine SSE, sunucudan tarayıcıya basit ve tek yönlü bir veri akışı sağlar. Kontrol panelleri, borsa ticker'ları veya yıldızlararası telemetri için mükemmeldir.

SSE

Kodumuzda nasıl çalışır? Her yarım saniyede 15 kapsülün mevcut konumunu alan ve bunları güncelleme olarak tarayıcıya "iten" sonsuz bir döngü olan event_generator oluştururuz.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py bölümünde #REPLACE-SSE-STREAM yerine aşağıdaki kodu girin:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

Tam Görev Döngüsünü Uygulama

Son kullanıcı arayüzünü kullanıma sunmadan önce sistemin uçtan uca çalıştığını doğrulayalım. Temsilciyi manuel olarak tetikleyip kablodaki ham veri yükünü göreceğiz.

Doğrula

Üç ayrı terminal sekmesi açın.

Terminal A: The Formation Agent (A2A Server)

👉💻 Bu, görevleri dinleyen ve geometrik hesaplamaları yapan ADK aracısıdır.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

Terminal B: Uydu İstasyonu (Kafka İstemcisi)

👉💻 Bu FastAPI sunucusu,"Alıcı" olarak işlev görür. Kafka yanıtlarını dinler ve bunları canlı bir SSE akışına dönüştürür.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

Terminal C: Manuel HUD

Send Formation Command (Trigger): 👉💻 Aynı C terminalinde, biçimlendirme sürecini tetikleyin:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 Yeni koordinatları görmeniz gerekir.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

Bu, uydunun dahili pod koordinatlarını güncellediğini onaylar.

👉💻 Önce canlı telemetri akışını dinlemek, ardından da oluşum değişikliğini tetiklemek için curl kullanacağız.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 curl -N komutunuzun çıkışını izleyin. pod_update etkinliklerindeki x ve y koordinatları, Yıldız formasyonunun yeni konumlarını yansıtmaya başlar.

Devam etmeden önce, iletişim bağlantı noktalarını boşaltmak için çalışan tüm işlemleri durdurun.

Her terminalde (A, B, C ve tetikleyici terminal): Ctrl + C tuşuna basın.

6. Go Rescue!

Sistemi başarıyla oluşturdunuz. Şimdi görevi hayata geçirme zamanı. Artık React tabanlı baş üstü ekranını (HUD) kullanıma sunacağız. Bu kontrol paneli, SSE aracılığıyla uydu istasyonuna bağlanır ve 15 kapsülü gerçek zamanlı olarak görselleştirmenize olanak tanır.

Genel Bakış

Komut verdiğinizde yalnızca bir işlevi çağırmazsınız. Kafka üzerinden geçen, bir yapay zeka aracısı tarafından işlenen ve canlı telemetri olarak ekranınıza geri aktarılan bir etkinliği tetiklersiniz.

Doğrula

İki ayrı terminal sekmesi açın.

Terminal A: The Formation Agent (A2A Server)

👉💻 Bu, görevleri dinleyen ve Gemini'ı kullanarak geometrik matematik işlemleri yapan ADK Aracısıdır. Terminalde şunu çalıştırın:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

Terminal B: Uydu İstasyonu ve Görsel Kontrol Paneli

👉💻 Öncelikle ön uç uygulamasını oluşturun.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 Şimdi hem arka uç mantığına hem de ön uç kullanıcı arayüzüne hizmet edecek FastAPI sunucusunu başlatın.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

Yayınlama ve Doğrulama

  1. 👉 Önizlemeyi açma: Cloud Shell araç çubuğunda Web önizlemesi simgesini tıklayın. Bağlantı noktasını değiştir'i seçin, 8000 olarak ayarlayın ve Değiştir ve Önizle'yi tıklayın. Starfield HUD'nızı gösteren yeni bir tarayıcı sekmesi açılır. *Web-Preview
  2. 👉 Telemetri Akışını Doğrulama:
    • Kullanıcı arayüzü yüklendikten sonra rastgele dağıtılmış 15 pod görmeniz gerekir.
    • Pod'lar hafifçe titreşiyorsa veya "sekiyorsa" SSE akışınız etkindir ve uydu istasyonu konumlarını başarıyla yayınlıyordur. Başlatma
  3. 👉 Oluşum başlatma: Kontrol panelinde "BAŞLAT" düğmesini tıklayın. Yıldız
  4. 👀 Etkinlik Döngüsünü İzleme: Mimarinin nasıl çalıştığını görmek için terminallerinizi izleyin:
    • Terminal B (Uydu İstasyonu) şunları kaydeder: Sending A2A Message: 'Create a STAR formation'.
    • Terminal A (Oluşturma Aracısı), Gemini'a danışırken etkinliği gösterir.
    • Terminal B (Uydu İstasyonu), Received A2A Response koordinatlarını kaydedip ayrıştırır.
  5. 👀 Görsel Onay: Kontrol panelinizdeki 15 kapsülün rastgele konumlardan 5 köşeli yıldız şekline sorunsuz bir şekilde kaymasını izleyin.
  6. 👉 Deneme:
    • 3 farklı diziliş için "X" veya "LINE"'ı deneyin. X
    • Özel Amaç: "Kalp" veya "Üçgen" gibi benzersiz bir şey yazmak için manuel girişi kullanın. Daire
    • Üretken yapay zeka kullandığınız için aracı, açıklayabileceğiniz tüm geometrik şekillerin matematiksel işlemlerini hesaplamaya çalışır.

3 desen oluşturduktan sonra bağlantıyı başarıyla yeniden kurdunuz. BİTTİ

GÖREV TAMAMLANDI!

Veriler kesintisiz bir şekilde gürültüden geçerken akış dengelenir. Komutunuzla birlikte 15 antik kapsül, yıldızlar arasında senkronize bir dansa başlar.

Bitiş

Üç zorlu kalibrasyon aşamasında telemetrinin yerine oturduğunu izlediniz. Her hizalama ile sinyal güçlendi ve sonunda yıldızlararası paraziti bir umut ışığı gibi deldi.

Etkinliğe Dayalı Temsilci'yi ustaca uygulamanız sayesinde beş kurtulan, X-42'nin yüzeyinden helikopterle alınarak kurtarma gemisine güvenli bir şekilde ulaştırıldı. Sizin sayenizde beş hayat kurtuldu.

0. Seviye'ye katıldıysanız Way Back Home görevindeki ilerleme durumunuzu kontrol etmeyi unutmayın. Yıldızlara dönüş yolculuğunuz devam ediyor.BİTTİ