1. はじめに
概要
検索拡張生成(RAG)は、外部の知識に基づいて大規模言語モデル(LLM)の回答をグラウンディングすることで、回答の品質を高めます。ただし、本番環境対応の RAG システムを構築するには、単純なベクトル検索以上のものが必要です。データの取り込み方法、関連性の高い結果のランキング方法、ユーザーのクエリの処理方法を最適化する必要があります。
この包括的なラボでは、Cloud SQL for PostgreSQL(pgvector で拡張)と Vertex AI を使用して、堅牢な RAG アプリケーションを構築します。次の 3 つの高度な手法について学習します。
- チャンク化戦略: テキストを分割するさまざまな方法(文字、再帰、トークン)が検索の品質にどのように影響するかを確認します。
- 再ランキング: Vertex AI Reranker を実装して、検索結果を絞り込み、「lost in the middle」の問題に対処します。
- クエリ変換: Gemini を使用して、HyDE(仮説ドキュメント エンベディング)やステップバック プロンプトなどの手法でユーザー クエリを最適化します。
演習内容
pgvectorを使用して Cloud SQL for PostgreSQL インスタンスを設定します。- 複数の戦略を使用してテキストをチャンク化し、エンベディングを Cloud SQL に保存するデータ取り込みパイプラインを構築します。
- セマンティック検索を実行し、さまざまなチャンク方法による結果の品質を比較します。
- Reranker を統合して、関連性に基づいて取得したドキュメントを並べ替えます。
- LLM を活用したクエリ変換を実装して、あいまいな質問や複雑な質問に対する検索を改善します。
学習内容
- Vertex AI と Cloud SQL で LangChain を使用する方法。
- 文字、再帰、トークンのテキスト分割の影響。
- PostgreSQL で ベクトル検索を実装する方法。
- 再ランキングに ContextualCompressionRetriever を使用する方法。
- HyDE と Step-back Prompting を実装する方法。
2. プロジェクトの設定
Google アカウント
個人の Google アカウントをお持ちでない場合は、Google アカウントを作成する必要があります。
仕事用または学校用アカウントではなく、個人アカウントを使用します。
Google Cloud コンソールにログインする
個人の Google アカウントを使用して Google Cloud コンソールにログインします。
課金を有効にする
5 ドル分の Google Cloud クレジットを利用する(省略可)
このワークショップを実施するには、クレジットが設定された請求先アカウントが必要です。独自の請求を使用する予定の場合は、この手順をスキップできます。
- こちらのリンクをクリックし、個人の Google アカウントでログインします。次のような出力が表示されます。

- [クレジットにアクセスするにはこちらをクリック] ボタンをクリックします。お支払いプロファイルを設定するページが表示されます。

- [確認] をクリックします。これで、Google Cloud Platform トライアルの請求先アカウントに接続されました。

個人用の請求先アカウントを設定する
Google Cloud クレジットを使用して課金を設定した場合は、この手順をスキップできます。
個人用の請求先アカウントを設定するには、Cloud コンソールでこちらに移動して課金を有効にします。
注意事項:
- このラボを完了するのにかかる Cloud リソースの費用は 1 米ドル未満です。
- このラボの最後の手順に沿ってリソースを削除すると、それ以上の料金は発生しません。
- 新規ユーザーは、300 米ドル分の無料トライアルをご利用いただけます。
プロジェクトの作成(省略可)
このラボで使用する現在のプロジェクトがない場合は、こちらで新しいプロジェクトを作成します。
3. Cloud Shell エディタを開く
- このリンクをクリックすると、Cloud Shell エディタに直接移動します。
- 本日、承認を求めるメッセージがどこかの時点で表示された場合は、[承認] をクリックして続行します。

- ターミナルが画面の下部に表示されない場合は、ターミナルを開きます。
- [表示] をクリックします。
- [ターミナル] をクリックします。

- ターミナルで、次のコマンドを使用してプロジェクトを設定します。
gcloud config set project [PROJECT_ID]- 例:
gcloud config set project lab-project-id-example - プロジェクト ID が思い出せない場合は、次のコマンドでプロジェクト ID をすべて一覧表示できます。
gcloud projects list
- 例:
- 次のようなメッセージが表示されます。
Updated property [core/project].
4. API を有効にする
このソリューションを構築するには、Vertex AI、Cloud SQL、Reranking サービス用に複数の Google Cloud APIs を有効にする必要があります。
- ターミナルで API を有効にします。
gcloud services enable \ aiplatform.googleapis.com \ sqladmin.googleapis.com \ cloudresourcemanager.googleapis.com \ serviceusage.googleapis.com \ discoveryengine.googleapis.com
API の概要
- Vertex AI API(
aiplatform.googleapis.com): Gemini を使用して生成し、Vertex AI Embeddings を使用してテキストをベクトル化できます。 - Cloud SQL Admin API(
sqladmin.googleapis.com): Cloud SQL インスタンスをプログラムで管理できます。 - Discovery Engine API(
discoveryengine.googleapis.com): Vertex AI Reranker の機能を強化します。 - Service Usage API(
serviceusage.googleapis.com): サービス割り当ての確認と管理に必要です。
5. 仮想環境を作成して依存関係をインストールする
Python プロジェクトを開始する前に、仮想環境を作成することをおすすめします。これにより、プロジェクトの依存関係が分離され、他のプロジェクトやシステムのグローバル Python パッケージとの競合が回避されます。
rag-labsという名前のフォルダを作成し、そのフォルダに移動します。ターミナルで次のコードを実行します。mkdir rag-labs && cd rag-labs- 仮想環境を作成して有効にします。
uv venv --python 3.12 source .venv/bin/activate - 必要な依存関係を含む
requirements.txtファイルを作成します。ターミナルで次のコードを実行します。cloudshell edit requirements.txt - 次の最適化された依存関係を
requirements.txtに貼り付けます。これらのバージョンは、競合を回避してインストールを高速化するために固定されています。# Core LangChain & AI langchain-community==0.3.31 langchain-google-vertexai==2.1.2 langchain-google-community[vertexaisearch]==2.0.10 # Google Cloud google-cloud-storage==2.19.0 google-cloud-aiplatform[langchain]==1.130.0 # Database cloud-sql-python-connector[pg8000]==1.19.0 sqlalchemy==2.0.45 pgvector==0.4.2 # Utilities tiktoken==0.12.0 python-dotenv==1.2.1 requests==2.32.5 - 依存関係をインストールします。
uv pip install -r requirements.txt
6. Cloud SQL for PostgreSQL を設定する
このタスクでは、Cloud SQL for PostgreSQL インスタンスをプロビジョニングし、データベースを作成して、ベクトル検索の準備を行います。
Cloud SQL 構成を定義する
- 構成を保存する
.envファイルを作成します。ターミナルで次のコードを実行します。cloudshell edit .env - 次の構成を
.envに貼り付けます。# Project Config PROJECT_ID="[YOUR_PROJECT_ID]" REGION="us-central1" # Database Config SQL_INSTANCE_NAME="rag-pg-instance-1" SQL_DATABASE_NAME="rag_harry_potter_db" SQL_USER="rag_user" SQL_PASSWORD="StrongPassword123!" # RAG Config PGVECTOR_COLLECTION_NAME="rag_harry_potter" RANKING_LOCATION_ID="global" # Connection Name (Auto-generated in scripts usually, but useful to have) DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}" [YOUR_PROJECT_ID]は、実際の Google Cloud プロジェクト ID に置き換えます。(例:PROJECT_ID = "google-cloud-labs")
プロジェクト ID を忘れた場合は、ターミナルで次のコマンドを実行します。すべてのプロジェクトとその ID のリストが表示されます。gcloud projects list- 変数をシェル セッションに読み込みます。
source .env
インスタンスとデータベースを作成する
- Cloud SQL for PostgreSQL インスタンスを作成します。このコマンドは、このラボに適した小規模なインスタンスを作成します。
gcloud sql instances create ${SQL_INSTANCE_NAME} \ --database-version=POSTGRES_15 \ --tier=db-g1-small \ --region=${REGION} \ --project=${PROJECT_ID} - インスタンスの準備ができたら、データベースを作成します。
gcloud sql databases create ${SQL_DATABASE_NAME} \ --instance=${SQL_INSTANCE_NAME} \ --project=${PROJECT_ID} - データベース ユーザーを作成します。
gcloud sql users create ${SQL_USER} \ --instance=${SQL_INSTANCE_NAME} \ --password=${SQL_PASSWORD} \ --project=${PROJECT_ID}
pgvector 拡張機能を有効にする
pgvector 拡張機能を使用すると、PostgreSQL でベクトル エンベディングを保存して検索できます。データベースで明示的に有効にする必要があります。
enable_pgvector.pyという名前のスクリプトを作成します。ターミナルで次のコードを実行します。cloudshell edit enable_pgvector.py- 次のコードを
enable_pgvector.pyに貼り付けます。このスクリプトはデータベースに接続し、CREATE EXTENSION IF NOT EXISTS vector;を実行します。import os import sqlalchemy from google.cloud.sql.connector import Connector, IPTypes import logging from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO) # Config project_id = os.getenv("PROJECT_ID") region = os.getenv("REGION") instance_name = os.getenv("SQL_INSTANCE_NAME") db_user = os.getenv("SQL_USER") db_pass = os.getenv("SQL_PASSWORD") db_name = os.getenv("SQL_DATABASE_NAME") instance_connection_name = f"{project_id}:{region}:{instance_name}" def getconn(): with Connector() as connector: conn = connector.connect( instance_connection_name, "pg8000", user=db_user, password=db_pass, db=db_name, ip_type=IPTypes.PUBLIC, ) return conn def enable_pgvector(): pool = sqlalchemy.create_engine( "postgresql+pg8000://", creator=getconn, ) with pool.connect() as db_conn: # Check if extension exists result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone() if result: logging.info("pgvector extension is already enabled.") else: logging.info("Enabling pgvector extension...") db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;")) db_conn.commit() logging.info("pgvector extension enabled successfully.") if __name__ == "__main__": enable_pgvector() - スクリプトを実行します。
python enable_pgvector.py
7. パート 1: チャンク化戦略
RAG パイプラインの最初のステップは、LLM が理解できる形式(チャンク)にドキュメントを変換することです。
LLM にはコンテキスト ウィンドウの上限(一度に処理できるテキストの量)があります。また、特定の質問に答えるために 50 ページのドキュメントを取得すると、情報が希薄になります。ドキュメントを小さな「チャンク」に分割して、関連情報を分離します。
ただし、テキストの分割方法は非常に重要です。
- 文字分割ツール: 文字数で厳密に分割します。これは高速ですが、リスクがあります。単語や文が半分にカットされ、意味が損なわれる可能性があります。
- 再帰的分割ツール: まず段落で分割し、次に文、単語で分割しようとします。セマンティック単位をまとめて保持しようとします。
- トークン スプリッタ: LLM 独自の語彙(トークン)に基づいて分割します。これにより、チャンクがコンテキスト ウィンドウに完全に収まりますが、生成に計算コストがかかる可能性があります。
このセクションでは、3 つの戦略すべてを使用して同じデータを取り込み、比較します。
取り込みスクリプトを作成する
ハリー・ポッターのデータセットをダウンロードし、Character、Recursive、Token の各戦略を使用して分割し、エンベディングを Cloud SQL の 3 つの別々のテーブルにアップロードするスクリプトを使用します。
- ファイル
ingest_data.pyを作成するcloudshell edit ingest_data.py - 次の修正済みコードを
ingest_data.pyに貼り付けます。このバージョンでは、データセットの JSON 構造が正しく解析されます。import os import json import logging import requests from typing import List, Dict, Any from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter from langchain.docstore.document import Document load_dotenv() logging.basicConfig(level=logging.INFO) # Configuration PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json" CHUNK_SIZE = 500 CHUNK_OVERLAP = 50 MAX_DOCS_TO_PROCESS = 10 # Database Connector def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def download_data(): logging.info(f"Downloading data from {BOOKS_JSON_URL}...") response = requests.get(BOOKS_JSON_URL) return response.json() def prepare_chunks(json_data, strategy): documents = [] # Iterate through the downloaded data for entry in json_data[:MAX_DOCS_TO_PROCESS]: # --- JSON PARSING LOGIC --- # The data structure nests content inside 'kwargs' -> 'page_content' if "kwargs" in entry and "page_content" in entry["kwargs"]: content = entry["kwargs"]["page_content"] # Extract metadata if available, ensuring it's a dict metadata = entry["kwargs"].get("metadata", {}) if not isinstance(metadata, dict): metadata = {"source": "unknown"} # Add the strategy to metadata for tracking metadata["strategy"] = strategy else: continue if not content: continue # Choose the splitter based on the strategy if strategy == "character": splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n") elif strategy == "token": splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) else: # default to recursive splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) # Split the content into chunks chunks = splitter.split_text(content) # Create Document objects for each chunk for chunk in chunks: documents.append(Document(page_content=chunk, metadata=metadata)) return documents def main(): logging.info("Initializing Embeddings...") embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) data = download_data() strategies = ["character", "recursive", "token"] # Connection string for PGVector (uses the getconn helper) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" for strategy in strategies: collection_name = f"{BASE_COLLECTION_NAME}_{strategy}" logging.info(f"--- Processing strategy: {strategy.upper()} ---") logging.info(f"Target Collection: {collection_name}") # Prepare documents with the specific strategy docs = prepare_chunks(data, strategy) if not docs: logging.warning(f"No documents generated for strategy {strategy}. Check data source.") continue logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...") # Initialize the Vector Store store = PGVector( collection_name=collection_name, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn}, pre_delete_collection=True # Clears old data for this collection before adding new ) # Batch add documents store.add_documents(docs) logging.info(f"Successfully finished {strategy}.\n") if __name__ == "__main__": main() - 取り込みスクリプトを実行します。これにより、データベースに 3 つの異なるテーブル(コレクション)が入力されます。
python ingest_data.py
チャンク分割の結果を比較する
データが読み込まれたので、3 つのコレクションすべてに対してクエリを実行し、チャンク戦略が結果にどのように影響するかを確認しましょう。
query_chunking.pyを作成します。cloudshell edit query_chunking.py- 次のコードを
query_chunking.pyに貼り付けます。import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector load_dotenv() logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean # Config PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" query = "Tell me about the Dursleys and their relationship with Harry Potter" print(f"\nQUERY: {query}\n" + "="*50) strategies = ["character", "recursive", "token"] for strategy in strategies: collection = f"{BASE_COLLECTION_NAME}_{strategy}" print(f"\nSTRATEGY: {strategy.upper()}") store = PGVector( collection_name=collection, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) results = store.similarity_search_with_score(query, k=2) for i, (doc, score) in enumerate(results): print(f" Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...") if __name__ == "__main__": main() - クエリ スクリプトを実行します。
python query_chunking.py
出力を確認します。
文字分割では文の途中で切断されることがありますが、再帰では段落の境界が尊重されます。トークン分割では、チャンクが LLM コンテキスト ウィンドウに完全に収まるようにしますが、セマンティック構造が無視される可能性があります。
8. パート 2: 再ランキング
ベクトル検索(取得)は、圧縮された数式表現(エンベディング)に依存しているため、非常に高速です。関連性の高いアイテムをすべて見つける(再現率)ために広範囲に検索しますが、適合率が低くなる(アイテムのランキングが不完全になる)ことがよくあります。
関連性の高いドキュメントが、検索結果リストの中央に表示されることがよくあります。上位 5 件の結果に注目している LLM は、7 位にある重要な回答を見逃す可能性があります。
再ランキングは、第 2 段階を追加することでこの問題を解決します。
- Retriever: 高速ベクトル検索を使用して、より大きなセット(上位 25 件など)を取得します。
- Reranker: 特殊なモデル(Cross-Encoder など)を使用して、クエリとドキュメントのペアの全文を調べます。速度は遅くなりますが、精度は大幅に向上します。上位 25 件を再スコア化し、絶対的に優れた 3 件を返します。
このタスクでは、パート 1 で作成した recursive コレクションを検索しますが、今回は Vertex AI Reranker を適用して結果を絞り込みます。
query_reranking.pyを作成します。cloudshell edit query_reranking.py- 次のコードを貼り付けます。
_recursiveコレクションを明示的にターゲットとし、ContextualCompressionRetrieverを使用していることに注目してください。import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector # Reranking Imports from langchain.retrievers import ContextualCompressionRetriever from langchain_google_community.vertex_rank import VertexAIRank load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" # IMPORTANT: Target the recursive collection created in ingest_data.py COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" print(f"Connecting to collection: {COLLECTION_NAME}") store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) query = "What are the Horcruxes?" print(f"QUERY: {query}\n") # 1. Base Retriever (Vector Search) - Fetch top 10 base_retriever = store.as_retriever(search_kwargs={"k": 10}) # 2. Reranker - Select top 3 from the 10 reranker = VertexAIRank( project_id=PROJECT_ID, location_id=RANKING_LOCATION, ranking_config="default_ranking_config", title_field="source", top_n=3 ) compression_retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=base_retriever ) # Execute try: reranked_docs = compression_retriever.invoke(query) if not reranked_docs: print("No documents returned. Check if the collection exists and is populated.") print(f"--- Top 3 Reranked Results ---") for i, doc in enumerate(reranked_docs): print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):") print(f" {doc.page_content[:200]}...\n") except Exception as e: print(f"Error during reranking: {e}") if __name__ == "__main__": main() - 再ランキング クエリを実行します。
python query_reranking.py
観察
関連性スコアが高くなったり、生のベクトル検索とは異なる順序になったりすることがあります。これにより、LLM は可能な限り正確なコンテキストを受け取ることができます。
9. パート 3: クエリ変換
多くの場合、RAG の最大のボトルネックはユーザーです。ユーザーのクエリは、あいまいだったり、不完全だったり、表現が不適切だったりすることがよくあります。クエリ エンベディングがドキュメント エンベディングと数学的に一致しない場合、取得は失敗します。
クエリ変換は、LLM を使用して、クエリがデータベースに到達する前にクエリを書き換えたり、拡張したりします。次の 2 つの手法を実装します。
- HyDE(Hypothetical Document Embeddings): 質問と回答のベクトル類似度は、回答と仮説回答の類似度よりも低いことがよくあります。HyDE は、LLM に完璧な回答をハルシネーションするよう求め、それをエンベディングして、ハルシネーションに似たドキュメントを検索します。
- ステップバック プロンプト: ユーザーが具体的な詳細な質問をした場合、システムはより広範なコンテキストを見逃す可能性があります。ステップバック プロンプトでは、LLM に上位レベルの抽象的な質問(「この家族の歴史は?」)を生成して、具体的な詳細とともに基本的な情報を取得するよう指示します。
query_transformation.pyを作成します。cloudshell edit query_transformation.py- 次のコードを貼り付けます。
import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings, VertexAI from langchain_community.vectorstores import PGVector from langchain_core.prompts import PromptTemplate load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def generate_hyde_doc(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a concise, hypothetical answer to the question. Question: {question} Answer:" ) chain = prompt | llm return chain.invoke({"question": query}) def generate_step_back(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:" ) chain = prompt | llm return chain.invoke({"question": query}) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) retriever = store.as_retriever(search_kwargs={"k": 2}) original_query = "Tell me about the Dursleys." print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30) # 1. HyDE hyde_doc = generate_hyde_doc(original_query, llm) print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...") hyde_results = retriever.invoke(hyde_doc) print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n") # 2. Step-back step_back_q = generate_step_back(original_query, llm) print(f"Step-back Query: {step_back_q.strip()}") step_results = retriever.invoke(step_back_q) print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...") if __name__ == "__main__": main() - 変換スクリプトを実行します。
python query_transformation.py
出力を確認します。
ステップバック クエリはダーズリー家の歴史に関する広範なコンテキストを取得する可能性がありますが、HyDE は仮説的な回答で生成された特定の詳細に焦点を当てています。
10. パート 4: エンドツーエンドの生成
データを分割し、検索を絞り込み、ユーザーのクエリを洗練させました。これで、RAG の「G」である生成について説明します。
これまでは、情報を見つけることしかできませんでした。真の AI アシスタントを構築するには、高品質で再ランキングされたドキュメントを LLM(Gemini)にフィードして、自然言語の回答を合成する必要があります。
本番環境パイプラインでは、これには特定のフローが含まれます。
- 取得: 幅広い候補のセットを取得します(例: 上位 10 件)を高速ベクトル検索で取得します。
- 再ランキング: 最高の候補に絞り込みます(例: 上位 3 件)を返します。
- コンテキストの構築: 上位 3 つのドキュメントのコンテンツを 1 つの文字列に結合します。
- グラウンディング プロンプト: そのコンテキスト文字列を厳密なプロンプト テンプレートに挿入し、LLM にその情報のみを使用させます。
生成スクリプトを作成する
生成ステップでは gemini-2.5-flash を使用します。このモデルは、コンテキスト ウィンドウが長く、レイテンシが低いため、RAG に最適です。これにより、複数の取得されたドキュメントを迅速に処理できます。
end_to_end_rag.pyを作成します。
cloudshell edit end_to_end_rag.py
- 次のコードを貼り付けます。
template変数に注目してください。これは、提供されたコンテキストにバインドすることで、モデルが「ハルシネーション」(でっち上げ)を回避するように厳密に指示する場所です。
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
load_dotenv()
logging.basicConfig(level=logging.ERROR)
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
def getconn():
instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
with Connector() as connector:
return connector.connect(
instance_conn, "pg8000",
user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
)
def main():
print("--- Initializing Production RAG Pipeline ---")
# 1. Setup Embeddings (Gemini Embedding 001)
# We use this to vectorize the user's query to match our database.
embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
# 2. Connect to Vector Store
pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
store = PGVector(
collection_name=COLLECTION_NAME,
embedding_function=embeddings,
connection_string=pg_conn_str,
engine_args={"creator": getconn}
)
# 3. Setup The 'Filter Funnel' (Retriever + Reranker)
# Step A: Fast retrieval of top 10 similar documents
base_retriever = store.as_retriever(search_kwargs={"k": 10})
# Step B: Precise reranking to find the top 3 most relevant
reranker = VertexAIRank(
project_id=PROJECT_ID,
location_id="global",
ranking_config="default_ranking_config",
title_field="source",
top_n=3
)
# Combine A and B into a single retrieval object
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=base_retriever
)
# 4. Setup LLM (Gemini 2.5 Flash)
# We use a low temperature (0.1) to reduce creativity and increase factual adherence.
llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)
# --- Execution Loop ---
user_query = "Who is Harry Potter?"
print(f"\nUser Query: {user_query}")
print("Retrieving and Reranking documents...")
# Retrieve the most relevant documents
top_docs = compression_retriever.invoke(user_query)
if not top_docs:
print("No relevant documents found.")
return
# Build the Context String
# We stitch the documents together, labeling them as Source 1, Source 2, etc.
context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])
print(f"Found {len(top_docs)} relevant context chunks.")
# 5. The Grounded Prompt
template = """You are a helpful assistant. Answer the question strictly based on the provided context.
If the answer is not in the context, say "I don't know."
Context:
{context}
Question:
{question}
Answer:
"""
prompt = PromptTemplate(template=template, input_variables=["context", "question"])
# Create the chain: Prompt -> LLM
chain = prompt | llm
print("Generating Answer via Gemini 2.5 Flash...")
final_answer = chain.invoke({"context": context_str, "question": user_query})
print(f"\nFINAL ANSWER:\n{final_answer}")
if __name__ == "__main__":
main()
- 最終的なアプリケーションを実行します。
python end_to_end_rag.py
出力を理解する
このスクリプトを実行して、取得した未加工のチャンク(前の手順で確認したチャンク)と最終的な回答の違いを確認します。LLM はシンセサイザーとして機能します。Reranker から提供されたテキストの断片的な「チャンク」を読み取り、それらを一貫性のある人間が読める文に変換します。
これらのコンポーネントを連結することで、確率的な「推測」から決定論的なワークフローに移行します。Retriever が網を投げ、Reranker が最高の獲物を選択し、Generator が料理を作ります。
11. まとめ
おめでとうございます!基本的なベクトル検索をはるかに超える高度な RAG パイプラインを構築できました。
内容のまとめ
- スケーラブルなベクトル ストレージ用に pgvector を使用して Cloud SQL を構成しました。
- チャンク化戦略を比較して、データ準備が取得にどのように影響するかを理解しました。
- Vertex AI で再ランキングを実装して、結果の精度を向上させました。
- クエリ変換(HyDE、Step-back)を使用して、ユーザーの意図とデータを一致させました。
詳細
- RAG リファレンス アーキテクチャ: RAG 関連のリファレンス アーキテクチャ ガイドの一覧を確認します。
プロトタイプから製品版へ
このラボは、「Google Cloud でのプロダクション レディな AI の開発」学習プログラムの一部です。
- カリキュラム全体で、プロトタイプから本番環境への移行を支援します。
- ハッシュタグ #ProductionReadyAI を使用して進捗状況を共有します。