高度な RAG 手法

1. はじめに

概要

検索拡張生成(RAG)は、外部の知識に基づいて大規模言語モデル(LLM)の回答をグラウンディングすることで、回答の品質を高めます。ただし、本番環境対応の RAG システムを構築するには、単純なベクトル検索以上のものが必要です。データの取り込み方法、関連性の高い結果のランキング方法、ユーザーのクエリの処理方法を最適化する必要があります。

この包括的なラボでは、Cloud SQL for PostgreSQLpgvector で拡張)と Vertex AI を使用して、堅牢な RAG アプリケーションを構築します。次の 3 つの高度な手法について学習します。

  1. チャンク化戦略: テキストを分割するさまざまな方法(文字、再帰、トークン)が検索の品質にどのように影響するかを確認します。
  2. 再ランキング: Vertex AI Reranker を実装して、検索結果を絞り込み、「lost in the middle」の問題に対処します。
  3. クエリ変換: Gemini を使用して、HyDE(仮説ドキュメント エンベディング)やステップバック プロンプトなどの手法でユーザー クエリを最適化します。

演習内容

  • pgvector を使用して Cloud SQL for PostgreSQL インスタンスを設定します。
  • 複数の戦略を使用してテキストをチャンク化し、エンベディングを Cloud SQL に保存するデータ取り込みパイプラインを構築します。
  • セマンティック検索を実行し、さまざまなチャンク方法による結果の品質を比較します。
  • Reranker を統合して、関連性に基づいて取得したドキュメントを並べ替えます。
  • LLM を活用したクエリ変換を実装して、あいまいな質問や複雑な質問に対する検索を改善します。

学習内容

  • Vertex AICloud SQLLangChain を使用する方法。
  • 文字再帰トークンのテキスト分割の影響。
  • PostgreSQL で ベクトル検索を実装する方法。
  • 再ランキングに ContextualCompressionRetriever を使用する方法。
  • HyDEStep-back Prompting を実装する方法。

2. プロジェクトの設定

Google アカウント

個人の Google アカウントをお持ちでない場合は、Google アカウントを作成する必要があります。

仕事用または学校用アカウントではなく、個人アカウントを使用します。

Google Cloud コンソールにログインする

個人の Google アカウントを使用して Google Cloud コンソールにログインします。

課金を有効にする

5 ドル分の Google Cloud クレジットを利用する(省略可)

このワークショップを実施するには、クレジットが設定された請求先アカウントが必要です。独自の請求を使用する予定の場合は、この手順をスキップできます。

  1. こちらのリンクをクリックし、個人の Google アカウントでログインします。次のような出力が表示されます。クレジットのページはこちらをクリック
  2. [クレジットにアクセスするにはこちらをクリック] ボタンをクリックします。お支払いプロファイルを設定するページが表示されます。お支払い情報の設定ページ
  3. [確認] をクリックします。これで、Google Cloud Platform トライアルの請求先アカウントに接続されました。請求の概要のスクリーンショット

個人用の請求先アカウントを設定する

Google Cloud クレジットを使用して課金を設定した場合は、この手順をスキップできます。

個人用の請求先アカウントを設定するには、Cloud コンソールでこちらに移動して課金を有効にします

注意事項:

  • このラボを完了するのにかかる Cloud リソースの費用は 1 米ドル未満です。
  • このラボの最後の手順に沿ってリソースを削除すると、それ以上の料金は発生しません。
  • 新規ユーザーは、300 米ドル分の無料トライアルをご利用いただけます。

プロジェクトの作成(省略可)

このラボで使用する現在のプロジェクトがない場合は、こちらで新しいプロジェクトを作成します。

3. Cloud Shell エディタを開く

  1. このリンクをクリックすると、Cloud Shell エディタに直接移動します。
  2. 本日、承認を求めるメッセージがどこかの時点で表示された場合は、[承認] をクリックして続行します。Cloud Shell を承認する
  3. ターミナルが画面の下部に表示されない場合は、ターミナルを開きます。
    • [表示] をクリックします。
    • [ターミナル] をクリックします。Cloud Shell エディタで新しいターミナルを開く
  4. ターミナルで、次のコマンドを使用してプロジェクトを設定します。
    gcloud config set project [PROJECT_ID]
    
    • 例:
      gcloud config set project lab-project-id-example
      
    • プロジェクト ID が思い出せない場合は、次のコマンドでプロジェクト ID をすべて一覧表示できます。
      gcloud projects list
      
      Cloud Shell エディタのターミナルでプロジェクト ID を設定する
  5. 次のようなメッセージが表示されます。
    Updated property [core/project].
    

4. API を有効にする

このソリューションを構築するには、Vertex AI、Cloud SQL、Reranking サービス用に複数の Google Cloud APIs を有効にする必要があります。

  1. ターミナルで API を有効にします。
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

API の概要

  • Vertex AI APIaiplatform.googleapis.com): Gemini を使用して生成し、Vertex AI Embeddings を使用してテキストをベクトル化できます。
  • Cloud SQL Admin APIsqladmin.googleapis.com): Cloud SQL インスタンスをプログラムで管理できます。
  • Discovery Engine APIdiscoveryengine.googleapis.com): Vertex AI Reranker の機能を強化します。
  • Service Usage APIserviceusage.googleapis.com): サービス割り当ての確認と管理に必要です。

5. 仮想環境を作成して依存関係をインストールする

Python プロジェクトを開始する前に、仮想環境を作成することをおすすめします。これにより、プロジェクトの依存関係が分離され、他のプロジェクトやシステムのグローバル Python パッケージとの競合が回避されます。

  1. rag-labs という名前のフォルダを作成し、そのフォルダに移動します。ターミナルで次のコードを実行します。
    mkdir rag-labs && cd rag-labs
    
  2. 仮想環境を作成して有効にします。
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. 必要な依存関係を含む requirements.txt ファイルを作成します。ターミナルで次のコードを実行します。
    cloudshell edit requirements.txt
    
  4. 次の最適化された依存関係を requirements.txt に貼り付けます。これらのバージョンは、競合を回避してインストールを高速化するために固定されています。
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. 依存関係をインストールします。
    uv pip install -r requirements.txt
    

6. Cloud SQL for PostgreSQL を設定する

このタスクでは、Cloud SQL for PostgreSQL インスタンスをプロビジョニングし、データベースを作成して、ベクトル検索の準備を行います。

Cloud SQL 構成を定義する

  1. 構成を保存する .env ファイルを作成します。ターミナルで次のコードを実行します。
    cloudshell edit .env
    
  2. 次の構成を .env に貼り付けます。
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. [YOUR_PROJECT_ID] は、実際の Google Cloud プロジェクト ID に置き換えます。(例: PROJECT_ID = "google-cloud-labs"
    プロジェクト ID を忘れた場合は、ターミナルで次のコマンドを実行します。すべてのプロジェクトとその ID のリストが表示されます。
    gcloud projects list
    
  4. 変数をシェル セッションに読み込みます。
    source .env
    

インスタンスとデータベースを作成する

  1. Cloud SQL for PostgreSQL インスタンスを作成します。このコマンドは、このラボに適した小規模なインスタンスを作成します。
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. インスタンスの準備ができたら、データベースを作成します。
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. データベース ユーザーを作成します。
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

pgvector 拡張機能を有効にする

pgvector 拡張機能を使用すると、PostgreSQL でベクトル エンベディングを保存して検索できます。データベースで明示的に有効にする必要があります。

  1. enable_pgvector.py という名前のスクリプトを作成します。ターミナルで次のコードを実行します。
    cloudshell edit enable_pgvector.py
    
  2. 次のコードを enable_pgvector.py に貼り付けます。このスクリプトはデータベースに接続し、CREATE EXTENSION IF NOT EXISTS vector; を実行します。
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. スクリプトを実行します。
    python enable_pgvector.py
    

7. パート 1: チャンク化戦略

RAG パイプラインの最初のステップは、LLM が理解できる形式(チャンク)にドキュメントを変換することです。

LLM にはコンテキスト ウィンドウの上限(一度に処理できるテキストの量)があります。また、特定の質問に答えるために 50 ページのドキュメントを取得すると、情報が希薄になります。ドキュメントを小さな「チャンク」に分割して、関連情報を分離します。

ただし、テキストの分割方法は非常に重要です。

  • 文字分割ツール: 文字数で厳密に分割します。これは高速ですが、リスクがあります。単語や文が半分にカットされ、意味が損なわれる可能性があります。
  • 再帰的分割ツール: まず段落で分割し、次に文、単語で分割しようとします。セマンティック単位をまとめて保持しようとします。
  • トークン スプリッタ: LLM 独自の語彙(トークン)に基づいて分割します。これにより、チャンクがコンテキスト ウィンドウに完全に収まりますが、生成に計算コストがかかる可能性があります。

このセクションでは、3 つの戦略すべてを使用して同じデータを取り込み、比較します。

取り込みスクリプトを作成する

ハリー・ポッターのデータセットをダウンロードし、CharacterRecursiveToken の各戦略を使用して分割し、エンベディングを Cloud SQL の 3 つの別々のテーブルにアップロードするスクリプトを使用します。

  1. ファイルingest_data.pyを作成する
    cloudshell edit ingest_data.py
    
  2. 次の修正済みコードを ingest_data.py に貼り付けます。このバージョンでは、データセットの JSON 構造が正しく解析されます。
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. 取り込みスクリプトを実行します。これにより、データベースに 3 つの異なるテーブル(コレクション)が入力されます。
    python ingest_data.py
    

チャンク分割の結果を比較する

データが読み込まれたので、3 つのコレクションすべてに対してクエリを実行し、チャンク戦略が結果にどのように影響するかを確認しましょう。

  1. query_chunking.py を作成します。
    cloudshell edit query_chunking.py
    
  2. 次のコードを query_chunking.py に貼り付けます。
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. クエリ スクリプトを実行します。
    python query_chunking.py
    

出力を確認します。

文字分割では文の途中で切断されることがありますが、再帰では段落の境界が尊重されます。トークン分割では、チャンクが LLM コンテキスト ウィンドウに完全に収まるようにしますが、セマンティック構造が無視される可能性があります。

8. パート 2: 再ランキング

ベクトル検索(取得)は、圧縮された数式表現(エンベディング)に依存しているため、非常に高速です。関連性の高いアイテムをすべて見つける(再現率)ために広範囲に検索しますが、適合率が低くなる(アイテムのランキングが不完全になる)ことがよくあります。

関連性の高いドキュメントが、検索結果リストの中央に表示されることがよくあります。上位 5 件の結果に注目している LLM は、7 位にある重要な回答を見逃す可能性があります。

再ランキングは、第 2 段階を追加することでこの問題を解決します。

  1. Retriever: 高速ベクトル検索を使用して、より大きなセット(上位 25 件など)を取得します。
  2. Reranker: 特殊なモデル(Cross-Encoder など)を使用して、クエリとドキュメントのペアの全文を調べます。速度は遅くなりますが、精度は大幅に向上します。上位 25 件を再スコア化し、絶対的に優れた 3 件を返します。

このタスクでは、パート 1 で作成した recursive コレクションを検索しますが、今回は Vertex AI Reranker を適用して結果を絞り込みます。

  1. query_reranking.py を作成します。
    cloudshell edit query_reranking.py
    
  2. 次のコードを貼り付けます。_recursive コレクションを明示的にターゲットとし、ContextualCompressionRetriever を使用していることに注目してください。
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. 再ランキング クエリを実行します。
    python query_reranking.py
    

観察

関連性スコアが高くなったり、生のベクトル検索とは異なる順序になったりすることがあります。これにより、LLM は可能な限り正確なコンテキストを受け取ることができます。

9. パート 3: クエリ変換

多くの場合、RAG の最大のボトルネックはユーザーです。ユーザーのクエリは、あいまいだったり、不完全だったり、表現が不適切だったりすることがよくあります。クエリ エンベディングがドキュメント エンベディングと数学的に一致しない場合、取得は失敗します。

クエリ変換は、LLM を使用して、クエリがデータベースに到達するにクエリを書き換えたり、拡張したりします。次の 2 つの手法を実装します。

  • HyDE(Hypothetical Document Embeddings): 質問と回答のベクトル類似度は、回答と仮説回答の類似度よりも低いことがよくあります。HyDE は、LLM に完璧な回答をハルシネーションするよう求め、それをエンベディングして、ハルシネーションに似たドキュメントを検索します。
  • ステップバック プロンプト: ユーザーが具体的な詳細な質問をした場合、システムはより広範なコンテキストを見逃す可能性があります。ステップバック プロンプトでは、LLM に上位レベルの抽象的な質問(「この家族の歴史は?」)を生成して、具体的な詳細とともに基本的な情報を取得するよう指示します。
  1. query_transformation.py を作成します。
    cloudshell edit query_transformation.py
    
  2. 次のコードを貼り付けます。
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. 変換スクリプトを実行します。
    python query_transformation.py
    

出力を確認します。

ステップバック クエリはダーズリー家の歴史に関する広範なコンテキストを取得する可能性がありますが、HyDE は仮説的な回答で生成された特定の詳細に焦点を当てています。

10. パート 4: エンドツーエンドの生成

データを分割し、検索を絞り込み、ユーザーのクエリを洗練させました。これで、RAG の「G」である生成について説明します。

これまでは、情報を見つけることしかできませんでした。真の AI アシスタントを構築するには、高品質で再ランキングされたドキュメントを LLM(Gemini)にフィードして、自然言語の回答を合成する必要があります。

本番環境パイプラインでは、これには特定のフローが含まれます。

  1. 取得: 幅広い候補のセットを取得します(例: 上位 10 件)を高速ベクトル検索で取得します。
  2. 再ランキング: 最高の候補に絞り込みます(例: 上位 3 件)を返します。
  3. コンテキストの構築: 上位 3 つのドキュメントのコンテンツを 1 つの文字列に結合します。
  4. グラウンディング プロンプト: そのコンテキスト文字列を厳密なプロンプト テンプレートに挿入し、LLM にその情報のみを使用させます。

生成スクリプトを作成する

生成ステップでは gemini-2.5-flash を使用します。このモデルは、コンテキスト ウィンドウが長く、レイテンシが低いため、RAG に最適です。これにより、複数の取得されたドキュメントを迅速に処理できます。

  1. end_to_end_rag.py を作成します。
cloudshell edit end_to_end_rag.py
  1. 次のコードを貼り付けます。template 変数に注目してください。これは、提供されたコンテキストにバインドすることで、モデルが「ハルシネーション」(でっち上げ)を回避するように厳密に指示する場所です。
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. 最終的なアプリケーションを実行します。
python end_to_end_rag.py

出力を理解する

このスクリプトを実行して、取得した未加工のチャンク(前の手順で確認したチャンク)と最終的な回答の違いを確認します。LLM はシンセサイザーとして機能します。Reranker から提供されたテキストの断片的な「チャンク」を読み取り、それらを一貫性のある人間が読める文に変換します。

これらのコンポーネントを連結することで、確率的な「推測」から決定論的なワークフローに移行します。Retriever が網を投げ、Reranker が最高の獲物を選択し、Generator が料理を作ります。

11. まとめ

おめでとうございます!基本的なベクトル検索をはるかに超える高度な RAG パイプラインを構築できました。

内容のまとめ

  • スケーラブルなベクトル ストレージ用に pgvector を使用して Cloud SQL を構成しました。
  • チャンク化戦略を比較して、データ準備が取得にどのように影響するかを理解しました。
  • Vertex AI で再ランキングを実装して、結果の精度を向上させました。
  • クエリ変換(HyDE、Step-back)を使用して、ユーザーの意図とデータを一致させました。

詳細

プロトタイプから製品版へ

このラボは、「Google Cloud でのプロダクション レディな AI の開発」学習プログラムの一部です。

  • カリキュラム全体で、プロトタイプから本番環境への移行を支援します。
  • ハッシュタグ #ProductionReadyAI を使用して進捗状況を共有します。