1. مقدمه
آیا تا به حال برای مدیریت تمام هزینه های شخصی خود ناامید و تنبل بوده اید؟ من هم همینطور! بنابراین چرا در این نرمافزار، ما یک دستیار مدیریت هزینه شخصی میسازیم که توسط Gemini 2.5 پشتیبانی میشود تا همه کارها را برای ما انجام دهد! از مدیریت رسیدهای آپلود شده تا تجزیه و تحلیل اینکه آیا قبلاً برای خرید قهوه زیاد هزینه کرده اید یا خیر!
این دستیار از طریق مرورگر وب در قالب یک رابط وب چت قابل دسترسی خواهد بود، که در آن میتوانید با آن ارتباط برقرار کنید، برخی از تصاویر رسید را آپلود کنید و از دستیار بخواهید آنها را ذخیره کند، یا شاید بخواهید برای دریافت فایل و تحلیل هزینهها، برخی رسیدها را جستجو کنید. و همه اینها بر اساس چارچوب Google Agent Development Kit ساخته شده است
خود برنامه به 2 سرویس تقسیم می شود: frontend و backend. به شما امکان می دهد یک نمونه اولیه سریع بسازید و احساس آن را امتحان کنید و همچنین درک کنید که قرارداد API چگونه به نظر می رسد تا هر دوی آنها را یکپارچه کند.
از طریق کد لبه، شما یک رویکرد گام به گام را به شرح زیر به کار خواهید گرفت:
- پروژه Google Cloud خود را آماده کنید و تمام API مورد نیاز را روی آن فعال کنید
- سطل را در Google Cloud Storage و پایگاه داده در Firestore راه اندازی کنید
- ایجاد نمایه سازی Firestore
- فضای کاری را برای محیط کدنویسی خود تنظیم کنید
- ساختار کد منبع عامل ADK، ابزارها، اعلان و غیره
- آزمایش عامل با استفاده از رابط کاربری توسعه وب محلی ADK
- ساخت سرویس جلویی - رابط چت با استفاده از کتابخانه Gradio ، برای ارسال برخی درخواست ها و آپلود تصاویر رسید
- ساخت سرویس پشتیبان - سرور HTTP با استفاده از FastAPI که کد عامل ADK، SessionService، و سرویس Artifact در آن قرار دارد.
- متغیرهای محیطی را مدیریت کنید و فایلهای مورد نیاز برای استقرار برنامه در Cloud Run را تنظیم کنید
- برنامه را در Cloud Run مستقر کنید
نمای کلی معماری
پیش نیازها
- کار راحت با پایتون
- درک معماری پایه تمام پشته با استفاده از سرویس HTTP
چیزی که یاد خواهید گرفت
- نمونه سازی وب فرانت اند با Gradio
- توسعه خدمات Backend با FastAPI و Pydantic
- معماری ADK Agent در حالی که از چندین قابلیت آن استفاده می کند
- استفاده از ابزار
- مدیریت جلسات و مصنوعات
- استفاده از پاسخ به تماس برای اصلاح ورودی قبل از ارسال به Gemini
- استفاده از BuiltInPlanner برای بهبود اجرای وظایف با انجام برنامه ریزی
- اشکال زدایی سریع از طریق رابط وب محلی ADK
- استراتژی بهینهسازی تعامل چندوجهی از طریق تجزیه و بازیابی اطلاعات از طریق مهندسی سریع و اصلاح درخواست Gemini با استفاده از پاسخ به تماس ADK
- Agentic Retrieval Augmented Generation با استفاده از Firestore به عنوان پایگاه داده برداری
- متغیرهای محیطی را در فایل YAML با تنظیمات Pydantic مدیریت کنید
- برنامه را با استفاده از Dockerfile در Cloud Run مستقر کنید و متغیرهای محیطی را با فایل YAML ارائه دهید
آنچه شما نیاز دارید
- مرورگر وب کروم
- یک اکانت جیمیل
- یک پروژه Cloud با فعال کردن صورتحساب
این کد لبه که برای توسعه دهندگان همه سطوح (از جمله مبتدیان) طراحی شده است، از پایتون در برنامه نمونه خود استفاده می کند. با این حال، دانش پایتون برای درک مفاهیم ارائه شده مورد نیاز نیست.
2. قبل از شروع
Active Project را در Cloud Console انتخاب کنید
این کد لبه فرض می کند که شما قبلاً یک پروژه Google Cloud با فعال بودن صورتحساب دارید. اگر هنوز آن را ندارید، می توانید دستورالعمل های زیر را برای شروع دنبال کنید.
- در Google Cloud Console ، در صفحه انتخاب پروژه، یک پروژه Google Cloud را انتخاب یا ایجاد کنید.
- مطمئن شوید که صورتحساب برای پروژه Cloud شما فعال است. با نحوه بررسی فعال بودن صورتحساب در پروژه آشنا شوید.
پایگاه داده Firestore را آماده کنید
در مرحله بعد، ما همچنین باید یک پایگاه داده Firestore ایجاد کنیم. Firestore در حالت Native یک پایگاه داده اسناد NoSQL است که برای مقیاس بندی خودکار، کارایی بالا و سهولت توسعه برنامه ساخته شده است. همچنین می تواند به عنوان یک پایگاه داده برداری عمل کند که می تواند از تکنیک Retrieval Augmented Generation برای آزمایشگاه ما پشتیبانی کند.
- " Firestore" را در نوار جستجو جستجو کنید و روی محصول Firestore کلیک کنید
- سپس، روی دکمه Create A Firestore Database کلیک کنید
- از (پیش فرض) به عنوان شناسه پایگاه داده استفاده کنید و نسخه استاندارد را انتخاب کنید. به خاطر این نسخه آزمایشی آزمایشگاهی، از Firestore Native با قوانین امنیتی باز استفاده کنید.
- همچنین متوجه خواهید شد که این پایگاه داده در واقع دارای استفاده از لایه رایگان YEAY است! پس از آن، روی دکمه ایجاد پایگاه داده کلیک کنید
پس از این مراحل، شما باید به پایگاه داده Firestore که به تازگی ایجاد کرده اید هدایت شوید
راه اندازی پروژه Cloud در ترمینال Cloud Shell
- شما از Cloud Shell استفاده خواهید کرد، یک محیط خط فرمان در حال اجرا در Google Cloud که با bq از قبل بارگذاری شده است. روی Activate Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
- پس از اتصال به Cloud Shell، با استفاده از دستور زیر بررسی میکنید که قبلاً احراز هویت شدهاید و پروژه به ID پروژه شما تنظیم شده است:
gcloud auth list
- دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد.
gcloud config list project
- اگر پروژه شما تنظیم نشده است، از دستور زیر برای تنظیم آن استفاده کنید:
gcloud config set project <YOUR_PROJECT_ID>
همچنین میتوانید شناسه PROJECT_ID
را در کنسول ببینید
روی آن کلیک کنید و تمام پروژه و شناسه پروژه را در سمت راست خواهید دید
- API های مورد نیاز را از طریق دستور زیر فعال کنید. این ممکن است چند دقیقه طول بکشد، پس لطفا صبور باشید.
gcloud services enable aiplatform.googleapis.com \
firestore.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
در اجرای موفقیت آمیز دستور، باید پیامی مشابه تصویر زیر مشاهده کنید:
Operation "operations/..." finished successfully.
جایگزین دستور gcloud از طریق کنسول با جستجوی هر محصول یا استفاده از این پیوند است.
اگر هر یک از API از دست رفته است، همیشه می توانید آن را در طول پیاده سازی فعال کنید.
برای دستورات و استفاده از gcloud به اسناد مراجعه کنید.
Google Cloud Storage Bucket را آماده کنید
در مرحله بعد، از همان ترمینال، باید سطل GCS را برای ذخیره فایل آپلود شده آماده کنیم. دستور زیر را برای ایجاد سطل اجرا کنید
gsutil mb -l us-central1 gs://personal-expense-assistant-receipts
این خروجی را نشان خواهد داد
Creating gs://personal-expense-assistant-receipts/...
می توانید با رفتن به منوی پیمایش در سمت چپ بالای مرورگر و انتخاب Cloud Storage -> Bucket این موضوع را تأیید کنید.
ایجاد فهرست Firestore برای جستجو
Firestore یک پایگاه داده NoSQL به صورت بومی است که عملکرد و انعطاف پذیری عالی را در مدل داده ارائه می دهد، اما در مورد پرس و جوهای پیچیده محدودیت هایی دارد. همانطور که قصد داریم از برخی پرس و جوهای چند فیلدی مرکب و جستجوی برداری استفاده کنیم، ابتدا باید مقداری شاخص ایجاد کنیم. شما می توانید در مورد جزئیات بیشتر در این مستندات مطالعه کنید
- دستور زیر را برای ایجاد ایندکس برای پشتیبانی از کوئری های ترکیبی اجرا کنید
gcloud firestore indexes composite create \
--collection-group=personal-expense-assistant-receipts \
--field-config field-path=total_amount,order=ASCENDING \
--field-config field-path=transaction_time,order=ASCENDING \
--field-config field-path=__name__,order=ASCENDING \
--database="(default)"
- و این یکی را برای پشتیبانی از جستجوی برداری اجرا کنید
gcloud firestore indexes composite create \
--collection-group="personal-expense-assistant-receipts" \
--query-scope=COLLECTION \
--field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
--database="(default)"
میتوانید با مراجعه به Firestore در کنسول ابری، فهرست ایجاد شده را بررسی کنید و روی نمونه پایگاه داده (پیشفرض) کلیک کنید و Indexes را در نوار پیمایش انتخاب کنید.
به Cloud Shell Editor and Setup Application Working Directory بروید
اکنون، میتوانیم ویرایشگر کد خود را برای انجام برخی موارد کدنویسی تنظیم کنیم. برای این کار از Cloud Shell Editor استفاده خواهیم کرد
- روی دکمه Open Editor کلیک کنید، با این کار یک Cloud Shell Editor باز می شود، ما می توانیم کد خود را اینجا بنویسیم
- مطمئن شوید که پروژه Cloud Code در گوشه سمت چپ پایین (نوار وضعیت) ویرایشگر Cloud Shell تنظیم شده است، همانطور که در تصویر زیر مشخص شده است و روی پروژه فعال Google Cloud که در آن صورتحساب را فعال کردهاید، تنظیم شده است. در صورت درخواست مجوز دهید . اگر از قبل دستور قبلی را دنبال کرده اید، دکمه ممکن است به جای دکمه ورود مستقیماً به پروژه فعال شده شما اشاره کند
- سپس، بیایید دایرکتوری کار قالب را برای این کد لبه از Github کلون کنیم، دستور زیر را اجرا کنیم. دایرکتوری کاری را در دایرکتوری personal-expense-assistant ایجاد می کند
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
- پس از آن، به بخش بالای ویرایشگر پوسته ابری بروید و روی File->Open Folder کلیک کنید، فهرست نام کاربری خود را پیدا کنید و دایرکتوری personal-expense-assistant را پیدا کنید سپس روی دکمه OK کلیک کنید. این دایرکتوری انتخاب شده را به عنوان دایرکتوری اصلی تبدیل می کند. در این مثال، نام کاربری alvinprayuda است، از این رو مسیر دایرکتوری در زیر نشان داده شده است
حال، ویرایشگر پوسته ابری شما باید به این شکل باشد
راه اندازی محیط
محیط مجازی پایتون را آماده کنید
مرحله بعدی آماده سازی محیط توسعه است. ترمینال فعال فعلی شما باید در دایرکتوری کاری دستیار هزینه شخصی باشد. ما از Python 3.12 در این کد لبه استفاده خواهیم کرد و از مدیر پروژه uv python برای ساده سازی نیاز به ایجاد و مدیریت نسخه پایتون و محیط مجازی استفاده خواهیم کرد.
- اگر هنوز ترمینال را باز نکرده اید، آن را با کلیک بر روی Terminal -> New Terminal باز کنید یا از Ctrl + Shift + C استفاده کنید، یک پنجره ترمینال در قسمت پایین مرورگر باز می شود.
-
uv
را دانلود و با دستور زیر پایتون 3.12 را نصب کنید
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- حال اجازه دهید محیط مجازی را با استفاده از
uv
مقداردهی اولیه کنیم، این دستور را اجرا کنید
uv sync --frozen
این دایرکتوری .venv را ایجاد می کند و وابستگی ها را نصب می کند. نگاهی سریع به pyproject.toml اطلاعاتی را در مورد وابستگی هایی که به این صورت نشان داده شده اند به شما می دهد.
dependencies = [ "datasets>=3.5.0", "google-adk>=0.2.0", "google-cloud-firestore>=2.20.1", "gradio>=5.23.1", "pydantic>=2.10.6", "pydantic-settings[yaml]>=2.8.1", ]
- برای تست env مجازی، فایل جدید main.py ایجاد کنید و کد زیر را کپی کنید
def main():
print("Hello from personal-expense-assistant-adk!")
if __name__ == "__main__":
main()
- سپس، دستور زیر را اجرا کنید
uv run main.py
مانند شکل زیر خروجی دریافت خواهید کرد
Using CPython 3.12 Creating virtual environment at: .venv Hello from personal-expense-assistant-adk!
این نشان می دهد که پروژه پایتون به درستی راه اندازی شده است.
راه اندازی فایل های پیکربندی
اکنون باید فایل های پیکربندی این پروژه را تنظیم کنیم. ما از تنظیمات pydantic برای خواندن پیکربندی از فایل YAML استفاده می کنیم.
یک فایل با نام settings.yaml با تنظیمات زیر ایجاد کنید. روی File->New Text File کلیک کنید و کد زیر را پر کنید. سپس آن را به عنوان settings.yaml ذخیره کنید
GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"
برای این آزمایشگاه کد، ما با مقادیر از پیش پیکربندی شده برای GCLOUD_LOCATION
,
BACKEND_URL
,
STORAGE_BUCKET_NAME
,
DB_COLLECTION_NAME
و BACKEND_URL
استفاده میکنیم.
اکنون میتوانیم به مرحله بعدی برویم، ساخت عامل و سپس خدمات
3. Agent را با استفاده از Google ADK و Gemini 2.5 بسازید
مقدمه ای بر ساختار دایرکتوری ADK
بیایید با بررسی آنچه ADK ارائه می دهد و نحوه ساخت عامل شروع کنیم. اسناد کامل ADK در این URL قابل دسترسی است. ADK ابزارهای بسیاری را در اجرای دستور CLI خود به ما ارائه می دهد. برخی از آنها به شرح زیر است:
- ساختار دایرکتوری عامل را تنظیم کنید
- به سرعت تعامل را از طریق خروجی ورودی CLI امتحان کنید
- به سرعت رابط وب UI توسعه محلی را تنظیم کنید
حال، بیایید ساختار دایرکتوری عامل را با استفاده از دستور CLI ایجاد کنیم. دستور زیر را اجرا کنید
uv run adk create expense_manager_agent \
--model gemini-2.5-flash-preview-04-17 \
--project {your-project-id} \
--region us-central1
ساختار دایرکتوری عامل زیر را ایجاد می کند
expense_manager_agent/ ├── __init__.py ├── .env ├── agent.py
و اگر init.py و agent.py را بررسی کنید، این کد را خواهید دید
# __init__.py
from . import agent
# agent.py
from google.adk.agents import Agent
root_agent = Agent(
model='gemini-2.5-flash-preview-04-17',
name='root_agent',
description='A helpful assistant for user questions.',
instruction='Answer user questions to the best of your knowledge',
)
ایجاد عامل مدیریت هزینه ما
بیایید عامل مدیریت هزینه خود را بسازیم! فایل shpenzim_manager_agent / agent.py را باز کنید و کد زیر را که حاوی root_agent است کپی کنید.
# expense_manager_agent/agent.py
from google.adk.agents import Agent
from expense_manager_agent.tools import (
store_receipt_data,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types
SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
task_prompt = file.read()
root_agent = Agent(
name="expense_manager_agent",
model="gemini-2.5-flash-preview-04-17",
description=(
"Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
),
instruction=task_prompt,
tools=[
store_receipt_data,
get_receipt_data_by_image_id,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
],
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
thinking_budget=2048,
)
),
before_model_callback=modify_image_data_in_history,
)
توضیح کد
این اسکریپت شامل شروع عامل ما است که در آن موارد زیر را مقداردهی اولیه می کنیم:
- مدل مورد استفاده را روی
gemini-2.5-flash-preview-04-17
تنظیم کنید - توضیحات و دستورالعمل عامل را به عنوان اعلان سیستم که از
task_prompt.md
خوانده می شود تنظیم کنید. - ابزارهای لازم را برای پشتیبانی از عملکرد عامل فراهم کنید
- با استفاده از قابلیتهای تفکر فلش Gemini 2.5 قبل از ایجاد پاسخ یا اجرای نهایی، برنامهریزی را فعال کنید.
- قبل از ارسال درخواست به Gemini برای محدود کردن تعداد دادههای تصویر ارسالی قبل از پیشبینی، رهگیری پاسخ تماس را تنظیم کنید
4. پیکربندی ابزارهای عامل
نماینده مدیریت هزینه ما توانایی های زیر را دارد:
- داده ها را از تصویر رسید استخراج کنید و داده ها و فایل را ذخیره کنید
- جستجوی دقیق در داده های هزینه
- جستجوی متنی در داده های هزینه
از این رو ما به ابزارهای مناسب برای پشتیبانی از این قابلیت نیاز داریم. یک فایل جدید در پوشه cost_manager_agent ایجاد کنید و نام آن را tools.py بگذارید و کد زیر را کپی کنید
# expense_manager_agent/tools.py
import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai
SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
project=SETTINGS.GCLOUD_PROJECT_ID
) # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""
def sanitize_image_id(image_id: str) -> str:
"""Sanitize image ID by removing any leading/trailing whitespace."""
if image_id.startswith("[IMAGE-"):
image_id = image_id.split("ID ")[1].split("]")[0]
return image_id.strip()
def store_receipt_data(
image_id: str,
store_name: str,
transaction_time: str,
total_amount: float,
purchased_items: List[Dict[str, Any]],
currency: str = "IDR",
) -> str:
"""
Store receipt data in the database.
Args:
image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
the ID of the image is 12345.
store_name (str): The name of the store.
transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
total_amount (float): The total amount spent.
purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
- name (str): The name of the item.
- price (float): The price of the item.
- quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
currency (str, optional): The currency of the transaction, can be derived from the store location.
If unsure, default is "IDR".
Returns:
str: A success message with the receipt ID.
Raises:
Exception: If the operation failed or input is invalid.
"""
try:
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Check if the receipt already exists
doc = get_receipt_data_by_image_id(image_id)
if doc:
return f"Receipt with ID {image_id} already exists"
# Validate transaction time
if not isinstance(transaction_time, str):
raise ValueError(
"Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
try:
datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError(
"Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
# Validate items format
if not isinstance(purchased_items, list):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
for _item in purchased_items:
if (
not isinstance(_item, dict)
or "name" not in _item
or "price" not in _item
):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
if "quantity" not in _item:
_item["quantity"] = 1
# Create a combined text from all receipt information for better embedding
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004",
contents=RECEIPT_DESC_FORMAT.format(
store_name=store_name,
transaction_time=transaction_time,
total_amount=total_amount,
currency=currency,
purchased_items=purchased_items,
receipt_id=image_id,
),
)
embedding = result.embeddings[0].values
doc = {
"receipt_id": image_id,
"store_name": store_name,
"transaction_time": transaction_time,
"total_amount": total_amount,
"currency": currency,
"purchased_items": purchased_items,
EMBEDDING_FIELD_NAME: Vector(embedding),
}
COLLECTION.add(doc)
return f"Receipt stored successfully with ID: {image_id}"
except Exception as e:
raise Exception(f"Failed to store receipt: {str(e)}")
def search_receipts_by_metadata_filter(
start_time: str,
end_time: str,
min_total_amount: float = -1.0,
max_total_amount: float = -1.0,
) -> str:
"""
Filter receipts by metadata within a specific time range and optionally by amount.
Args:
start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.
Returns:
str: A string containing the list of receipt data matching all applied filters.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Validate start and end times
if not isinstance(start_time, str) or not isinstance(end_time, str):
raise ValueError("start_time and end_time must be strings in ISO format")
try:
datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError("start_time and end_time must be strings in ISO format")
# Start with the base collection reference
query = COLLECTION
# Build the composite query by properly chaining conditions
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
filters = [
FieldFilter("transaction_time", ">=", start_time),
FieldFilter("transaction_time", "<=", end_time),
]
# Add optional filters
if min_total_amount != -1:
filters.append(FieldFilter("total_amount", ">=", min_total_amount))
if max_total_amount != -1:
filters.append(FieldFilter("total_amount", "<=", max_total_amount))
# Apply the filters
composite_filter = And(filters=filters)
query = query.where(filter=composite_filter)
# Execute the query and collect results
search_result_description = "Search by Metadata Results:\n"
for doc in query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error filtering receipts: {str(e)}")
def search_relevant_receipts_by_natural_language_query(
query_text: str, limit: int = 5
) -> str:
"""
Search for receipts with content most similar to the query using vector search.
This tool can be use for user query that is difficult to translate into metadata filters.
Such as store name or item name which sensitive to string matching.
Use this tool if you cannot utilize the search by metadata filter tool.
Args:
query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
limit (int, optional): Maximum number of results to return (default: 5).
Returns:
str: A string containing the list of contextually relevant receipt data.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Generate embedding for the query text
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004", contents=query_text
)
query_embedding = result.embeddings[0].values
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
vector_query = COLLECTION.find_nearest(
vector_field=EMBEDDING_FIELD_NAME,
query_vector=Vector(query_embedding),
distance_measure=DistanceMeasure.EUCLIDEAN,
limit=limit,
)
# Execute the query and collect results
search_result_description = "Search by Contextual Relevance Results:\n"
for doc in vector_query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error searching receipts: {str(e)}")
def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
"""
Retrieve receipt data from the database using the image_id.
Args:
image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
[IMAGE-ID 12345], the ID to use is 12345.
Returns:
Dict[str, Any]: A dictionary containing the receipt data with the following keys:
- receipt_id (str): The unique identifier of the receipt image.
- store_name (str): The name of the store.
- transaction_time (str): The time of purchase in UTC.
- total_amount (float): The total amount spent.
- currency (str): The currency of the transaction.
- purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
Returns an empty dictionary if no receipt is found.
"""
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Query the receipts collection for documents with matching receipt_id (image_id)
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
docs = list(query.stream())
if not docs:
return {}
# Get the first matching document
doc_data = docs[0].to_dict()
doc_data.pop(EMBEDDING_FIELD_NAME, None)
return doc_data
توضیح کد
در اجرای این تابع ابزار، ابزارها را حول این 2 ایده اصلی طراحی می کنیم:
- تجزیه و تحلیل داده های رسید و نگاشت به فایل اصلی با استفاده از متغیر رشته شناسه تصویر
[IMAGE-ID <hash-of-image-1>]
- ذخیره و بازیابی داده ها با استفاده از پایگاه داده Firestore
ابزار "store_receipt_data"
این ابزار ابزار تشخیص کاراکتر نوری است، اطلاعات مورد نیاز را از داده های تصویر، همراه با شناسایی رشته Image ID تجزیه می کند و آنها را با هم نقشه می کشد تا در پایگاه داده Firestore ذخیره شوند.
علاوه بر این، این ابزار همچنین محتوای رسید را با استفاده از text-embedding-004
به جاسازی تبدیل می کند تا تمام ابرداده ها و جاسازی با هم ذخیره و نمایه شوند. امکان بازیابی انعطاف پذیری از طریق پرس و جو یا جستجوی متنی.
پس از اجرای موفقیت آمیز این ابزار، می توانید مشاهده کنید که داده های رسید قبلاً در پایگاه داده Firestore نمایه شده است مانند شکل زیر
ابزار "search_receipts_by_metadata_filter"
این ابزار درخواست کاربر را به یک فیلتر پرس و جوی فراداده تبدیل می کند که از جستجو بر اساس محدوده تاریخ و/یا کل تراکنش پشتیبانی می کند. تمام دادههای رسید منطبق را برمیگرداند، جایی که در این فرآیند، فیلد جاسازی را حذف میکنیم زیرا عامل برای درک متنی به آن نیاز ندارد.
ابزار "search_relevant_receipts_by_natural_language_query"
این ابزار Retrieval Augmented Generation (RAG) ما است. نماینده ما توانایی طراحی درخواست خود را برای بازیابی رسیدهای مربوطه از پایگاه داده برداری دارد و همچنین می تواند انتخاب کند که چه زمانی از این ابزار استفاده کند. مفهوم اجازه دادن به تصمیم مستقل از طرف عامل که آیا از این ابزار RAG استفاده می کند یا نه و درخواست خود را طراحی می کند یکی از تعاریف رویکرد Agentic RAG است.
ما نه تنها به آن اجازه میدهیم پرس و جوی خود را بسازد، بلکه به آن اجازه میدهیم تعداد اسناد مرتبطی را که میخواهد بازیابی کند، انتخاب کند. همراه با یک مهندسی سریع مناسب، به عنوان مثال
# Example prompt Always filter the result from tool search_relevant_receipts_by_natural_language_query as the returned result may contain irrelevant information
این باعث میشود که این ابزار به ابزاری قدرتمند تبدیل شود که تقریباً هر چیزی را میتواند جستجو کند، اگرچه ممکن است به دلیل ماهیت غیر دقیق جستجوی نزدیکترین همسایه ، همه نتایج مورد انتظار را به دست نیاورد.
5. تغییر زمینه مکالمه از طریق Callbacks
Google ADK ما را قادر می سازد تا زمان اجرای عامل را در سطوح مختلف "رهگیری" کنیم. شما می توانید در مورد این قابلیت دقیق در این مستندات بیشتر بخوانید. در این آزمایشگاه، ما before_model_callback
برای اصلاح درخواست قبل از ارسال به LLM استفاده میکنیم تا دادههای تصویر را در زمینه تاریخچه مکالمه قدیمی حذف کنیم (فقط شامل دادههای تصویر در 3 تعامل کاربر آخر) برای کارایی.
با این حال، ما همچنان می خواهیم که عامل در صورت نیاز، زمینه داده تصویر را داشته باشد. از این رو ما مکانیزمی را اضافه می کنیم تا پس از هر داده بایت تصویر در مکالمه، یک متغیر شناسه تصویر رشته ای اضافه کنیم. این به عامل کمک می کند تا شناسه تصویر را به داده های فایل واقعی خود پیوند دهد که می تواند هم در زمان ذخیره یا بازیابی تصویر مورد استفاده قرار گیرد. ساختار به این شکل خواهد بود
<image-byte-data-1> [IMAGE-ID <hash-of-image-1>] <image-byte-data-2> [IMAGE-ID <hash-of-image-2>] And so on..
و زمانی که دادههای بایت در تاریخچه مکالمه منسوخ میشوند، شناسه رشته همچنان وجود دارد تا دسترسی به دادهها را با کمک استفاده از ابزار فعال کند. نمونه ساختار تاریخچه پس از حذف داده های تصویر
[IMAGE-ID <hash-of-image-1>] [IMAGE-ID <hash-of-image-2>] And so on..
بیایید شروع کنیم! یک فایل جدید در پوشه cost_manager_agent ایجاد کنید و نام آن را callbacks.py بگذارید و کد زیر را کپی کنید
# expense_manager_agent/callbacks.py
import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
def modify_image_data_in_history(
callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
# The following code will modify the request sent to LLM
# We will only keep image data in the last 3 user messages using a reverse and counter approach
# Count how many user messages we've processed
user_message_count = 0
# Process the reversed list
for content in reversed(llm_request.contents):
# Only count for user manual query, not function call
if (content.role == "user") and (content.parts[0].function_response is None):
user_message_count += 1
modified_content_parts = []
# Check any missing image ID placeholder for any image data
# Then remove image data from conversation history if more than 3 user messages
for idx, part in enumerate(content.parts):
if part.inline_data is None:
modified_content_parts.append(part)
continue
if (
(idx + 1 >= len(content.parts))
or (content.parts[idx + 1].text is None)
or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
):
# Generate hash ID for the image and add a placeholder
image_data = part.inline_data.data
hasher = hashlib.sha256(image_data)
image_hash_id = hasher.hexdigest()[:12]
placeholder = f"[IMAGE-ID {image_hash_id}]"
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
modified_content_parts.append(types.Part(text=placeholder))
else:
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
# This will modify the contents inside the llm_request
content.parts = modified_content_parts
6. Prompt
طراحی یک عامل با تعامل و قابلیتهای پیچیده، مستلزم آن است که یک اعلان خوب برای راهنمایی عامل پیدا کنیم تا بتواند آنطور که ما میخواهیم رفتار کند.
قبلاً مکانیزمی در مورد نحوه مدیریت دادههای تصویر در تاریخچه مکالمه داشتیم و همچنین ابزارهایی داشتیم که استفاده از آنها ممکن است ساده نباشد، مانند search_relevant_receipts_by_natural_language_query.
ما همچنین می خواهیم که نماینده بتواند تصویر رسید صحیح را برای ما جستجو و بازیابی کند. این بدان معنی است که ما باید به درستی همه این اطلاعات را در یک ساختار سریع و مناسب انتقال دهیم
ما از عامل می خواهیم که خروجی را در قالب نشانه گذاری زیر ساختار دهد تا فرآیند تفکر، پاسخ نهایی و پیوست (در صورت وجود) را تجزیه کند.
# THINKING PROCESS Thinking process here # FINAL RESPONSE Response to the user here Attachments put inside json block { "attachments": [ "[IMAGE-ID <hash-id-1>]", "[IMAGE-ID <hash-id-2>]", ... ] }
بیایید با دستور زیر شروع کنیم تا به انتظارات اولیه خود از رفتار عامل مدیر هزینه دست یابیم. فایل task_prompt.md باید از قبل در دایرکتوری کاری موجود ما وجود داشته باشد، اما باید آن را به دایرکتوری cost_manager_agent منتقل کنیم. دستور زیر را برای جابجایی آن اجرا کنید
mv task_prompt.md expense_manager_agent/task_prompt.md
7. تست عامل
حالا بیایید سعی کنیم از طریق CLI با عامل ارتباط برقرار کنیم، دستور زیر را اجرا کنیم
uv run adk run expense_manager_agent
خروجی را مانند این نشان می دهد، جایی که می توانید به نوبه خود با نماینده چت کنید، اما فقط می توانید از طریق این رابط متن ارسال کنید.
Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log To access latest log: tail -F /tmp/agents_log/agent.latest.log Running agent root_agent, type exit to exit. user: hello [root_agent]: Hello there! How can I help you today? user:
در حال حاضر، علاوه بر تعامل CLI، ADK همچنین به ما اجازه می دهد تا یک رابط کاربری توسعه برای تعامل و بررسی آنچه در طول تعامل می گذرد داشته باشیم. دستور زیر را برای راه اندازی سرور UI توسعه محلی اجرا کنید
uv run adk web --port 8080
مانند مثال زیر خروجی ایجاد می کند، به این معنی که ما می توانیم از قبل به رابط وب دسترسی داشته باشیم
INFO: Started server process [xxxx] INFO: Waiting for application startup. +-----------------------------------------------------------------------------+ | ADK Web Server started | | | | For local testing, access at http://localhost:8080. | +-----------------------------------------------------------------------------+ INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
اکنون برای بررسی آن، روی دکمه Web Preview در قسمت بالای Cloud Shell Editor خود کلیک کرده و Preview در پورت 8080 را انتخاب کنید.
صفحه وب زیر را مشاهده خواهید کرد که در آن می توانید عوامل موجود را در دکمه کشویی بالا سمت چپ انتخاب کنید (در مورد ما باید هزینه_manager_agent باشد) و با ربات تعامل کنید. در پنجره سمت چپ، اطلاعات زیادی در مورد جزئیات گزارش در طول اجرای Agent مشاهده خواهید کرد
بیایید برخی اقدامات را امتحان کنیم! این 2 رسید نمونه را آپلود کنید (منبع: مجموعه دادههای چهره در آغوش گرفته mousserlane/id_receipt_dataset
). روی هر تصویر کلیک راست کرده و Save Image as.. را انتخاب کنید (با این کار تصویر رسید دانلود می شود)، سپس با کلیک بر روی نماد "کلیپ" فایل را در ربات آپلود کنید و بگویید که می خواهید این رسیدها را ذخیره کنید.
پس از آن پرس و جوهای زیر را برای انجام برخی جستجو یا بازیابی فایل امتحان کنید
- "تجزیه هزینه ها و مجموع آن در طول سال 2023"
- "فایل رسید از ایندومارت به من بدهید"
هنگامی که از برخی ابزارها استفاده می کند، می توانید آنچه را که در رابط کاربری توسعه می گذرد بررسی کنید
ببینید نماینده چگونه به شما پاسخ می دهد و بررسی کنید که آیا با تمام قوانین ارائه شده در دستور داخل task_prompt.py مطابقت دارد یا خیر. تبریک می گویم! اکنون شما یک عامل توسعه کامل دارید.
اکنون زمان آن رسیده است که آن را با رابط کاربری مناسب و زیبا و قابلیت آپلود و دانلود فایل تصویر تکمیل کنید.
8. سرویس Frontend را با استفاده از Gradio بسازید
ما یک رابط وب چت خواهیم ساخت که شبیه این است
این شامل یک رابط چت با یک فیلد ورودی برای کاربران برای ارسال متن و آپلود فایل(های) تصویر رسید است.
ما سرویس frontend را با استفاده از Gradio خواهیم ساخت.
فایل جدیدی ایجاد کنید، روی File->New Text File کلیک کنید و نام آن را frontend.py بگذارید سپس کد زیر را کپی کرده و ذخیره کنید.
import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse
SETTINGS = get_settings()
def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
"""Encode a file to base64 string and get MIME type.
Reads an image file and returns the base64-encoded image data and its MIME type.
Args:
image_path: Path to the image file to encode.
Returns:
ImageData object containing the base64 encoded image data and its MIME type.
"""
# Read the image file
with open(image_path, "rb") as file:
image_content = file.read()
# Get the mime type
mime_type = mimetypes.guess_type(image_path)[0]
# Base64 encode the image
base64_data = base64.b64encode(image_content).decode("utf-8")
# Return as ImageData object
return ImageData(serialized_image=base64_data, mime_type=mime_type)
def decode_base64_to_image(base64_data: str) -> Image.Image:
"""Decode a base64 string to PIL Image.
Converts a base64-encoded image string back to a PIL Image object
that can be displayed or processed further.
Args:
base64_data: Base64 encoded string of the image.
Returns:
PIL Image object of the decoded image.
"""
# Decode the base64 string and convert to PIL Image
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
image = Image.open(image_buffer)
return image
def get_response_from_llm_backend(
message: Dict[str, Any],
history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
"""Send the message and history to the backend and get a response.
Args:
message: Dictionary containing the current message with 'text' and optional 'files' keys.
history: List of previous message dictionaries in the conversation.
Returns:
List containing text response and any image attachments from the backend service.
"""
# Extract files and convert to base64
image_data = []
if uploaded_files := message.get("files", []):
for file_path in uploaded_files:
image_data.append(encode_image_to_base64_and_get_mime_type(file_path))
# Prepare the request payload
payload = ChatRequest(
text=message["text"],
files=image_data,
session_id="default_session",
user_id="default_user",
)
# Send request to backend
try:
response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
response.raise_for_status() # Raise exception for HTTP errors
result = ChatResponse(**response.json())
if result.error:
return [f"Error: {result.error}"]
chat_responses = []
if result.thinking_process:
chat_responses.append(
gr.ChatMessage(
role="assistant",
content=result.thinking_process,
metadata={"title": "🧠 Thinking Process"},
)
)
chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))
if result.attachments:
for attachment in result.attachments:
image_data = attachment.serialized_image
chat_responses.append(gr.Image(decode_base64_to_image(image_data)))
return chat_responses
except requests.exceptions.RequestException as e:
return [f"Error connecting to backend service: {str(e)}"]
if __name__ == "__main__":
demo = gr.ChatInterface(
get_response_from_llm_backend,
title="Personal Expense Assistant",
description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
type="messages",
multimodal=True,
textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
)
demo.launch(
server_name="0.0.0.0",
server_port=8080,
)
پس از آن، می توانیم سرویس frontend را با دستور زیر اجرا کنیم. فراموش نکنید که نام فایل main.py را به frontend.py تغییر دهید
uv run frontend.py
خروجی مشابه این را در کنسول ابری خود خواهید دید
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
پس از آن می توانید رابط وب را با ctrl + کلیک روی پیوند URL محلی بررسی کنید. همچنین، میتوانید با کلیک بر روی دکمه پیشنمایش وب در سمت راست بالای Cloud Editor، به برنامه frontend دسترسی پیدا کنید و در پورت 8080 Preview را انتخاب کنید.
رابط وب را خواهید دید، با این حال هنگام تلاش برای ارسال چت، به دلیل سرویس پشتیبان که هنوز راه اندازی نشده است، با خطای مورد انتظار مواجه خواهید شد.
حالا، اجازه دهید سرویس اجرا شود و هنوز آن را نکشید. ما سرویس Backend را در برگه ترمینال دیگری اجرا خواهیم کرد
توضیح کد
در این کد فرانت اند ابتدا کاربر را قادر می سازیم متن ارسال کند و چندین فایل را آپلود کند. Gradio به ما اجازه می دهد تا این نوع عملکرد را با متد gr.ChatInterface ترکیب شده با gr.MultimodalTextbox ایجاد کنیم.
اکنون قبل از ارسال فایل و متن به بکاند، باید نوع mime فایل را همانطور که توسط backend مورد نیاز است، مشخص کنیم. همچنین باید بایت فایل تصویر را در base64 رمزگذاری کرده و همراه با mimetype ارسال کنیم.
class ImageData(BaseModel): """Model for image data with hash identifier. Attributes: serialized_image: Optional Base64 encoded string of the image content. mime_type: MIME type of the image. """ serialized_image: str mime_type: str
طرح مورد استفاده برای تعامل frontend-backend در schema.py تعریف شده است. ما از Pydantic BaseModel برای اجرای اعتبارسنجی داده ها در طرحواره استفاده می کنیم
هنگام دریافت پاسخ، ما از قبل جدا می کنیم که کدام بخش فرآیند تفکر، پاسخ نهایی و دلبستگی است. بنابراین ما میتوانیم از مؤلفه Gradio برای نمایش هر مؤلفه با مؤلفه UI استفاده کنیم.
class ChatResponse(BaseModel): """Model for a chat response. Attributes: response: The text response from the model. thinking_process: Optional thinking process of the model. attachments: List of image data to be displayed to the user. error: Optional error message if something went wrong. """ response: str thinking_process: str = "" attachments: List[ImageData] = [] error: Optional[str] = None
9. ساخت Backend Service با استفاده از FastAPI
در مرحله بعد، ما باید Backendی بسازیم که بتواند Agent ما را همراه با سایر اجزای اولیه اولیه کند تا بتوانیم زمان اجرا را اجرا کنیم.
فایل جدید ایجاد کنید، روی File->New Text File کلیک کنید و کد زیر را کپی کنید و سپس آن را به عنوان backend.py ذخیره کنید.
from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
extract_attachment_ids_and_sanitize_response,
download_image_from_gcs,
extract_thinking_process,
format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings
SETTINGS = get_settings()
APP_NAME = "expense_manager_app"
# Application state to hold service contexts
class AppContexts(SimpleNamespace):
"""A class to hold application contexts with attribute access"""
session_service: InMemorySessionService = None
artifact_service: GcsArtifactService = None
expense_manager_agent_runner: Runner = None
# Initialize application state
app_contexts = AppContexts()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize service contexts during application startup
app_contexts.session_service = InMemorySessionService()
app_contexts.artifact_service = GcsArtifactService(
bucket_name=SETTINGS.STORAGE_BUCKET_NAME
)
app_contexts.expense_manager_agent_runner = Runner(
agent=expense_manager_agent, # The agent we want to run
app_name=APP_NAME, # Associates runs with our app
session_service=app_contexts.session_service, # Uses our session manager
artifact_service=app_contexts.artifact_service, # Uses our artifact manager
)
logger.info("Application started successfully")
yield
logger.info("Application shutting down")
# Perform cleanup during application shutdown if necessary
# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
return app_contexts
# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)
@app.post("/chat", response_model=ChatResponse)
async def chat(
request: ChatRequest = Body(...),
app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
"""Process chat request and get response from the agent"""
# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
format_user_request_to_adk_content_and_store_artifacts,
request=request,
app_name=APP_NAME,
artifact_service=app_context.artifact_service,
)
final_response_text = "Agent did not produce a final response." # Default
# Use the session ID from the request or default if not provided
session_id = request.session_id
user_id = request.user_id
# Create session if it doesn't exist
if not app_context.session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
):
app_context.session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
try:
# Process the message with the agent
# Type annotation: runner.run_async returns an AsyncIterator[Event]
events_iterator: AsyncIterator[Event] = (
app_context.expense_manager_agent_runner.run_async(
user_id=user_id, session_id=session_id, new_message=content
)
)
async for event in events_iterator: # event has type Event
# Key Concept: is_final_response() marks the concluding message for the turn
if event.is_final_response():
if event.content and event.content.parts:
# Extract text from the first part
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
# Handle potential errors/escalations
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break # Stop processing events once the final response is found
logger.info(
"Received final response from agent", raw_final_response=final_response_text
)
# Extract and process any attachments and thinking process in the response
base64_attachments = []
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)
# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
# Download image data and get MIME type
result = await asyncio.to_thread(
download_image_from_gcs,
artifact_service=app_context.artifact_service,
image_hash=image_hash_id,
app_name=APP_NAME,
user_id=user_id,
session_id=session_id,
)
if result:
base64_data, mime_type = result
base64_attachments.append(
ImageData(serialized_image=base64_data, mime_type=mime_type)
)
logger.info(
"Processed response with attachments",
sanitized_response=sanitized_text,
thinking_process=thinking_process,
attachment_ids=attachment_ids,
)
return ChatResponse(
response=sanitized_text,
thinking_process=thinking_process,
attachments=base64_attachments,
)
except Exception as e:
logger.error("Error processing chat request", error_message=str(e))
return ChatResponse(
response="", error=f"Error in generating response: {str(e)}"
)
# Only run the server if this file is executed directly
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8081)
پس از آن می توانیم سرویس Backend را اجرا کنیم. به یاد داشته باشید که در مرحله قبل سرویس frontend را درست اجرا کردیم، اکنون باید ترمینال جدید را باز کنیم و سعی کنیم این سرویس باطن را اجرا کنیم.
- یک ترمینال جدید ایجاد کنید. به ترمینال خود در قسمت پایین بروید و دکمه "+" را برای ایجاد یک ترمینال جدید پیدا کنید. یا می توانید Ctrl + Shift + C را برای باز کردن ترمینال جدید انجام دهید
- پس از آن، مطمئن شوید که در دایرکتوری کاری personal-expense-assistant هستید، سپس دستور زیر را اجرا کنید
uv run backend.py
- اگر موفق شد، خروجی را به این صورت نشان می دهد
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
توضیح کد
راه اندازی ADK Agent، SessionService و ArtifactService
برای اجرای Agent در سرویس Backend باید یک Runner ایجاد کنیم که هم SessionService و هم Agent ما را بگیرد. SessionService تاریخچه و وضعیت مکالمه را مدیریت می کند، بنابراین وقتی با Runner ادغام می شود، به نماینده ما توانایی دریافت زمینه مکالمات در حال انجام را می دهد.
ما همچنین از ArtifactService برای مدیریت فایل آپلود شده استفاده می کنیم. شما می توانید جزئیات بیشتر در مورد ADK Session و Artifacts را در اینجا بخوانید
... @asynccontextmanager async def lifespan(app: FastAPI): # Initialize service contexts during application startup app_contexts.session_service = InMemorySessionService() app_contexts.artifact_service = GcsArtifactService( bucket_name=SETTINGS.STORAGE_BUCKET_NAME ) app_contexts.expense_manager_agent_runner = Runner( agent=expense_manager_agent, # The agent we want to run app_name=APP_NAME, # Associates runs with our app session_service=app_contexts.session_service, # Uses our session manager artifact_service=app_contexts.artifact_service, # Uses our artifact manager ) logger.info("Application started successfully") yield logger.info("Application shutting down") # Perform cleanup during application shutdown if necessary ...
در این دمو، ما از InMemorySessionService و GcsArtifactService برای ادغام شدن با عامل خود Runner استفاده می کنیم. از آنجایی که تاریخچه مکالمه در حافظه ذخیره می شود، پس از حذف یا راه اندازی مجدد سرویس باطن، از بین می رود. ما اینها را در چرخه عمر برنامه FastAPI راه اندازی می کنیم تا به عنوان وابستگی در مسیر /chat
تزریق شوند.
آپلود و بارگیری تصویر با GcsArtifactService
تمام تصویر آپلود شده توسط GcsArtifactService به عنوان مصنوع ذخیره می شود، می توانید این را در تابع format_user_request_to_adk_content_and_store_artifacts
در داخل utils.py بررسی کنید.
... # Prepare the user's message in ADK format and store image artifacts content = await asyncio.to_thread( format_user_request_to_adk_content_and_store_artifacts, request=request, app_name=APP_NAME, artifact_service=app_context.artifact_service, ) ...
همه درخواستهایی که توسط agent runner پردازش میشوند، باید در انواع قالببندی شوند. نوع محتوا. در داخل تابع، ما همچنین هر داده تصویر را پردازش میکنیم و شناسه آن را استخراج میکنیم تا با یک مکاننمای شناسه تصویر جایگزین شود.
مکانیزم مشابهی برای دانلود پیوستها پس از استخراج شناسههای تصویر با استفاده از regex استفاده میشود:
... sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response( final_response_text ) sanitized_text, thinking_process = extract_thinking_process(sanitized_text) # Download images from GCS and replace hash IDs with base64 data for image_hash_id in attachment_ids: # Download image data and get MIME type result = await asyncio.to_thread( download_image_from_gcs, artifact_service=app_context.artifact_service, image_hash=image_hash_id, app_name=APP_NAME, user_id=user_id, session_id=session_id, ) ...
10. آزمون ادغام
اکنون، باید چندین سرویس را در تب های مختلف کنسول ابری اجرا کنید:
- سرویس Frontend در پورت 8080 اجرا می شود
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
- سرویس Backend در پورت 8081 اجرا می شود
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
در حالت فعلی، باید بتوانید تصاویر رسید خود را آپلود کنید و با دستیار از برنامه وب در پورت 8080 یکپارچه چت کنید.
روی دکمه Web Preview در قسمت بالای Cloud Shell Editor خود کلیک کنید و Preview در پورت 8080 را انتخاب کنید.
حالا بیایید با دستیار تعامل داشته باشیم!
رسیدهای زیر را دانلود کنید. این محدوده تاریخ داده های رسید بین سال های 2023-2024 است و از دستیار بخواهید آن را ذخیره/بارگذاری کند
- درایو رسید (منبع مجموعه دادههای چهره در آغوش گرفته
mousserlane/id_receipt_dataset
)
چیزهای مختلف بپرس
- "تجزیه هزینه های ماهانه در طول 2023-2024 را به من بدهید"
- "قبض تراکنش قهوه را به من نشان بده"
- "فایل رسید از Yakiniku Like به من بدهید"
- و غیره
در اینجا چند قطعه از تعامل موفق است
11. استقرار در Cloud Run
اکنون، البته ما می خواهیم از هر کجا به این برنامه شگفت انگیز دسترسی داشته باشیم. برای انجام این کار، می توانیم این برنامه را بسته بندی کنیم و آن را در Cloud Run مستقر کنیم. به خاطر این نسخه ی نمایشی، این سرویس به عنوان یک سرویس عمومی که برای دیگران قابل دسترسی است در معرض دید قرار می گیرد. با این حال، به خاطر داشته باشید که این بهترین روش برای این نوع برنامه ها نیست، زیرا برای برنامه های شخصی مناسب تر است
در این کد لبه هر دو سرویس frontend و backend را در 1 ظرف قرار می دهیم. برای مدیریت هر دو سرویس به کمک سرپرست نیاز داریم. می توانید فایل supervisord.conf را بررسی کنید و Dockerfile را بررسی کنید که ما سرپرست را به عنوان نقطه ورودی تعیین کرده ایم.
در این مرحله، ما در حال حاضر تمام فایلهای مورد نیاز برای استقرار برنامههایمان در Cloud Run را داریم، اجازه دهید آن را مستقر کنیم. به ترمینال Cloud Shell بروید و مطمئن شوید که پروژه فعلی برای پروژه فعال شما پیکربندی شده است، در غیر این صورت از دستور gcloud configure برای تنظیم شناسه پروژه استفاده کرده اید:
gcloud config set project [PROJECT_ID]
سپس دستور زیر را اجرا کنید تا آن را در Cloud Run اجرا کنید.
gcloud run deploy personal-expense-assistant \
--source . \
--port=8080 \
--allow-unauthenticated \
--env-vars-file=settings.yaml \
--memory 1024Mi \
--region us-central1
اگر از شما خواسته شد که ایجاد یک رجیستری مصنوع را برای مخزن docker تأیید کنید، فقط به Y پاسخ دهید. توجه داشته باشید که ما در اینجا اجازه دسترسی غیرقانونی را می دهیم زیرا این یک برنامه آزمایشی است. توصیه این است که از احراز هویت مناسب برای برنامه های تجاری و تولیدی خود استفاده کنید.
پس از تکمیل استقرار، باید پیوندی شبیه به زیر دریافت کنید:
https://personal-expense-assistant-*******.us-central1.run.app
ادامه دهید و از برنامه خود از پنجره ناشناس یا دستگاه تلفن همراه خود استفاده کنید. از قبل باید زنده باشد.
12. چالش
اکنون زمان آن است که مهارت های اکتشافی خود را بدرخشید و صیقل دهید. آیا آنچه را که برای تغییر کد لازم است در اختیار دارید تا باطن بتواند چندین کاربر را در خود جای دهد؟ چه اجزایی باید به روز شوند؟
13. پاکسازی کنید
برای جلوگیری از تحمیل هزینه به حساب Google Cloud خود برای منابع مورد استفاده در این Codelab، این مراحل را دنبال کنید:
- در کنسول Google Cloud، به صفحه مدیریت منابع بروید.
- در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و سپس روی Delete کلیک کنید.
- در محاوره، شناسه پروژه را تایپ کنید و سپس روی Shut down کلیک کنید تا پروژه حذف شود.
- یا میتوانید به Cloud Run در کنسول بروید، سرویسی را که به تازگی مستقر کردهاید انتخاب کرده و حذف کنید.