高级 RAG 技术

1. 简介

概览

检索增强生成 (RAG) 通过将大语言模型 (LLM) 的回答建立在外部知识的基础之上,增强了 LLM 的回答能力。不过,构建可用于生产环境的 RAG 系统需要的不仅仅是简单的向量搜索。您必须优化数据提取方式、相关结果排名方式和用户查询处理方式。

在这个全面的实验中,您将使用 Cloud SQL for PostgreSQL(通过 pgvector 扩展)和 Vertex AI 构建强大的 RAG 应用。您将学习三种高级技巧:

  1. 分块策略:您将观察不同的文本拆分方法(字符、递归、令牌)如何影响检索质量。
  2. 重新排名:您将实现 Vertex AI Reranker,以优化搜索结果并解决“中间丢失”问题。
  3. 查询转换:您将使用 Gemini 通过 HyDE(假设文档嵌入)和 Step-back Prompting 等技术优化用户查询。

您将执行的操作

  • 使用 pgvector 设置 Cloud SQL for PostgreSQL 实例。
  • 构建一个数据注入流水线,该流水线使用多种策略将文本分块,并将嵌入内容存储在 Cloud SQL 中。
  • 执行语义搜索,并比较不同分块方法的结果质量。
  • 集成重新排序器,以根据相关性对检索到的文档重新排序。
  • 实现依托 LLM 的查询转换,以改进模糊或复杂问题的检索。

学习内容

  • 如何将 LangChainVertex AICloud SQL 搭配使用。
  • 字符递归令牌文本拆分器的影响。
  • 如何在 PostgreSQL 中实现向量搜索
  • 如何使用 ContextualCompressionRetriever 进行重新排名。
  • 如何实现 HyDEStep-back Prompting

2. 项目设置

Google 账号

如果您还没有个人 Google 账号,则必须先创建一个 Google 账号

请使用个人账号,而不是工作账号或学校账号。

登录 Google Cloud 控制台

使用个人 Google 账号登录 Google Cloud 控制台

启用结算功能

兑换 5 美元的 Google Cloud 赠金(可选)

如需参加此研讨会,您需要拥有一个有一定信用额度的结算账号。如果您打算使用自己的结算方式,则可以跳过此步骤。

  1. 点击此链接,然后使用个人 Google 账号登录。您会看到类似如下的内容:点击此处前往“赠金”页面
  2. 点击点击此处访问您的积分按钮。系统随即会显示一个页面,供您设置结算资料 “设置结算资料”页面
  3. 点击确认。您现在已关联到 Google Cloud Platform 试用结算账号。结算概览的屏幕截图

设置个人结算账号

如果您使用 Google Cloud 抵用金设置了结算,则可以跳过此步骤。

如需设置个人结算账号,请点击此处在 Cloud 控制台中启用结算功能

注意事项:

  • 完成本实验的 Cloud 资源费用应不到 1 美元。
  • 您可以按照本实验结束时的步骤删除资源,以避免产生更多费用。
  • 新用户符合参与 $300 USD 免费试用计划的条件。

创建项目(可选)

如果您没有要用于此实验的当前项目,请在此处创建一个新项目

3. 打开 Cloud Shell Editor

  1. 点击此链接可直接前往 Cloud Shell 编辑器
  2. 如果系统在今天任何时间提示您进行授权,请点击授权继续。点击以授权 Cloud Shell
  3. 如果终端未显示在屏幕底部,请打开它:
    • 点击查看
    • 点击终端在 Cloud Shell 编辑器中打开新终端
  4. 在终端中,使用以下命令设置项目:
    gcloud config set project [PROJECT_ID]
    
    • 示例:
      gcloud config set project lab-project-id-example
      
    • 如果您不记得自己的项目 ID,可以使用以下命令列出所有项目 ID:
      gcloud projects list
      
      在 Cloud Shell 编辑器终端中设置项目 ID
  5. 您应会看到以下消息:
    Updated property [core/project].
    

4. 启用 API

如需构建此解决方案,您需要为 Vertex AI、Cloud SQL 和重新排名服务启用多个 Google Cloud API。

  1. 在终端中,启用以下 API:
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

API 简介

  • Vertex AI API (aiplatform.googleapis.com):支持使用 Gemini 进行生成,并使用 Vertex AI Embeddings 将文本矢量化。
  • Cloud SQL Admin API (sqladmin.googleapis.com):允许您以编程方式管理 Cloud SQL 实例。
  • Discovery Engine API (discoveryengine.googleapis.com):为 Vertex AI Reranker 功能提供支持。
  • Service Usage API (serviceusage.googleapis.com):用于检查和管理服务配额。

5. 创建虚拟环境并安装依赖项

在开始任何 Python 项目之前,最好先创建一个虚拟环境。这样可以隔离项目的依赖项,防止与其他项目或系统的全局 Python 软件包发生冲突。

  1. 创建名为 rag-labs 的文件夹,并切换到该文件夹。在终端中运行以下代码:
    mkdir rag-labs && cd rag-labs
    
  2. 创建并激活虚拟环境:
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. 创建一个包含必要依赖项的 requirements.txt 文件。在终端中运行以下代码:
    cloudshell edit requirements.txt
    
  4. 将以下优化的依赖项粘贴到 requirements.txt 中。这些版本已固定,以避免冲突并加快安装速度。
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. 安装依赖项:
    uv pip install -r requirements.txt
    

6. 设置 Cloud SQL for PostgreSQL

在此任务中,您将预配 Cloud SQL for PostgreSQL 实例、创建数据库,并为向量搜索做好准备。

定义 Cloud SQL 配置

  1. 创建一个 .env 文件来存储您的配置。在终端中运行以下代码:
    cloudshell edit .env
    
  2. 将以下配置粘贴到 .env 中。
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. [YOUR_PROJECT_ID] 替换为您的实际 Google Cloud 项目 ID。(例如 PROJECT_ID = "google-cloud-labs"
    如果您忘记了项目 ID,请在终端中运行以下命令。系统会显示您的所有项目及其 ID 的列表。
    gcloud projects list
    
  4. 将变量加载到 shell 会话中:
    source .env
    

创建实例和数据库

  1. 创建 Cloud SQL for PostgreSQL 实例。此命令会创建一个适合本实验的小型实例。
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. 实例准备就绪后,创建数据库:
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. 创建数据库用户:
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

启用 pgvector 扩展程序

pgvector 扩展程序可让 PostgreSQL 存储和搜索矢量嵌入。您必须在数据库中明确启用该功能。

  1. 创建名为 enable_pgvector.py 的脚本。在终端中运行以下代码:
    cloudshell edit enable_pgvector.py
    
  2. 将以下代码粘贴到 enable_pgvector.py 中。此脚本会连接到您的数据库并运行 CREATE EXTENSION IF NOT EXISTS vector;
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. 运行脚本:
    python enable_pgvector.py
    

7. 第 1 部分:分块策略

任何 RAG 流水线的第一步都是将文档转换为 LLM 可以理解的格式:

LLM 的上下文窗口(一次可处理的文本量)有限。此外,检索 50 页的文档来回答特定问题会稀释信息。我们将文档拆分为较小的“块”,以隔离相关信息。

不过,文本的拆分方式非常重要:

  • 字符拆分器:严格按字符数拆分。这种方法速度快,但风险高;它可能会将字词或句子截断一半,从而破坏语义。
  • 递归拆分器:尝试先按段落拆分,然后按句子拆分,最后按字词拆分。它会尝试将语义单元放在一起。
  • 词元拆分器:根据 LLM 自身的词汇(词元)进行拆分。这样可以确保块完美契合上下文窗口,但生成成本可能更高。

在本部分中,您将使用所有三种策略来提取相同的数据,以便进行比较。

创建提取脚本

您将使用一个脚本下载 Harry Potter 数据集,使用 CharacterRecursiveToken 策略拆分该数据集,并将嵌入内容上传到 Cloud SQL 中的三个单独的表中。

  1. 创建文件 ingest_data.py
    cloudshell edit ingest_data.py
    
  2. 将以下已修复的代码粘贴到 ingest_data.py 中。此版本可正确解析数据集的 JSON 结构。
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. 运行提取脚本。这会使用三个不同的表(集合)填充您的数据库。
    python ingest_data.py
    

比较分块结果

现在,数据已加载完毕,接下来我们针对这三个集合运行查询,看看分块策略对结果有何影响。

  1. 创建 query_chunking.py
    cloudshell edit query_chunking.py
    
  2. 将以下代码粘贴到 query_chunking.py 中:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. 运行查询脚本:
    python query_chunking.py
    

观察输出。

请注意,字符拆分可能会在句子中途截断句子,而递归拆分会尽量遵循段落边界。词元拆分可确保块完美契合 LLM 上下文窗口,但可能会忽略语义结构。

8. 第 2 部分:重新排名

向量搜索(检索)速度非常快,因为它依赖于压缩的数学表示形式(嵌入)。它会广泛撒网,以确保召回率(找到所有可能相关的商品),但往往存在精确度较低的问题(这些商品的排名可能并不理想)。

相关文档经常会“迷失”在结果列表的中间。如果 LLM 只关注前 5 个结果,可能会错过排名第 7 的关键答案。

重新排名通过添加第二阶段来解决此问题。

  1. 检索器:使用快速向量搜索提取更大的集合(例如前 25 个)。
  2. 重新排序器:使用专用模型(例如 Cross-Encoder)检查查询和文档对的全文。这种方法速度较慢,但准确性更高。它会重新对前 25 个结果进行评分,并返回绝对最佳的 3 个结果。

在此任务中,您将搜索第 1 部分中创建的 recursive 集合,但这次您将应用 Vertex AI Reranker 来优化结果。

  1. 创建 query_reranking.py
    cloudshell edit query_reranking.py
    
  2. 粘贴以下代码。请注意,该查询明确以 _recursive 集合为目标,并使用 ContextualCompressionRetriever
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. 运行重新排名查询:
    python query_reranking.py
    

观察

与原始向量搜索相比,您可能会发现相关性得分更高或排序不同。这样可确保 LLM 获得尽可能精确的上下文。

9. 第 3 部分:查询转换

通常,RAG 的最大瓶颈在于用户。用户查询通常含糊不清、不完整或措辞不当。如果查询嵌入在数学上与文档嵌入不一致,则检索会失败。

查询转换使用 LLM 在查询到达数据库之前重写或扩展查询。您将实现两种技术:

  • HyDE(假设性文档嵌入):问题与答案之间的向量相似度通常低于答案与假设性答案之间的相似度。HyDE 会要求 LLM 虚构一个完美答案,嵌入该答案,然后搜索看起来像该虚构答案的文档。
  • 退后一步提示:如果用户提出具体而详细的问题,系统可能会忽略更广泛的背景信息。退后提示会要求 LLM 生成更高级别的抽象问题(例如“这个家庭的历史是什么?”),以便在检索具体细节的同时检索基础信息。
  1. 创建 query_transformation.py
    cloudshell edit query_transformation.py
    
  2. 粘贴以下代码:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. 运行转换脚本:
    python query_transformation.py
    

观察输出。

请注意,Step-back 查询可能会检索有关 Dursley 家族历史的更广泛的背景信息,而 HyDE 则侧重于假设答案中生成的具体细节。

10. 第 4 部分:端到端生成

我们对数据进行了切分、优化了搜索,并润色了用户的查询。现在,我们终于要讨论 RAG 中的“G”了:生成

到目前为止,我们一直在查找信息。为了打造真正的 AI 助理,我们需要将这些经过重新排名的高质量文档馈送到 LLM (Gemini) 中,以合成自然语言答案。

在生产流水线中,这涉及一个特定的流程:

  1. 检索:获取广泛的候选集(例如,前 10 名)使用快速向量搜索。
  2. 重新排名:过滤出绝对最佳的候选结果(例如,前 3 名)使用 Vertex AI Reranker。
  3. 上下文构建:将这 3 个最相关文档的内容拼接成一个字符串。
  4. 基于事实的提示:将该上下文字符串插入到严格的提示模板中,强制 LLM 使用该信息。

创建生成脚本

我们将使用 gemini-2.5-flash 进行生成步骤。此模型非常适合 RAG,因为它具有较长的上下文窗口和较低的延迟时间,因此能够快速处理多个检索到的文档。

  1. 创建 end_to_end_rag.py
cloudshell edit end_to_end_rag.py
  1. 粘贴以下代码。请注意 template 变量,我们通过将其绑定到提供的上下文,严格指示模型避免“幻觉”(编造内容)。
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. 运行最终应用:
python end_to_end_rag.py

了解输出

运行此脚本时,请观察原始检索到的块(您在之前的步骤中看到过)与最终答案之间的差异。LLM 充当合成器,读取 Reranker 提供的零散文本“块”,并将它们平滑地组合成连贯且人类可读的句子。

通过串联这些组件,您可以从随机“猜测”转变为确定性的、有依据的工作流程。检索器负责撒网,重排序器负责挑选最佳渔获,生成器负责烹饪美食。

11. 总结

恭喜!您已成功构建一个远超基本向量搜索的高级 RAG 流水线。

回顾

  • 您已配置 Cloud SQL with pgvector 以实现可扩缩的向量存储。
  • 您比较了分块策略,以了解数据准备如何影响检索。
  • 您已通过 Vertex AI 实现重新排名,以提高结果的精确度。
  • 您利用查询转换(HyDE、Step-back)将用户意图与您的数据对齐。

了解详情

从原型设计到投入生产

本实验是可用于生产用途的 AI 与 Google Cloud 学习路线的组成部分。

  • 探索完整课程,弥合从原型设计到生产的差距。
  • 使用 #ProductionReadyAI 主题标签分享您的进度