1. مقدمة
هل شعرت يومًا بالإحباط والكسل الشديد لإدارة جميع نفقاتك الشخصية؟ أنا أيضًا. لهذا السبب، سننشئ في هذا الدليل التعليمي عن البرمجة مساعدًا لإدارة النفقات الشخصية، وهو مستند إلى Gemini 2.5 لتنفيذ جميع المهام نيابةً عنا. بدءًا من إدارة الإيصالات المحمَّلة ووصولاً إلى تحليل ما إذا كنت قد أنفقت الكثير من المال لشراء قهوة، يمكنك إجراء الكثير من الإجراءات.
يمكن الوصول إلى هذا المساعد من خلال متصفّح الويب في شكل واجهة ويب للمحادثة، حيث يمكنك التواصل معه وتحميل بعض صور الإيصالات وطلب تخزينها، أو يمكنك البحث عن بعض الإيصالات للحصول على الملف وإجراء بعض تحليلات النفقات. وكل ذلك مستنِد إلى إطار عمل Google Agent Development Kit.
تم تقسيم التطبيق نفسه إلى خدمتَين: الواجهة الأمامية والخلفية، ما يتيح لك إنشاء نموذج أولي سريع وتجربته، وفهم كيفية دمج واجهة برمجة التطبيقات لكلتا الخدمتَين.
من خلال ورشة رموز البرامج، ستطبّق نهجًا خطوة بخطوة على النحو التالي:
- تجهيز مشروعك على Google Cloud وتفعيل جميع واجهات برمجة التطبيقات المطلوبة فيه
- إعداد حزمة على Google Cloud Storage وقاعدة بيانات على Firestore
- إنشاء فهرسة Firestore
- إعداد مساحة العمل لبيئة الترميز
- تنظيم رمز المصدر الخاص بوكيل ADK والأدوات والطلبات وما إلى ذلك
- اختبار الوكيل باستخدام واجهة مستخدم تطوير الويب على ADK
- أنشئ خدمة الواجهة الأمامية - واجهة المحادثة باستخدام مكتبة Gradio لإرسال بعض طلبات البحث وتحميل صور الإيصالات
- إنشاء خدمة الخلفية - خادم HTTP باستخدام FastAPI الذي يتضمّن رمز وكيل ADK وSessionService وArtifact Service
- إدارة متغيّرات البيئة وإعداد الملفات المطلوبة لنشر التطبيق على Cloud Run
- نشر التطبيق على Cloud Run
نظرة عامة على البنية
المتطلبات الأساسية
- الشعور بالارتياح عند العمل باستخدام لغة Python
- فهم أساسي لبنية الحزمة الكاملة باستخدام خدمة HTTP
ما ستتعرّف عليه
- إنشاء نماذج أولية لواجهة الويب باستخدام Gradio
- تطوير خدمة الخلفية باستخدام FastAPI وPydantic
- تصميم وكيل ADK مع الاستفادة من إمكاناته المتعددة
- استخدام الأداة
- إدارة الجلسات والعناصر
- استخدام دالة الاستدعاء لتعديل الإدخال قبل إرساله إلى Gemini
- استخدام BuiltInPlanner لتحسين تنفيذ المهام من خلال التخطيط
- تصحيح الأخطاء بسرعة من خلال واجهة الويب المحلية لخدمة ADK
- استراتيجية لتحسين التفاعل المتعدّد الوسائط من خلال تحليل المعلومات واستردادها من خلال هندسة الطلبات وتعديل طلب Gemini باستخدام ميزة ADK callback
- إنشاء مُحسَّن لاسترداد المعلومات من خلال Firestore بصفتها قاعدة بيانات متّجه
- إدارة متغيّرات البيئة في ملف YAML باستخدام Pydantic-settings
- نشر التطبيق على Cloud Run باستخدام Dockerfile وتوفير متغيرات البيئة باستخدام ملف YAML
المتطلبات
- متصفّح الويب Chrome
- حساب Gmail
- مشروع على Cloud تم تفعيل الفوترة فيه
تم تصميم هذا المختبر البرمجي للمطوّرين من جميع المستويات (بما في ذلك المبتدئين)، ويستخدم لغة Python في نموذج تطبيقه. ومع ذلك، لا يُشترط معرفة Python لفهم المفاهيم المعروضة.
2. قبل البدء
اختيار مشروع نشط في Cloud Console
تفترض ورشة رموز البرامج هذه أنّ لديك مشروعًا على Google Cloud مفعّل فيه نظام الفوترة. إذا لم يكن لديك التطبيق بعد، يمكنك اتّباع التعليمات أدناه للبدء.
- في Google Cloud Console، في صفحة أداة اختيار المشاريع، اختَر مشروعًا على Google Cloud أو أنشِئه.
- تأكَّد من تفعيل الفوترة لمشروعك على Cloud. تعرَّف على كيفية التحقّق مما إذا كانت الفوترة مفعَّلة في أحد المشاريع.
إعداد قاعدة بيانات Firestore
بعد ذلك، سنحتاج أيضًا إلى إنشاء قاعدة بيانات Firestore. Firestore في الوضع الأصلي هو قاعدة بيانات مستندات NoSQL تم إنشاؤها للتوسّع التلقائي والأداء العالي وسهولة تطوير التطبيقات. ويمكن أن تعمل أيضًا كقاعدة بيانات متجهات يمكنها دعم تقنية "الإنشاء المعزّز لاسترداد المعلومات" في مختبرنا.
- ابحث عن firestore في شريط البحث، ثم انقر على منتج Firestore.
- بعد ذلك، انقر على الزر إنشاء قاعدة بيانات Firestore.
- استخدِم (default) كاسم معرّف قاعدة البيانات، واترك الإصدار العادي محدّدًا. لأغراض هذا العرض التوضيحي في المختبر، استخدِم Firestore Native مع قواعد أمان Open.
- ستلاحظ أيضًا أنّ قاعدة البيانات هذه تحتوي على استخدام الطبقة المجانية YEAY! بعد ذلك، انقر على زر إنشاء قاعدة بيانات.
بعد تنفيذ هذه الخطوات، من المفترض أن تتم إعادة توجيهك إلى قاعدة بيانات Firestore التي أنشأتها للتو.
إعداد مشروع Cloud في Cloud Shell Terminal
- ستستخدم Cloud Shell، وهي بيئة سطر أوامر تعمل في Google Cloud ومزوّدة مسبقًا بـ bq. انقر على "تفعيل Cloud Shell" في أعلى "وحدة تحكّم Google Cloud".
- بعد الاتصال بخدمة Cloud Shell، عليك التحقّق من أنّك سبق أن تم مصادقة حسابك وأنّ المشروع قد تم ضبطه على معرّف مشروعك باستخدام الأمر التالي:
gcloud auth list
- شغِّل الأمر التالي في Cloud Shell للتأكّد من أنّ الأمر gcloud يعرف مشروعك.
gcloud config list project
- إذا لم يتم ضبط مشروعك، استخدِم الأمر التالي لضبطه:
gcloud config set project <YOUR_PROJECT_ID>
بدلاً من ذلك، يمكنك أيضًا الاطّلاع على معرّف PROJECT_ID
في وحدة التحكّم.
انقر عليه وستظهر لك كل بيانات مشروعك ورقم تعريفه على الجانب الأيمن.
- فعِّل واجهات برمجة التطبيقات المطلوبة من خلال الأمر الموضَّح أدناه. قد تستغرق هذه العملية بضع دقائق، لذا يُرجى الانتظار.
gcloud services enable aiplatform.googleapis.com \
firestore.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
عند تنفيذ الأمر بنجاح، من المفترض أن تظهر لك رسالة مشابهة للرسالة الموضّحة أدناه:
Operation "operations/..." finished successfully.
يمكنك استخدام وحدة التحكّم للبحث عن كل منتج أو استخدام هذا الرابط كبديل لأمر gcloud.
إذا فاتتك أي واجهة برمجة تطبيقات، يمكنك تفعيلها في أي وقت أثناء عملية التنفيذ.
راجِع المستندات لمعرفة أوامر gcloud وكيفية استخدامها.
إعداد حزمة Google Cloud Storage
بعد ذلك، سنحتاج إلى إعداد حزمة GCS من المحطة الطرفية نفسها لتخزين الملف الذي تم تحميله. نفِّذ الأمر التالي لإنشاء الحزمة.
gsutil mb -l us-central1 gs://personal-expense-assistant-receipts
سيظهر هذا الناتج
Creating gs://personal-expense-assistant-receipts/...
يمكنك التحقّق من ذلك من خلال الانتقال إلى قائمة التنقّل في أعلى يمين المتصفّح واختيار مساحة التخزين في السحابة الإلكترونية -> الحزمة.
إنشاء فهرس Firestore للبحث
Firestore هي قاعدة بيانات NoSQL بشكلٍ أساسي، ما يوفر أداءً ممتازًا ومرونة في نموذج البيانات، ولكنّها تفرض قيودًا عند إجراء طلبات بحث معقّدة. بما أنّنا نخطّط لاستخدام بعض طلبات البحث المركبة متعددة الحقول والبحث بالاستناد إلى المتجهات، سنحتاج إلى إنشاء بعض الفهارس أولاً. يمكنك الاطّلاع على مزيد من التفاصيل في هذه المستندات.
- نفِّذ الأمر التالي لإنشاء فهرس لتفعيل طلبات البحث المركبة.
gcloud firestore indexes composite create \
--collection-group=personal-expense-assistant-receipts \
--field-config field-path=total_amount,order=ASCENDING \
--field-config field-path=transaction_time,order=ASCENDING \
--field-config field-path=__name__,order=ASCENDING \
--database="(default)"
- وشغِّل هذا الإجراء لتفعيل البحث بالاستناد إلى المتجهات.
gcloud firestore indexes composite create \
--collection-group="personal-expense-assistant-receipts" \
--query-scope=COLLECTION \
--field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
--database="(default)"
يمكنك التحقّق من الفهرس الذي تم إنشاؤه من خلال الانتقال إلى Firestore في وحدة تحكّم السحابة الإلكترونية والنقر على مثيل قاعدة البيانات (default) واختيار الفهارس في شريط التنقّل.
الانتقال إلى محرِّر Cloud Shell وإعداد دليل عمل التطبيق
الآن، يمكننا إعداد محرِّر الرموز البرمجية لإجراء بعض عمليات الترميز. سنستخدم "محرر Cloud Shell" لإجراء ذلك.
- انقر على الزر Open Editor (فتح المحرِّر)، سيؤدي ذلك إلى فتح محرِّر Cloud Shell، ويمكننا كتابة الرمز هنا
- تأكَّد من ضبط مشروع Cloud Code في أسفل يمين الشاشة (شريط الحالة) في محرِّر Cloud Shell، كما هو موضّح في الصورة أدناه، ومن ضبطه على مشروع Google Cloud النشط الذي فعّلت فيه الفوترة. انقر على تفويض إذا طُلب منك ذلك. إذا اتّبعت الأمر السابق، قد يشير الزرّ أيضًا مباشرةً إلى مشروعك المفعّل بدلاً من زر تسجيل الدخول.
- بعد ذلك، لننسخ دليل العمل الخاص بالنموذج لهذا الدليل التعليمي من Github، وننفِّذ الأمر التالي. سيؤدي ذلك إلى إنشاء دليل العمل في دليل personal-expense-assistant.
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
- بعد ذلك، انتقِل إلى أعلى قسم من "محرر Cloud Shell" وانقر على ملف (File)->فتح مجلد (Open Folder)، وابحث عن دليل اسم المستخدم (username) وابحث عن دليل مساعِد النفقات الشخصية (personal-expense-assistant) ثم انقر على الزر حسنًا (OK). سيؤدي ذلك إلى جعل الدليل الذي تم اختياره هو الدليل الرئيسي للعمل. في هذا المثال، اسم المستخدم هو alvinprayuda، وبالتالي يظهر مسار الدليل أدناه.
من المفترض أن يظهر محرِّر Cloud Shell الآن على النحو التالي:
إعداد البيئة
إعداد بيئة Python الافتراضية
الخطوة التالية هي إعداد بيئة التطوير. يجب أن يكون المحطة الطرفية النشطة الحالية داخل الدليل العامل personal-expense-assistant. سنستخدم الإصدار 3.12 من لغة بايثون في هذا البرنامج التعليمي، وسنستخدم uv python project manager لتبسيط الحاجة إلى إنشاء إصدار بايثون وبيئة افتراضية وإدارتهما.
- إذا لم يسبق لك فتح المحطة الطرفية، افتحها بالنقر على المحطة الطرفية -> محطة طرفية جديدة، أو استخدِم Ctrl + Shift + C، وسيؤدي ذلك إلى فتح نافذة محطة طرفية في أسفل المتصفح.
- تنزيل
uv
وتثبيت python 3.12 باستخدام الأمر التالي
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- لنبدأ الآن بتهيئة البيئة الافتراضية باستخدام
uv
. نفِّذ الأمر التالي:
uv sync --frozen
سيؤدي ذلك إلى إنشاء الدليل .venv وتثبيت التبعيات. ستمنحك نظرة سريعة على pyproject.toml معلومات عن التبعيات المعروضة على النحو التالي:
dependencies = [ "datasets>=3.5.0", "google-adk>=0.2.0", "google-cloud-firestore>=2.20.1", "gradio>=5.23.1", "pydantic>=2.10.6", "pydantic-settings[yaml]>=2.8.1", ]
- لاختبار البيئة الافتراضية، أنشئ ملفًا جديدًا باسم main.py وانسخ الرمز البرمجي التالي.
def main():
print("Hello from personal-expense-assistant-adk!")
if __name__ == "__main__":
main()
- بعد ذلك، شغِّل الأمر التالي:
uv run main.py
ستظهر لك النتائج كما هو موضّح أدناه.
Using CPython 3.12 Creating virtual environment at: .venv Hello from personal-expense-assistant-adk!
يشير ذلك إلى أنّه يتم إعداد مشروع Python بشكل صحيح.
إعداد ملفات الضبط
سنحتاج الآن إلى إعداد ملفات الإعداد لهذا المشروع. نستخدم pydantic-settings لقراءة الإعدادات من ملف YAML.
أنشئ ملفًا باسم settings.yaml باستخدام الإعدادات التالية. انقر على ملف->ملف نصي جديد واملأ الملف بالرمز البرمجي التالي. بعد ذلك، احفظ الملف باسم settings.yaml.
GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"
في هذا الدليل التعليمي حول الرموز البرمجية، سنستخدم القيم التي تم ضبطها مسبقًا لكل من GCLOUD_LOCATION
,
BACKEND_URL
,
STORAGE_BUCKET_NAME
,
DB_COLLECTION_NAME
وBACKEND_URL
.
يمكننا الآن الانتقال إلى الخطوة التالية، وهي إنشاء موظّف الدعم ثم الخدمات.
3- إنشاء موظّف الدعم باستخدام Google ADK وGemini 2.5
مقدّمة عن بنية دليل ADK
لنبدأ باستكشاف الميزات التي يوفّرها ADK وكيفية إنشاء موظّف الدّعم. يمكن الوصول إلى المستندات الكاملة لواجهة برمجة التطبيقات في عنوان URL هذا . يوفّر لنا ADK العديد من الأدوات ضمن تنفيذ أوامر واجهة سطر الأوامر. في ما يلي بعض هذه الشروط :
- إعداد بنية دليل موظّفي الدعم
- تجربة التفاعل بسرعة من خلال إدخال وإخراج واجهة سطر الأوامر
- إعداد واجهة مستخدم الويب لتطوير التطبيقات على الجهاز بسرعة
الآن، لننشئ بنية دليل موظّف الدّعم باستخدام أمر واجهة سطر الأوامر. تنفيذ الأمر التالي
uv run adk create expense_manager_agent \
--model gemini-2.5-flash-preview-04-17 \
--project {your-project-id} \
--region us-central1
سيؤدي ذلك إلى إنشاء بنية دليل موظّفي الدعم التالية:
expense_manager_agent/ ├── __init__.py ├── .env ├── agent.py
وإذا فحصت init.py وagent.py، سيظهر لك هذا الرمز.
# __init__.py
from . import agent
# agent.py
from google.adk.agents import Agent
root_agent = Agent(
model='gemini-2.5-flash-preview-04-17',
name='root_agent',
description='A helpful assistant for user questions.',
instruction='Answer user questions to the best of your knowledge',
)
إنشاء وكيل "إدارة النفقات"
لنبدأ بإنشاء موظّف دعم لإدارة النفقات. افتح ملف expense_manager_agent/agent.py وانسخ الرمز البرمجي أدناه الذي سيحتوي على root_agent.
# expense_manager_agent/agent.py
from google.adk.agents import Agent
from expense_manager_agent.tools import (
store_receipt_data,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types
SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
task_prompt = file.read()
root_agent = Agent(
name="expense_manager_agent",
model="gemini-2.5-flash-preview-04-17",
description=(
"Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
),
instruction=task_prompt,
tools=[
store_receipt_data,
get_receipt_data_by_image_id,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
],
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
thinking_budget=2048,
)
),
before_model_callback=modify_image_data_in_history,
)
شرح الرمز
يحتوي هذا النص البرمجي على عملية بدء موظّف الدعم التي نبدأ فيها ما يلي:
- اضبط النموذج الذي سيتم استخدامه على
gemini-2.5-flash-preview-04-17
. - إعداد وصف موظّف الدعم والتعليمات على أنّها طلب النظام الذي يتم قراءته من
task_prompt.md
- توفير الأدوات اللازمة لدعم وظيفة موظّف الدعم
- تفعيل ميزة التخطيط قبل إنشاء الردّ النهائي أو تنفيذه باستخدام إمكانات التفكير السريع في Gemini 2.5
- إعداد اعتراض طلب إعادة الاتصال قبل إرسال طلب إلى Gemini للحد من عدد بيانات الصور المُرسَلة قبل إجراء التوقّعات
4. ضبط "أدوات موظّفي الدعم"
سيتوفّر لموظّف إدارة النفقات لدينا الإمكانات التالية:
- استخراج البيانات من صورة الإيصال وتخزين البيانات والملف
- البحث الدقيق في بيانات النفقات
- البحث السياقي في بيانات النفقات
لذلك، نحتاج إلى الأدوات المناسبة لدعم هذه الوظيفة. أنشئ ملفًا جديدًا ضمن دليل expense_manager_agent وسَمِّه tools.py وانسخ الرمز البرمجي أدناه.
# expense_manager_agent/tools.py
import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai
SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
project=SETTINGS.GCLOUD_PROJECT_ID
) # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""
def sanitize_image_id(image_id: str) -> str:
"""Sanitize image ID by removing any leading/trailing whitespace."""
if image_id.startswith("[IMAGE-"):
image_id = image_id.split("ID ")[1].split("]")[0]
return image_id.strip()
def store_receipt_data(
image_id: str,
store_name: str,
transaction_time: str,
total_amount: float,
purchased_items: List[Dict[str, Any]],
currency: str = "IDR",
) -> str:
"""
Store receipt data in the database.
Args:
image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
the ID of the image is 12345.
store_name (str): The name of the store.
transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
total_amount (float): The total amount spent.
purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
- name (str): The name of the item.
- price (float): The price of the item.
- quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
currency (str, optional): The currency of the transaction, can be derived from the store location.
If unsure, default is "IDR".
Returns:
str: A success message with the receipt ID.
Raises:
Exception: If the operation failed or input is invalid.
"""
try:
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Check if the receipt already exists
doc = get_receipt_data_by_image_id(image_id)
if doc:
return f"Receipt with ID {image_id} already exists"
# Validate transaction time
if not isinstance(transaction_time, str):
raise ValueError(
"Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
try:
datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError(
"Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
# Validate items format
if not isinstance(purchased_items, list):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
for _item in purchased_items:
if (
not isinstance(_item, dict)
or "name" not in _item
or "price" not in _item
):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
if "quantity" not in _item:
_item["quantity"] = 1
# Create a combined text from all receipt information for better embedding
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004",
contents=RECEIPT_DESC_FORMAT.format(
store_name=store_name,
transaction_time=transaction_time,
total_amount=total_amount,
currency=currency,
purchased_items=purchased_items,
receipt_id=image_id,
),
)
embedding = result.embeddings[0].values
doc = {
"receipt_id": image_id,
"store_name": store_name,
"transaction_time": transaction_time,
"total_amount": total_amount,
"currency": currency,
"purchased_items": purchased_items,
EMBEDDING_FIELD_NAME: Vector(embedding),
}
COLLECTION.add(doc)
return f"Receipt stored successfully with ID: {image_id}"
except Exception as e:
raise Exception(f"Failed to store receipt: {str(e)}")
def search_receipts_by_metadata_filter(
start_time: str,
end_time: str,
min_total_amount: float = -1.0,
max_total_amount: float = -1.0,
) -> str:
"""
Filter receipts by metadata within a specific time range and optionally by amount.
Args:
start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.
Returns:
str: A string containing the list of receipt data matching all applied filters.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Validate start and end times
if not isinstance(start_time, str) or not isinstance(end_time, str):
raise ValueError("start_time and end_time must be strings in ISO format")
try:
datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError("start_time and end_time must be strings in ISO format")
# Start with the base collection reference
query = COLLECTION
# Build the composite query by properly chaining conditions
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
filters = [
FieldFilter("transaction_time", ">=", start_time),
FieldFilter("transaction_time", "<=", end_time),
]
# Add optional filters
if min_total_amount != -1:
filters.append(FieldFilter("total_amount", ">=", min_total_amount))
if max_total_amount != -1:
filters.append(FieldFilter("total_amount", "<=", max_total_amount))
# Apply the filters
composite_filter = And(filters=filters)
query = query.where(filter=composite_filter)
# Execute the query and collect results
search_result_description = "Search by Metadata Results:\n"
for doc in query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error filtering receipts: {str(e)}")
def search_relevant_receipts_by_natural_language_query(
query_text: str, limit: int = 5
) -> str:
"""
Search for receipts with content most similar to the query using vector search.
This tool can be use for user query that is difficult to translate into metadata filters.
Such as store name or item name which sensitive to string matching.
Use this tool if you cannot utilize the search by metadata filter tool.
Args:
query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
limit (int, optional): Maximum number of results to return (default: 5).
Returns:
str: A string containing the list of contextually relevant receipt data.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Generate embedding for the query text
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004", contents=query_text
)
query_embedding = result.embeddings[0].values
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
vector_query = COLLECTION.find_nearest(
vector_field=EMBEDDING_FIELD_NAME,
query_vector=Vector(query_embedding),
distance_measure=DistanceMeasure.EUCLIDEAN,
limit=limit,
)
# Execute the query and collect results
search_result_description = "Search by Contextual Relevance Results:\n"
for doc in vector_query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error searching receipts: {str(e)}")
def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
"""
Retrieve receipt data from the database using the image_id.
Args:
image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
[IMAGE-ID 12345], the ID to use is 12345.
Returns:
Dict[str, Any]: A dictionary containing the receipt data with the following keys:
- receipt_id (str): The unique identifier of the receipt image.
- store_name (str): The name of the store.
- transaction_time (str): The time of purchase in UTC.
- total_amount (float): The total amount spent.
- currency (str): The currency of the transaction.
- purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
Returns an empty dictionary if no receipt is found.
"""
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Query the receipts collection for documents with matching receipt_id (image_id)
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
docs = list(query.stream())
if not docs:
return {}
# Get the first matching document
doc_data = docs[0].to_dict()
doc_data.pop(EMBEDDING_FIELD_NAME, None)
return doc_data
شرح الرمز
في عملية تنفيذ وظائف هذه الأدوات، نصمّم الأدوات استنادًا إلى الفكرتَين الرئيسيتَين التاليتَين:
- تحليل بيانات الإيصال وربطها بالملف الأصلي باستخدام العنصر النائب لسلاسل أرقام تعريف الصور
[IMAGE-ID <hash-of-image-1>]
- تخزين البيانات واستردادها باستخدام قاعدة بيانات Firestore
أداة "store_receipt_data"
هذه الأداة هي أداة التعرّف البصري على الحروف، وستعمل على تحليل المعلومات المطلوبة من بيانات الصورة، بالإضافة إلى التعرّف على سلسلة رقم تعريف الصورة وربطها معًا لتتم تخزينها في قاعدة بيانات Firestore.
بالإضافة إلى ذلك، تحوّل هذه الأداة أيضًا محتوى الإيصال إلى محتوى مضمّن باستخدام text-embedding-004
حتى يتم تخزين جميع البيانات الوصفية والمحتوى المضمّن والفهرسة معًا. السماح بالاستجابة المرنة من خلال طلب بحث أو بحث سياقي
بعد تنفيذ هذه الأداة بنجاح، يمكنك ملاحظة أنّه سبق أن تم فهرسة بيانات الإيصال في قاعدة بيانات Firestore كما هو موضّح أدناه.
الأداة "search_receipts_by_metadata_filter"
تحوّل هذه الأداة طلب بحث المستخدم إلى فلتر طلب بحث للبيانات الوصفية يتيح البحث حسب النطاق الزمني و/أو إجمالي المعاملات. سيعرض هذا الإجراء جميع بيانات الإيصالات المطابقة، وسنحذف حقل التضمين في هذه العملية لأنّه لا يحتاج إليه موظّف الدعم للفهم السياقي.
الأداة "search_relevant_receipts_by_natural_language_query"
هذه هي أداة "الإنشاء المعزّز لاسترداد المعلومات" (RAG). يمكن لموظف الدعم تصميم طلب بحث خاص به لاسترداد الإيصالات ذات الصلة من قاعدة بيانات المتجهات، ويمكنه أيضًا اختيار وقت استخدام هذه الأداة. إنّ السماح للموظف باتخاذ قرار مستقل بشأن استخدام أداة تحديد أولوية الطلبات هذه أو عدم استخدامها وتصميم طلبه الخاص هو أحد تعريفات نهج تحديد أولوية الطلبات من قِبل الموظف.
لا نسمح له فقط بإنشاء طلب بحث خاص به، بل نسمح له أيضًا باختيار عدد المستندات ذات الصلة التي يريد استرجاعها. بالإضافة إلى تصميم مناسب للطلبات، على سبيل المثال:
# Example prompt Always filter the result from tool search_relevant_receipts_by_natural_language_query as the returned result may contain irrelevant information
سيجعل ذلك هذه الأداة فعّالة ويمكنها البحث عن أي شيء تقريبًا، إلا أنّها قد لا تعرِض جميع النتائج المتوقّعة بسبب الطبيعة غير الدقيقة لبحث أقرب جار.
5- تعديل سياق المحادثة من خلال طلبات إعادة الاتصال
تتيح لنا Google ADK "اعتراض" وقت تشغيل موظّف الدّعم على مستويات مختلفة. يمكنك الاطّلاع على مزيد من المعلومات حول هذه الميزة التفصيلية في هذه المستندات . في هذا المختبر، نستخدم before_model_callback
لتعديل الطلب قبل إرساله إلى النموذج اللغوي الكبير لإزالة بيانات الصور في سياق سجلّ المحادثات القديم ( لا تتضمّن سوى بيانات الصور في آخر 3 تفاعلات للمستخدِم) لزيادة الكفاءة.
ومع ذلك، نريد أن يحصل موظّف الدعم على سياق بيانات الصورة عند الحاجة. لذلك، نضيف آلية لإضافة عنصر نائب لـ "معرّف الصورة" من النوع "سلسلة" بعد كل بيانات بايت للصورة في المحادثة. سيساعد ذلك موظّف الدعم في ربط رقم تعريف الصورة ببيانات الملف الفعلية التي يمكن استخدامها في وقت تخزين الصورة أو استرجاعها. ستبدو البنية على النحو التالي:
<image-byte-data-1> [IMAGE-ID <hash-of-image-1>] <image-byte-data-2> [IMAGE-ID <hash-of-image-2>] And so on..
وعندما تصبح بيانات البايت قديمة في سجلّ المحادثات، يظل معرّف السلسلة متوفّرًا لإتاحة الوصول إلى البيانات من خلال استخدام الأداة. مثال على بنية السجلّ بعد إزالة بيانات الصورة
[IMAGE-ID <hash-of-image-1>] [IMAGE-ID <hash-of-image-2>] And so on..
لنبدأ. أنشئ ملفًا جديدًا ضمن الدليل expense_manager_agent وسَمِّه callbacks.py وانسخ الرمز البرمجي أدناه.
# expense_manager_agent/callbacks.py
import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
def modify_image_data_in_history(
callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
# The following code will modify the request sent to LLM
# We will only keep image data in the last 3 user messages using a reverse and counter approach
# Count how many user messages we've processed
user_message_count = 0
# Process the reversed list
for content in reversed(llm_request.contents):
# Only count for user manual query, not function call
if (content.role == "user") and (content.parts[0].function_response is None):
user_message_count += 1
modified_content_parts = []
# Check any missing image ID placeholder for any image data
# Then remove image data from conversation history if more than 3 user messages
for idx, part in enumerate(content.parts):
if part.inline_data is None:
modified_content_parts.append(part)
continue
if (
(idx + 1 >= len(content.parts))
or (content.parts[idx + 1].text is None)
or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
):
# Generate hash ID for the image and add a placeholder
image_data = part.inline_data.data
hasher = hashlib.sha256(image_data)
image_hash_id = hasher.hexdigest()[:12]
placeholder = f"[IMAGE-ID {image_hash_id}]"
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
modified_content_parts.append(types.Part(text=placeholder))
else:
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
# This will modify the contents inside the llm_request
content.parts = modified_content_parts
6. الطلب
يتطلّب تصميم وكيل يتمتع بتفاعل وإمكانات معقّدة العثور على طلب توجيهي جيد بما يكفي لتوجيه الوكيل كي يتمكّن من التصرّف بالطريقة التي نريدها.
في السابق، كانت لدينا آلية حول كيفية التعامل مع بيانات الصور في سجلّ المحادثات، وكانت لدينا أيضًا أدوات قد لا تكون سهلة الاستخدام، مثل search_relevant_receipts_by_natural_language_query.
نريد أيضًا أن يتمكّن موظّف الدعم من البحث عن صورة الإيصال الصحيحة واستردادها إلينا. وهذا يعني أنّنا نحتاج إلى نقل كل هذه المعلومات بشكل صحيح في بنية طلب مناسبة.
سنطلب من موظّف الدعم تنظيم الإخراج بتنسيق Markdown التالي لتحليل عملية التفكير والردّ النهائي والمرفق ( إن وُجد).
# THINKING PROCESS Thinking process here # FINAL RESPONSE Response to the user here Attachments put inside json block { "attachments": [ "[IMAGE-ID <hash-id-1>]", "[IMAGE-ID <hash-id-2>]", ... ] }
لنبدأ بالطلب التالي لتحقيق توقّعاتنا الأولية بشأن سلوك موظّف دعم إدارة النفقات. من المفترض أن يكون ملف task_prompt.md متوفّرًا في دليل العمل الحالي، ولكن علينا نقله إلى دليل expense_manager_agent. شغِّل الأمر التالي لنقله.
mv task_prompt.md expense_manager_agent/task_prompt.md
7. اختبار الوكيل
الآن، لنحاول التواصل مع موظّف الدعم من خلال واجهة سطر الأوامر، وننفِّذ الأمر التالي:
uv run adk run expense_manager_agent
ستعرض النتيجة على النحو التالي، حيث يمكنك الدردشة بالتناوب مع موظّف الدعم، ولكن يمكنك إرسال نص فقط من خلال هذه الواجهة.
Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log To access latest log: tail -F /tmp/agents_log/agent.latest.log Running agent root_agent, type exit to exit. user: hello [root_agent]: Hello there! How can I help you today? user:
بالإضافة إلى التفاعل مع واجهة سطر الأوامر، تتيح لنا أداة ADK أيضًا الحصول على واجهة مستخدم تطوير للتفاعل مع ما يحدث أثناء التفاعل وفحصه. تنفيذ الأمر التالي لبدء خادم واجهة المستخدم لتطوير التطبيقات على الجهاز
uv run adk web --port 8080
سيؤدي ذلك إلى ظهور ناتج مثل المثال التالي، ما يعني أنّه يمكننا الوصول إلى واجهة الويب.
INFO: Started server process [xxxx] INFO: Waiting for application startup. +-----------------------------------------------------------------------------+ | ADK Web Server started | | | | For local testing, access at http://localhost:8080. | +-----------------------------------------------------------------------------+ INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
الآن، للتحقّق من ذلك، انقر على الزر معاينة الويب في أعلى منطقة "محرر Cloud Shell" واختَر معاينة على المنفذ 8080.
ستظهر لك صفحة الويب التالية التي يمكنك من خلالها اختيار موظّفي الدعم المتاحين في الزر المنسدلة في أعلى يمين الصفحة ( في حالتنا، يجب أن يكون expense_manager_agent) والتفاعل مع الروبوت. ستظهر لك العديد من المعلومات حول تفاصيل السجلّ أثناء وقت تشغيل موظّف الدّعم في النافذة اليمنى.
لنحاول تنفيذ بعض الإجراءات. حمِّل مثالَي الإيصالَين التاليَين ( المصدر : مجموعات بيانات Hugging Facemousserlane/id_receipt_dataset
) . انقر بزر الماوس الأيمن على كل صورة واختَر حفظ الصورة باسم. ( سيؤدي ذلك إلى تنزيل صورة الإيصال)، ثم حمِّل الملف إلى الروبوت من خلال النقر على رمز "الاقتصاص" وقل أنّك تريد تخزين هذه الإيصالات.
بعد ذلك، جرِّب طلبات البحث التالية لإجراء بعض عمليات البحث أو استرداد الملفات.
- "تقديم تفاصيل النفقات وإجماليها خلال عام 2023"
- "أريد الحصول على ملف الإيصال من Indomaret"
عند استخدام بعض الأدوات، يمكنك فحص ما يحدث في واجهة مستخدم التطوير.
اطّلِع على ردّ موظّف الدعم وتحقّق مما إذا كان يمتثل لجميع القواعد الواردة في الطلب داخل task_prompt.py. تهانينا! لديك الآن وكيل تطوير يعمل بشكل كامل.
حان الوقت الآن لإكمالها من خلال واجهة مستخدم مناسبة وجميلة وإمكانيات لتحميل ملف الصورة وتنزيله.
8. إنشاء خدمة واجهة مستخدم باستخدام Gradio
سننشئ واجهة ويب للمحادثة تبدو على النحو التالي:
يحتوي على واجهة محادثة تتضمّن حقل إدخال للمستخدمين لإرسال النصوص وتحميل ملفات صور الإيصالات.
سننشئ خدمة الواجهة الأمامية باستخدام Gradio.
أنشئ ملفًا جديدًا، وانقر على ملف->ملف نصي جديد، واسمه frontend.py، ثم انسخ الرمز البرمجي التالي واحفظه.
import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse
SETTINGS = get_settings()
def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
"""Encode a file to base64 string and get MIME type.
Reads an image file and returns the base64-encoded image data and its MIME type.
Args:
image_path: Path to the image file to encode.
Returns:
ImageData object containing the base64 encoded image data and its MIME type.
"""
# Read the image file
with open(image_path, "rb") as file:
image_content = file.read()
# Get the mime type
mime_type = mimetypes.guess_type(image_path)[0]
# Base64 encode the image
base64_data = base64.b64encode(image_content).decode("utf-8")
# Return as ImageData object
return ImageData(serialized_image=base64_data, mime_type=mime_type)
def decode_base64_to_image(base64_data: str) -> Image.Image:
"""Decode a base64 string to PIL Image.
Converts a base64-encoded image string back to a PIL Image object
that can be displayed or processed further.
Args:
base64_data: Base64 encoded string of the image.
Returns:
PIL Image object of the decoded image.
"""
# Decode the base64 string and convert to PIL Image
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
image = Image.open(image_buffer)
return image
def get_response_from_llm_backend(
message: Dict[str, Any],
history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
"""Send the message and history to the backend and get a response.
Args:
message: Dictionary containing the current message with 'text' and optional 'files' keys.
history: List of previous message dictionaries in the conversation.
Returns:
List containing text response and any image attachments from the backend service.
"""
# Extract files and convert to base64
image_data = []
if uploaded_files := message.get("files", []):
for file_path in uploaded_files:
image_data.append(encode_image_to_base64_and_get_mime_type(file_path))
# Prepare the request payload
payload = ChatRequest(
text=message["text"],
files=image_data,
session_id="default_session",
user_id="default_user",
)
# Send request to backend
try:
response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
response.raise_for_status() # Raise exception for HTTP errors
result = ChatResponse(**response.json())
if result.error:
return [f"Error: {result.error}"]
chat_responses = []
if result.thinking_process:
chat_responses.append(
gr.ChatMessage(
role="assistant",
content=result.thinking_process,
metadata={"title": "🧠 Thinking Process"},
)
)
chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))
if result.attachments:
for attachment in result.attachments:
image_data = attachment.serialized_image
chat_responses.append(gr.Image(decode_base64_to_image(image_data)))
return chat_responses
except requests.exceptions.RequestException as e:
return [f"Error connecting to backend service: {str(e)}"]
if __name__ == "__main__":
demo = gr.ChatInterface(
get_response_from_llm_backend,
title="Personal Expense Assistant",
description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
type="messages",
multimodal=True,
textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
)
demo.launch(
server_name="0.0.0.0",
server_port=8080,
)
بعد ذلك، يمكننا محاولة تشغيل خدمة الواجهة الأمامية باستخدام الأمر التالي. لا تنسَ إعادة تسمية الملف main.py إلى frontend.py.
uv run frontend.py
ستظهر لك نتيجة مشابهة لما يلي في وحدة تحكّم السحابة الإلكترونية.
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
بعد ذلك، يمكنك التحقّق من واجهة الويب عند الضغط مع الاستمرار على Ctrl والنقر على رابط عنوان URL المحلي. بدلاً من ذلك، يمكنك أيضًا الوصول إلى تطبيق الواجهة الأمامية من خلال النقر على زر معاينة الويب في أعلى يسار "محرر السحابة الإلكترونية"، واختيار معاينة على المنفذ 8080.
ستظهر لك واجهة الويب، ولكن ستظهر لك رسالة خطأ متوقّعة عند محاولة إرسال المحادثة بسبب عدم إعداد الخدمة الخلفية بعد.
الآن، عليك السماح بتشغيل الخدمة وعدم إيقافها بعد. سنشغّل خدمة الخلفية في علامة تبويب أخرى في وحدة التحكّم.
شرح الرمز
في رمز الواجهة الأمامية هذا، نتيح للمستخدم أولاً إرسال نص وتحميل ملفات متعددة. تتيح لنا أداة Gradio إنشاء هذا النوع من الوظائف باستخدام طريقة gr.ChatInterface مع gr.MultimodalTextbox.
الآن، قبل إرسال الملف والنص إلى الخلفية، علينا معرفة نوع MIME للملف لأنّه مطلوب في الخلفية. نحتاج أيضًا إلى ترميز وحدات بت ملف الصورة بترميز base64 وإرسالها مع mimetype.
class ImageData(BaseModel): """Model for image data with hash identifier. Attributes: serialized_image: Optional Base64 encoded string of the image content. mime_type: MIME type of the image. """ serialized_image: str mime_type: str
يتم تحديد المخطّط المستخدَم للتفاعل بين الواجهة الأمامية والخلفية في schema.py. نستخدم Pydantic BaseModel لفرض التحقّق من صحة البيانات في المخطّط.
عند تلقّي الردّ، نفصل بين الجزء المتعلق بالتفكير والردّ النهائي والمرفق. وبالتالي، يمكننا استخدام مكوّن Gradio لعرض كل مكوّن باستخدام مكوّن واجهة المستخدم.
class ChatResponse(BaseModel): """Model for a chat response. Attributes: response: The text response from the model. thinking_process: Optional thinking process of the model. attachments: List of image data to be displayed to the user. error: Optional error message if something went wrong. """ response: str thinking_process: str = "" attachments: List[ImageData] = [] error: Optional[str] = None
9- إنشاء خدمة خلفية باستخدام FastAPI
بعد ذلك، سنحتاج إلى إنشاء الخلفية التي يمكنها بدء تشغيل "وكيل الدعم" مع المكونات الأخرى لتتمكّن من تنفيذ وقت تشغيل "وكيل الدعم".
أنشئ ملفًا جديدًا، وانقر على ملف->ملف نصي جديد، وانسخ الرمز البرمجي التالي والصقه، ثم احفظه باسم backend.py.
from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
extract_attachment_ids_and_sanitize_response,
download_image_from_gcs,
extract_thinking_process,
format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings
SETTINGS = get_settings()
APP_NAME = "expense_manager_app"
# Application state to hold service contexts
class AppContexts(SimpleNamespace):
"""A class to hold application contexts with attribute access"""
session_service: InMemorySessionService = None
artifact_service: GcsArtifactService = None
expense_manager_agent_runner: Runner = None
# Initialize application state
app_contexts = AppContexts()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize service contexts during application startup
app_contexts.session_service = InMemorySessionService()
app_contexts.artifact_service = GcsArtifactService(
bucket_name=SETTINGS.STORAGE_BUCKET_NAME
)
app_contexts.expense_manager_agent_runner = Runner(
agent=expense_manager_agent, # The agent we want to run
app_name=APP_NAME, # Associates runs with our app
session_service=app_contexts.session_service, # Uses our session manager
artifact_service=app_contexts.artifact_service, # Uses our artifact manager
)
logger.info("Application started successfully")
yield
logger.info("Application shutting down")
# Perform cleanup during application shutdown if necessary
# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
return app_contexts
# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)
@app.post("/chat", response_model=ChatResponse)
async def chat(
request: ChatRequest = Body(...),
app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
"""Process chat request and get response from the agent"""
# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
format_user_request_to_adk_content_and_store_artifacts,
request=request,
app_name=APP_NAME,
artifact_service=app_context.artifact_service,
)
final_response_text = "Agent did not produce a final response." # Default
# Use the session ID from the request or default if not provided
session_id = request.session_id
user_id = request.user_id
# Create session if it doesn't exist
if not app_context.session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
):
app_context.session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
try:
# Process the message with the agent
# Type annotation: runner.run_async returns an AsyncIterator[Event]
events_iterator: AsyncIterator[Event] = (
app_context.expense_manager_agent_runner.run_async(
user_id=user_id, session_id=session_id, new_message=content
)
)
async for event in events_iterator: # event has type Event
# Key Concept: is_final_response() marks the concluding message for the turn
if event.is_final_response():
if event.content and event.content.parts:
# Extract text from the first part
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
# Handle potential errors/escalations
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break # Stop processing events once the final response is found
logger.info(
"Received final response from agent", raw_final_response=final_response_text
)
# Extract and process any attachments and thinking process in the response
base64_attachments = []
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)
# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
# Download image data and get MIME type
result = await asyncio.to_thread(
download_image_from_gcs,
artifact_service=app_context.artifact_service,
image_hash=image_hash_id,
app_name=APP_NAME,
user_id=user_id,
session_id=session_id,
)
if result:
base64_data, mime_type = result
base64_attachments.append(
ImageData(serialized_image=base64_data, mime_type=mime_type)
)
logger.info(
"Processed response with attachments",
sanitized_response=sanitized_text,
thinking_process=thinking_process,
attachment_ids=attachment_ids,
)
return ChatResponse(
response=sanitized_text,
thinking_process=thinking_process,
attachments=base64_attachments,
)
except Exception as e:
logger.error("Error processing chat request", error_message=str(e))
return ChatResponse(
response="", error=f"Error in generating response: {str(e)}"
)
# Only run the server if this file is executed directly
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8081)
بعد ذلك، يمكننا محاولة تشغيل خدمة الخلفية. تذكَّر أنّنا في الخطوة السابقة شغّلنا خدمة الواجهة الأمامية، والآن سنحتاج إلى فتح وحدة تحكّم جديدة ومحاولة تشغيل خدمة الخلفية هذه.
- أنشئ محطة طرفية جديدة. انتقِل إلى وحدة التحكّم في أسفل الشاشة وابحث عن الزر "+" لإنشاء وحدة تحكّم جديدة. يمكنك بدلاً من ذلك استخدام Ctrl + Shift + C لفتح وحدة طرفية جديدة.
- بعد ذلك، تأكَّد من أنّك في دليل العمل personal-expense-assistant ثم نفِّذ الأمر التالي:
uv run backend.py
- في حال نجاح العملية، ستظهر النتيجة على النحو التالي:
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
شرح الرمز
بدء ADK Agent وSessionService وArtifactService
لتشغيل موظّف الدعم في خدمة الخلفية، سنحتاج إلى إنشاء Runner يستخدِم كلّ من SessionService وموظّف الدعم. ستدير SessionService سجلّ المحادثات وحالتها، وبالتالي عند دمجها مع Runner، ستمنح موظّف الدعم إمكانية تلقّي سياق المحادثات الجارية.
نستخدم أيضًا ArtifactService لمعالجة الملف الذي تم تحميله. يمكنك الاطّلاع على مزيد من التفاصيل حول جلسة والعناصر في "إعلانات شبكة البحث".
... @asynccontextmanager async def lifespan(app: FastAPI): # Initialize service contexts during application startup app_contexts.session_service = InMemorySessionService() app_contexts.artifact_service = GcsArtifactService( bucket_name=SETTINGS.STORAGE_BUCKET_NAME ) app_contexts.expense_manager_agent_runner = Runner( agent=expense_manager_agent, # The agent we want to run app_name=APP_NAME, # Associates runs with our app session_service=app_contexts.session_service, # Uses our session manager artifact_service=app_contexts.artifact_service, # Uses our artifact manager ) logger.info("Application started successfully") yield logger.info("Application shutting down") # Perform cleanup during application shutdown if necessary ...
في هذا العرض التوضيحي، نستخدم InMemorySessionService وGcsArtifactService للدمج مع وكيلنا Runner. بما أنّه يتم تخزين سجلّ المحادثات في الذاكرة، سيتم فقدانه بعد إيقاف خدمة الخلفية أو إعادة تشغيلها. نُنشئ هذه العناصر داخل دورة حياة تطبيق FastAPI ليتم إدراجها كعنصر تابع في مسار /chat
.
تحميل الصور وتنزيلها باستخدام GcsArtifactService
سيتم تخزين كل الصور المحمَّلة كعناصر من خلال GcsArtifactService، ويمكنك التحقّق من ذلك داخل دالة format_user_request_to_adk_content_and_store_artifacts
داخل utils.py.
... # Prepare the user's message in ADK format and store image artifacts content = await asyncio.to_thread( format_user_request_to_adk_content_and_store_artifacts, request=request, app_name=APP_NAME, artifact_service=app_context.artifact_service, ) ...
يجب تنسيق جميع الطلبات التي ستعالجها أداة "تشغيل موظّفي الدعم" إلى النوع types.Content. داخل الدالة، نعالج أيضًا بيانات كل صورة ونستخرج معرّفها ليتم استبداله بعنصر نائب لمعرّف الصورة.
يتم استخدام آلية مشابهة لتنزيل المرفقات بعد استخراج أرقام تعريف الصور باستخدام التعبير العادي:
... sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response( final_response_text ) sanitized_text, thinking_process = extract_thinking_process(sanitized_text) # Download images from GCS and replace hash IDs with base64 data for image_hash_id in attachment_ids: # Download image data and get MIME type result = await asyncio.to_thread( download_image_from_gcs, artifact_service=app_context.artifact_service, image_hash=image_hash_id, app_name=APP_NAME, user_id=user_id, session_id=session_id, ) ...
10. اختبار الدمج
من المفترض أن تكون لديك الآن خدمات متعددة يتم تشغيلها في علامات تبويب مختلفة في Cloud Console:
- تشغيل خدمة الواجهة الأمامية على المنفذ 8080
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
- تشغيل خدمة الخلفية على المنفذ 8081
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
في الحالة الحالية، من المفترض أن تتمكّن من تحميل صور الإيصالات والدردشة بسلاسة مع المساعد من تطبيق الويب على المنفذ 8080.
انقر على الزر معاينة الويب في أعلى منطقة "محرر Cloud Shell" واختَر معاينة على المنفذ 8080.
لنتفاعل الآن مع "مساعد Google".
نزِّل الإيصالات التالية. يتراوح النطاق الزمني لبيانات الإيصالات هذه بين عامَي 2023 و2024، ويُرجى توجيه المساعد إلى تخزينها أو تحميلها.
- Receipt Drive ( مصدر مجموعات بيانات Hugging Face
mousserlane/id_receipt_dataset
)
طرح أسئلة متنوعة
- "أريد تقسيم النفقات الشهرية خلال الفترة من 2023 إلى 2024"
- "أريد الاطّلاع على إيصال معاملة شراء القهوة"
- "أريد الحصول على ملف الإيصال من Yakiniku Like"
- Etc
في ما يلي بعض المقتطفات عن التفاعل الناجح:
11. النشر على Cloud Run
الآن، نريد بالطبع الوصول إلى هذا التطبيق الرائع من أي مكان. لتنفيذ ذلك، يمكننا تجميع هذا التطبيق ونشره على Cloud Run. لأغراض هذا العرض التجريبي، سيتم عرض هذه الخدمة كخدمة عامة يمكن للآخرين الوصول إليها. ومع ذلك، يُرجى العِلم أنّ هذه ليست أفضل ممارسة لهذا النوع من التطبيقات لأنّها أكثر ملاءمةً للتطبيقات الشخصية.
في هذا الدليل التعليمي حول الرموز البرمجية، سنضع كلّ من خدمة الواجهة الأمامية وخدمة الخلفية في حاوية واحدة. سنحتاج إلى مساعدة supervisord لإدارة كلتا الخدمتَين. يمكنك فحص ملف supervisord.conf والاطّلاع على Dockerfile الذي ضبطنا فيه supervisord كنقطة دخول.
في هذه المرحلة، لدينا جميع الملفات اللازمة لنشر تطبيقاتنا على Cloud Run، لننشرها. انتقِل إلى Cloud Shell Terminal وتأكَّد من ضبط المشروع الحالي على مشروعك النشط، وإذا لم يكن الأمر كذلك، عليك استخدام الأمر gcloud configure لضبط رقم تعريف المشروع:
gcloud config set project [PROJECT_ID]
بعد ذلك، نفِّذ الأمر التالي لنشره على Cloud Run.
gcloud run deploy personal-expense-assistant \
--source . \
--port=8080 \
--allow-unauthenticated \
--env-vars-file=settings.yaml \
--memory 1024Mi \
--region us-central1
إذا طُلب منك الموافقة على إنشاء سجلّ للعناصر في مستودع Docker، ما عليك سوى الإجابة بنعم. يُرجى العِلم أنّنا نسمح بالوصول غير المُعتمَد هنا لأنّ هذا تطبيق تجريبي. ننصحك باستخدام المصادقة المناسبة لتطبيقات المؤسسات وتطبيقات الإنتاج.
بعد اكتمال عملية النشر، من المفترض أن يصلك رابط مشابه لما يلي:
https://personal-expense-assistant-*******.us-central1.run.app
يمكنك استخدام تطبيقك من نافذة التصفّح المتخفي أو من جهازك الجوّال. من المفترض أن يكون التطبيق متوفّرًا.
12. التحدي
حان الوقت الآن لإبراز مهاراتك في الاستكشاف وتحسينها. هل لديك ما يلزم لتغيير الرمز حتى تتمكّن الخلفية من استيعاب مستخدمين متعدّدين؟ ما هي المكوّنات التي يجب تعديلها؟
13. تَنظيم
لتجنُّب تحصيل رسوم من حسابك على Google Cloud مقابل الموارد المستخدَمة في هذا الدليل التعليمي للترميز، اتّبِع الخطوات التالية:
- في وحدة تحكّم Google Cloud، انتقِل إلى صفحة إدارة الموارد.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه، ثم انقر على حذف.
- في مربّع الحوار، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.
- بدلاً من ذلك، يمكنك الانتقال إلى Cloud Run في وحدة التحكّم، واختيار الخدمة التي تم نشرها للتو وحذفها.