Kỹ thuật RAG nâng cao

1. Giới thiệu

Tổng quan

Cơ chế Tạo sinh tăng cường truy xuất (RAG) giúp cải thiện câu trả lời của Mô hình ngôn ngữ lớn (LLM) bằng cách dựa vào kiến thức bên ngoài. Tuy nhiên, để xây dựng một hệ thống RAG sẵn sàng cho hoạt động sản xuất, bạn cần làm nhiều việc hơn là chỉ tìm kiếm vectơ đơn giản. Bạn phải tối ưu hoá cách dữ liệu được nhập, cách xếp hạng kết quả phù hợp và cách xử lý cụm từ tìm kiếm của người dùng.

Trong phòng thí nghiệm toàn diện này, bạn sẽ tạo một ứng dụng RAG mạnh mẽ bằng cách sử dụng Cloud SQL cho PostgreSQL (mở rộng bằng pgvector) và Vertex AI. Bạn sẽ tìm hiểu 3 kỹ thuật nâng cao:

  1. Chiến lược phân đoạn: Bạn sẽ quan sát cách các phương pháp phân chia văn bản khác nhau (Ký tự, Đệ quy, Mã thông báo) ảnh hưởng đến chất lượng truy xuất.
  2. Sắp xếp lại: Bạn sẽ triển khai Vertex AI Reranker để tinh chỉnh kết quả tìm kiếm và giải quyết vấn đề "mất dấu giữa chừng".
  3. Biến đổi câu hỏi: Bạn sẽ dùng Gemini để tối ưu hoá câu hỏi của người dùng thông qua các kỹ thuật như HyDE (Hypothetical Document Embeddings – Nhúng tài liệu giả định) và Step-back Prompting (Gợi ý từng bước).

Bạn sẽ thực hiện

  • Thiết lập một phiên bản Cloud SQL cho PostgreSQL bằng pgvector.
  • Xây dựng một quy trình truyền dữ liệu chia văn bản thành các đoạn bằng nhiều chiến lược và lưu trữ các mục nhúng trong Cloud SQL.
  • Thực hiện tìm kiếm ngữ nghĩa và so sánh chất lượng của kết quả từ các phương pháp phân đoạn khác nhau.
  • Tích hợp một Reranker để sắp xếp lại các tài liệu đã truy xuất dựa trên mức độ liên quan.
  • Triển khai các hoạt động chuyển đổi truy vấn dựa trên LLM để cải thiện khả năng truy xuất cho các câu hỏi phức tạp hoặc không rõ ràng.

Kiến thức bạn sẽ học được

  • Cách sử dụng LangChain với Vertex AICloud SQL.
  • Tác động của các bộ tách văn bản Ký tự, Đệ quyMã thông báo.
  • Cách triển khai tính năng Tìm kiếm vectơ trong PostgreSQL.
  • Cách sử dụng ContextualCompressionRetriever để xếp hạng lại.
  • Cách triển khai HyDEStep-back Prompting.

2. Thiết lập dự án

Tài khoản Google

Nếu chưa có Tài khoản Google cá nhân, bạn phải tạo một Tài khoản Google.

Sử dụng tài khoản cá nhân thay vì tài khoản do nơi làm việc hoặc trường học cấp.

Đăng nhập vào Google Cloud Console

Đăng nhập vào Google Cloud Console bằng Tài khoản Google cá nhân.

Bật thanh toán

Đổi 5 USD tín dụng Google Cloud (không bắt buộc)

Để tham gia hội thảo này, bạn cần có một Tài khoản thanh toán có sẵn một số tín dụng. Nếu dự định sử dụng hệ thống thanh toán của riêng mình, bạn có thể bỏ qua bước này.

  1. Nhấp vào đường liên kết này rồi đăng nhập bằng Tài khoản Google cá nhân. Bạn sẽ thấy nội dung như sau: Nhấp vào đây để chuyển đến trang ghi công
  2. Nhấp vào nút NHẤP VÀO ĐÂY ĐỂ XEM CÁC KHOẢN TÍN DỤNG. Thao tác này sẽ đưa bạn đến một trang để thiết lập hồ sơ thanh toán Thiết lập trang hồ sơ thanh toán
  3. Nhấp vào Xác nhận. Giờ đây, bạn đã kết nối với một Tài khoản thanh toán dùng thử trên Google Cloud Platform. Ảnh chụp màn hình tổng quan về việc thanh toán

Thiết lập tài khoản thanh toán cá nhân

Nếu thiết lập thông tin thanh toán bằng tín dụng Google Cloud, bạn có thể bỏ qua bước này.

Để thiết lập tài khoản thanh toán cá nhân, hãy truy cập vào đây để bật tính năng thanh toán trong Cloud Console.

Một số lưu ý:

  • Việc hoàn thành bài thực hành này sẽ tốn ít hơn 1 USD cho các tài nguyên trên đám mây.
  • Bạn có thể làm theo các bước ở cuối bài thực hành này để xoá tài nguyên nhằm tránh bị tính thêm phí.
  • Người dùng mới đủ điều kiện dùng thử miễn phí 300 USD.

Tạo dự án (không bắt buộc)

Nếu bạn không có dự án hiện tại muốn sử dụng cho lớp học này, hãy tạo một dự án mới tại đây.

3. Mở Trình chỉnh sửa Cloud Shell

  1. Nhấp vào đường liên kết này để chuyển trực tiếp đến Cloud Shell Editor
  2. Nếu được nhắc uỷ quyền vào bất kỳ thời điểm nào trong ngày hôm nay, hãy nhấp vào Uỷ quyền để tiếp tục.Nhấp để uỷ quyền cho Cloud Shell
  3. Nếu thiết bị đầu cuối không xuất hiện ở cuối màn hình, hãy mở thiết bị đầu cuối:
    • Nhấp vào Xem
    • Nhấp vào Terminal (Thiết bị đầu cuối)Mở cửa sổ dòng lệnh mới trong Trình chỉnh sửa Cloud Shell
  4. Trong cửa sổ dòng lệnh, hãy thiết lập dự án bằng lệnh sau:
    gcloud config set project [PROJECT_ID]
    
    • Ví dụ:
      gcloud config set project lab-project-id-example
      
    • Nếu không nhớ mã dự án, bạn có thể liệt kê tất cả mã dự án bằng cách dùng lệnh:
      gcloud projects list
      
      Đặt mã dự án trong thiết bị đầu cuối Cloud Shell Editor
  5. Bạn sẽ thấy thông báo sau:
    Updated property [core/project].
    

4. Bật API

Để xây dựng giải pháp này, bạn cần bật một số API của Google Cloud cho Vertex AI, Cloud SQL và dịch vụ Sắp xếp lại.

  1. Trong dòng lệnh, hãy bật các API:
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

Giới thiệu về các API

  • Vertex AI API (aiplatform.googleapis.com): Cho phép sử dụng Gemini để tạo nội dung và Vertex AI Embeddings để chuyển đổi văn bản thành vectơ.
  • Cloud SQL Admin API (sqladmin.googleapis.com): Cho phép bạn quản lý các phiên bản Cloud SQL theo phương thức lập trình.
  • Discovery Engine API (discoveryengine.googleapis.com): Hỗ trợ các chức năng của Vertex AI Reranker.
  • Service Usage API (serviceusage.googleapis.com): Bắt buộc để kiểm tra và quản lý hạn mức dịch vụ.

5. Tạo môi trường ảo và cài đặt các phần phụ thuộc

Trước khi bắt đầu bất kỳ dự án Python nào, bạn nên tạo một môi trường ảo. Điều này giúp tách biệt các phần phụ thuộc của dự án, ngăn chặn xung đột với các dự án khác hoặc các gói Python toàn cục của hệ thống.

  1. Tạo một thư mục có tên là rag-labs rồi chuyển vào thư mục đó. Chạy mã sau trong terminal:
    mkdir rag-labs && cd rag-labs
    
  2. Tạo và kích hoạt môi trường ảo:
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. Tạo một tệp requirements.txt có các phần phụ thuộc cần thiết. Chạy mã sau trong terminal:
    cloudshell edit requirements.txt
    
  4. Dán các phần phụ thuộc được tối ưu hoá sau đây vào requirements.txt. Các phiên bản này được ghim để tránh xung đột và tăng tốc quá trình cài đặt.
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. Cài đặt các phần phụ thuộc:
    uv pip install -r requirements.txt
    

6. Thiết lập Cloud SQL cho PostgreSQL

Trong nhiệm vụ này, bạn sẽ cung cấp một phiên bản Cloud SQL cho PostgreSQL, tạo một cơ sở dữ liệu và chuẩn bị cơ sở dữ liệu đó cho tính năng tìm kiếm vectơ.

Xác định cấu hình Cloud SQL

  1. Tạo một tệp .env để lưu trữ cấu hình của bạn. Chạy mã sau trong terminal:
    cloudshell edit .env
    
  2. Dán cấu hình sau vào .env.
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. Thay thế [YOUR_PROJECT_ID] bằng mã dự án thực tế của bạn trên Google Cloud. (ví dụ: PROJECT_ID = "google-cloud-labs")
    Nếu bạn không nhớ mã dự án, hãy chạy lệnh sau trong cửa sổ dòng lệnh. Thao tác này sẽ cho bạn thấy danh sách tất cả các dự án và mã dự án của bạn.
    gcloud projects list
    
  4. Tải các biến vào phiên shell:
    source .env
    

Tạo thực thể và cơ sở dữ liệu

  1. Tạo một phiên bản Cloud SQL cho PostgreSQL. Lệnh này sẽ tạo một phiên bản nhỏ phù hợp với phòng thí nghiệm này.
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. Sau khi phiên bản sẵn sàng, hãy tạo cơ sở dữ liệu:
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. Tạo người dùng cơ sở dữ liệu:
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

Bật tiện ích pgvector

Tiện ích pgvector cho phép PostgreSQL lưu trữ và tìm kiếm các vectơ nhúng. Bạn phải bật tính năng này một cách rõ ràng trên cơ sở dữ liệu của mình.

  1. Tạo một tập lệnh có tên là enable_pgvector.py. Chạy mã sau trong terminal:
    cloudshell edit enable_pgvector.py
    
  2. Dán mã sau vào enable_pgvector.py. Tập lệnh này kết nối với cơ sở dữ liệu của bạn và chạy CREATE EXTENSION IF NOT EXISTS vector;.
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. Chạy tập lệnh:
    python enable_pgvector.py
    

7. Phần 1: Chiến lược phân đoạn

Bước đầu tiên trong quy trình RAG là chuyển đổi tài liệu sang định dạng mà LLM có thể hiểu: đoạn.

LLM có giới hạn về cửa sổ ngữ cảnh (lượng văn bản mà LLM có thể xử lý cùng một lúc). Hơn nữa, việc truy xuất một tài liệu gồm 50 trang để trả lời một câu hỏi cụ thể sẽ làm loãng thông tin. Chúng tôi chia tài liệu thành các "đoạn" nhỏ hơn để tách biệt thông tin liên quan.

Tuy nhiên, cách bạn chia văn bản có ý nghĩa vô cùng lớn:

  • Character Splitter (Công cụ phân tách ký tự): Phân tách theo số lượng ký tự. Cách này nhanh nhưng rủi ro vì có thể cắt đôi từ hoặc câu, làm mất ý nghĩa ngữ nghĩa.
  • Recursive Splitter (Bộ tách đệ quy): Cố gắng tách theo đoạn văn trước, sau đó là câu, rồi đến từ. Công cụ này cố gắng giữ các đơn vị ngữ nghĩa với nhau.
  • Token Splitter:Phân chia dựa trên từ vựng (mã thông báo) của LLM. Điều này đảm bảo các đoạn văn vừa khít với cửa sổ ngữ cảnh nhưng có thể tốn nhiều tài nguyên tính toán hơn để tạo.

Trong phần này, bạn sẽ nhập cùng một dữ liệu bằng cả 3 chiến lược để so sánh chúng.

Tạo tập lệnh truyền dữ liệu

Bạn sẽ sử dụng một tập lệnh tải một tập dữ liệu Harry Potter xuống, chia tập dữ liệu đó bằng các chiến lược Character, RecursiveToken, đồng thời tải các mục nhúng lên 3 bảng riêng biệt trong Cloud SQL.

  1. Tạo tệp ingest_data.py:
    cloudshell edit ingest_data.py
    
  2. Dán mã cố định sau đây vào ingest_data.py. Phiên bản này phân tích cú pháp chính xác cấu trúc JSON của tập dữ liệu.
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. Chạy tập lệnh truyền dữ liệu. Thao tác này sẽ điền vào cơ sở dữ liệu của bạn 3 bảng (tập hợp) khác nhau.
    python ingest_data.py
    

So sánh kết quả phân đoạn

Bây giờ dữ liệu đã được tải, hãy chạy một truy vấn trên cả 3 tập hợp để xem chiến lược phân đoạn ảnh hưởng đến kết quả như thế nào.

  1. Tạo query_chunking.py:
    cloudshell edit query_chunking.py
    
  2. Dán mã sau vào query_chunking.py:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. Chạy tập lệnh truy vấn:
    python query_chunking.py
    

Quan sát kết quả đầu ra.

Lưu ý cách phân chia Ký tự có thể cắt câu khi chưa hết ý, trong khi Đệ quy cố gắng tuân thủ ranh giới đoạn văn. Việc chia token đảm bảo các đoạn vừa khít với cửa sổ ngữ cảnh LLM nhưng có thể bỏ qua cấu trúc ngữ nghĩa.

8. Phần 2: Sắp xếp lại

Tìm kiếm vectơ (truy xuất) diễn ra cực kỳ nhanh chóng vì dựa trên các biểu diễn toán học được nén (embedding). Phương pháp này có phạm vi rộng để đảm bảo Khả năng thu hồi (tìm tất cả các mục có thể liên quan), nhưng thường có Độ chính xác thấp (thứ hạng của những mục đó có thể không hoàn hảo).

Thông thường, các tài liệu liên quan sẽ bị "Mất hút" ở giữa danh sách kết quả. Một LLM chú ý đến 5 kết quả hàng đầu có thể bỏ lỡ câu trả lời quan trọng ở vị trí thứ 7.

Sắp xếp lại giải quyết vấn đề này bằng cách thêm một giai đoạn thứ hai.

  1. Retriever: Tìm nạp một tập hợp lớn hơn (ví dụ: 25 mục hàng đầu) bằng cách sử dụng tính năng tìm kiếm vectơ nhanh.
  2. Xếp hạng lại: Sử dụng một mô hình chuyên biệt (chẳng hạn như Cross-Encoder) để kiểm tra toàn bộ văn bản của các cặp truy vấn và tài liệu. Phương pháp này chậm hơn nhưng chính xác hơn nhiều. Thao tác này sẽ tính điểm lại 25 kết quả hàng đầu và trả về 3 kết quả tốt nhất.

Trong nhiệm vụ này, bạn sẽ tìm kiếm trong tập hợp recursive đã tạo ở Phần 1, nhưng lần này bạn sẽ áp dụng Vertex AI Reranker để tinh chỉnh kết quả.

  1. Tạo query_reranking.py:
    cloudshell edit query_reranking.py
    
  2. Dán mã sau. Lưu ý cách nó nhắm đến rõ ràng việc thu thập _recursive và sử dụng ContextualCompressionRetriever.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. Chạy truy vấn sắp xếp lại:
    python query_reranking.py
    

Quan sát

Bạn có thể nhận thấy điểm số về mức độ liên quan cao hơn hoặc thứ tự khác so với tìm kiếm vectơ thô. Điều này đảm bảo LLM nhận được ngữ cảnh chính xác nhất có thể.

9. Phần 3: Biến đổi truy vấn

Thông thường, nút thắt lớn nhất trong RAG là người dùng. Các cụm từ tìm kiếm của người dùng thường không rõ ràng, không đầy đủ hoặc diễn đạt kém. Nếu quá trình nhúng truy vấn không khớp về mặt toán học với quá trình nhúng tài liệu, thì quá trình truy xuất sẽ không thành công.

Tính năng Biến đổi truy vấn sử dụng một LLM để viết lại hoặc mở rộng truy vấn trước khi truy vấn đó truy cập vào cơ sở dữ liệu. Bạn sẽ triển khai 2 kỹ thuật:

  • HyDE (Nhúng tài liệu giả định): Mức độ tương đồng về vectơ giữa một câu hỏi và một câu trả lời thường thấp hơn mức độ tương đồng giữa một câu trả lời và một câu trả lời giả định. HyDE yêu cầu LLM tạo ra một câu trả lời hoàn hảo, nhúng câu trả lời đó và tìm kiếm những tài liệu có vẻ giống với câu trả lời được tạo.
  • Bước lùi trong câu lệnh: Nếu người dùng đặt một câu hỏi cụ thể và chi tiết, hệ thống có thể bỏ lỡ bối cảnh rộng hơn. Kỹ thuật đặt câu lệnh lùi bước yêu cầu LLM tạo một câu hỏi trừu tượng ở cấp cao hơn ("Gia đình này có lịch sử như thế nào?") để truy xuất thông tin cơ bản cùng với các chi tiết cụ thể.
  1. Tạo query_transformation.py:
    cloudshell edit query_transformation.py
    
  2. Dán mã sau:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. Chạy tập lệnh chuyển đổi:
    python query_transformation.py
    

Quan sát kết quả đầu ra.

Lưu ý rằng truy vấn Step-back có thể truy xuất bối cảnh rộng hơn về lịch sử gia đình Dursley, trong khi HyDE tập trung vào các chi tiết cụ thể được tạo trong câu trả lời giả định.

10. Phần 4: Tạo từ đầu đến cuối

Chúng tôi đã cắt dữ liệu, tinh chỉnh nội dung tìm kiếm và làm rõ cụm từ tìm kiếm của người dùng. Giờ đây, chúng ta sẽ đặt "G" vào RAG: Tạo.

Cho đến thời điểm này, chúng ta chỉ tìm thông tin. Để tạo một trợ lý AI thực thụ, chúng ta cần cung cấp những tài liệu chất lượng cao đã được sắp xếp lại vào một LLM (Gemini) để tổng hợp câu trả lời bằng ngôn ngữ tự nhiên.

Trong quy trình sản xuất, việc này liên quan đến một quy trình cụ thể:

  1. Truy xuất: Nhận một tập hợp rộng lớn các đề xuất (ví dụ: Top 10) bằng tính năng tìm kiếm vectơ nhanh.
  2. Sắp xếp lại: Lọc xuống mức tốt nhất tuyệt đối (ví dụ: 3) sử dụng Vertex AI Reranker.
  3. Xây dựng bối cảnh: Ghép nội dung của 3 tài liệu hàng đầu đó thành một chuỗi duy nhất.
  4. Grounded Prompting (Lời nhắc có căn cứ): Chèn chuỗi ngữ cảnh đó vào một mẫu lời nhắc nghiêm ngặt buộc LLM chỉ sử dụng thông tin đó.

Tạo tập lệnh tạo

Chúng ta sẽ sử dụng gemini-2.5-flash cho bước tạo. Mô hình này rất phù hợp với RAG vì có cửa sổ ngữ cảnh dài và độ trễ thấp, cho phép mô hình xử lý nhanh nhiều tài liệu được truy xuất.

  1. Tạo end_to_end_rag.py:
cloudshell edit end_to_end_rag.py
  1. Dán mã sau. Hãy chú ý đến biến template. Đây là nơi chúng tôi hướng dẫn nghiêm ngặt mô hình tránh "ảo giác" (tự tạo ra thông tin) bằng cách liên kết biến này với ngữ cảnh được cung cấp.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. Chạy ứng dụng cuối cùng:
python end_to_end_rag.py

Tìm hiểu về đầu ra

Khi bạn chạy tập lệnh này, hãy quan sát sự khác biệt giữa các đoạn thô được truy xuất (mà bạn đã thấy trong các bước trước) và câu trả lời cuối cùng. LLM đóng vai trò là một bộ tổng hợp – nó đọc các "đoạn" văn bản rời rạc do Reranker cung cấp và sắp xếp chúng thành một câu mạch lạc, dễ đọc.

Bằng cách liên kết các thành phần này, bạn chuyển từ một "phỏng đoán" ngẫu nhiên sang một quy trình làm việc có căn cứ và mang tính xác định. Retriever (Trình truy xuất) sẽ tìm kiếm thông tin, Reranker (Trình sắp xếp lại) sẽ chọn thông tin phù hợp nhất và Generator (Trình tạo) sẽ tạo ra câu trả lời.

11. Kết luận

Xin chúc mừng! Bạn đã xây dựng thành công một quy trình RAG nâng cao, vượt xa tính năng tìm kiếm vectơ cơ bản.

Tóm tắt

  • Bạn đã định cấu hình Cloud SQL bằng pgvector để có bộ nhớ vectơ có thể mở rộng.
  • Bạn đã so sánh Chiến lược phân đoạn để hiểu cách hoạt động chuẩn bị dữ liệu ảnh hưởng đến việc truy xuất.
  • Bạn đã triển khai tính năng Sắp xếp lại bằng Vertex AI để cải thiện độ chính xác của kết quả.
  • Bạn đã sử dụng Biến đổi truy vấn (HyDE, Step-back) để điều chỉnh ý định của người dùng cho phù hợp với dữ liệu của bạn.

Tìm hiểu thêm

Từ nguyên mẫu đến phiên bản phát hành công khai

Phòng thí nghiệm này thuộc Lộ trình học tập về AI sẵn sàng cho sản xuất bằng Google Cloud.