고급 RAG 기술

1. 소개

개요

검색 증강 생성 (RAG)은 대규모 언어 모델 (LLM)의 대답을 외부 지식에 그라운딩하여 LLM의 대답을 향상합니다. 하지만 프로덕션 준비가 완료된 RAG 시스템을 빌드하려면 단순한 벡터 검색 이상의 작업이 필요합니다. 데이터가 수집되는 방식, 관련 결과가 순위가 매겨지는 방식, 사용자 쿼리가 처리되는 방식을 최적화해야 합니다.

이 포괄적인 실습에서는 PostgreSQL용 Cloud SQL (pgvector로 확장됨) 및 Vertex AI를 사용하여 강력한 RAG 애플리케이션을 빌드합니다. 다음 세 가지 고급 기법을 살펴봅니다.

  1. 청킹 전략: 텍스트를 분할하는 다양한 방법 (문자, 재귀, 토큰)이 검색 품질에 미치는 영향을 관찰합니다.
  2. 재순위 지정: Vertex AI Reranker를 구현하여 검색 결과를 개선하고 '중간에 길을 잃음' 문제를 해결합니다.
  3. 질문 변환: Gemini를 사용하여 HyDE (가상 문서 삽입) 및 Step-back Prompting과 같은 기법을 통해 사용자 질문을 최적화합니다.

실습할 내용

  • pgvector를 사용하여 PostgreSQL용 Cloud SQL 인스턴스를 설정합니다.
  • 여러 전략을 사용하여 텍스트를 청크로 나누고 Cloud SQL에 삽입을 저장하는 데이터 수집 파이프라인을 빌드합니다.
  • 시맨틱 검색을 실행하고 다양한 청킹 방법의 결과 품질을 비교합니다.
  • 재랭커를 통합하여 관련성을 기반으로 검색된 문서의 순서를 다시 지정합니다.
  • LLM 기반 쿼리 변환을 구현하여 모호하거나 복잡한 질문에 대한 검색을 개선합니다.

학습할 내용

  • Vertex AICloud SQL과 함께 LangChain을 사용하는 방법
  • 문자, 재귀, 토큰 텍스트 분할기의 영향
  • PostgreSQL에서 벡터 검색을 구현하는 방법
  • 재랭킹을 위해 ContextualCompressionRetriever를 사용하는 방법
  • HyDEStep-back Prompting을 구현하는 방법

2. 프로젝트 설정

Google 계정

아직 개인 Google 계정이 없다면 Google 계정을 만들어야 합니다.

직장 또는 학교 계정 대신 개인 계정을 사용합니다.

Google Cloud 콘솔에 로그인

개인 Google 계정을 사용하여 Google Cloud 콘솔에 로그인합니다.

결제 사용 설정

$5 Google Cloud 크레딧 사용 (선택사항)

이 워크숍을 진행하려면 크레딧이 있는 결제 계정이 필요합니다. 자체 결제를 사용하려는 경우 이 단계를 건너뛰어도 됩니다.

  1. 이 링크를 클릭하고 개인 Google 계정으로 로그인합니다. 다음과 같이 표시됩니다. 크레딧 페이지를 보려면 여기를 클릭하세요.
  2. 크레딧에 액세스하려면 여기를 클릭하세요 버튼을 클릭합니다. 그러면 결제 프로필을 설정하는 페이지로 이동합니다. 결제 프로필 설정 페이지
  3. 확인을 클릭합니다. 이제 Google Cloud Platform 평가판 결제 계정에 연결되었습니다. 결제 개요 스크린샷

개인 결제 계정 설정

Google Cloud 크레딧을 사용하여 결제를 설정한 경우 이 단계를 건너뛸 수 있습니다.

개인 결제 계정을 설정하려면 Cloud 콘솔에서 여기에서 결제를 사용 설정하세요.

참고 사항:

  • 이 실습을 완료하는 데 드는 Cloud 리소스 비용은 미화 1달러 미만입니다.
  • 이 실습이 끝나면 단계에 따라 리소스를 삭제하여 추가 요금이 발생하지 않도록 할 수 있습니다.
  • 신규 사용자는 미화$300 상당의 무료 체험판을 사용할 수 있습니다.

프로젝트 만들기(선택사항)

이 실습에 사용할 현재 프로젝트가 없는 경우 여기에서 새 프로젝트를 만드세요.

3. Cloud Shell 편집기 열기

  1. 이 링크를 클릭하여 Cloud Shell 편집기로 바로 이동합니다.
  2. 오늘 언제든지 승인하라는 메시지가 표시되면 승인을 클릭하여 계속합니다.클릭하여 Cloud Shell 승인
  3. 터미널이 화면 하단에 표시되지 않으면 다음을 실행하여 엽니다.
    • 보기를 클릭합니다.
    • 터미널을 클릭합니다.Cloud Shell 편집기에서 새 터미널 열기
  4. 터미널에서 다음 명령어를 사용하여 프로젝트를 설정합니다.
    gcloud config set project [PROJECT_ID]
    
    • 예:
      gcloud config set project lab-project-id-example
      
    • 프로젝트 ID가 기억나지 않는 경우 다음을 사용하여 모든 프로젝트 ID를 나열할 수 있습니다.
      gcloud projects list
      
      Cloud Shell 편집기 터미널에서 프로젝트 ID 설정
  5. 다음 메시지가 표시되어야 합니다.
    Updated property [core/project].
    

4. API 사용 설정

이 솔루션을 빌드하려면 Vertex AI, Cloud SQL, 재순위 지정 서비스에 대해 여러 Google Cloud API를 사용 설정해야 합니다.

  1. 터미널에서 API를 사용 설정합니다.
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

API 소개

  • Vertex AI API (aiplatform.googleapis.com): Gemini를 사용하여 생성하고 Vertex AI Embeddings를 사용하여 텍스트를 벡터화할 수 있습니다.
  • Cloud SQL Admin API (sqladmin.googleapis.com): Cloud SQL 인스턴스를 프로그래매틱 방식으로 관리할 수 있습니다.
  • Discovery Engine API (discoveryengine.googleapis.com): Vertex AI Reranker 기능을 지원합니다.
  • Service Usage API (serviceusage.googleapis.com): 서비스 할당량을 확인하고 관리하는 데 필요합니다.

5. 가상 환경 만들기 및 종속 항목 설치

Python 프로젝트를 시작하기 전에 가상 환경을 만드는 것이 좋습니다. 이렇게 하면 프로젝트의 종속 항목이 격리되어 다른 프로젝트나 시스템의 전역 Python 패키지와의 충돌을 방지할 수 있습니다.

  1. rag-labs라는 폴더를 만들고 해당 폴더로 변경합니다. 터미널에서 다음 코드를 실행합니다.
    mkdir rag-labs && cd rag-labs
    
  2. 가상 환경 만들기 및 활성화
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. 필요한 종속 항목이 포함된 requirements.txt 파일을 만듭니다. 터미널에서 다음 코드를 실행합니다.
    cloudshell edit requirements.txt
    
  4. 다음 최적화된 종속 항목을 requirements.txt에 붙여넣습니다. 이러한 버전은 충돌을 방지하고 설치 속도를 높이기 위해 고정됩니다.
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. 종속 항목 설치
    uv pip install -r requirements.txt
    

6. PostgreSQL용 Cloud SQL 설정

이 작업에서는 PostgreSQL용 Cloud SQL 인스턴스를 프로비저닝하고, 데이터베이스를 만들고, 벡터 검색을 위해 준비합니다.

Cloud SQL 구성 정의

  1. 구성을 저장할 .env 파일을 만듭니다. 터미널에서 다음 코드를 실행합니다.
    cloudshell edit .env
    
  2. 다음 구성을 .env에 붙여넣습니다.
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. [YOUR_PROJECT_ID]를 실제 Google Cloud 프로젝트 ID로 바꿉니다. (예: PROJECT_ID = "google-cloud-labs")
    프로젝트 ID가 기억나지 않으면 터미널에서 다음 명령어를 실행합니다. 모든 프로젝트와 ID의 목록이 표시됩니다.
    gcloud projects list
    
  4. 변수를 셸 세션에 로드합니다.
    source .env
    

인스턴스 및 데이터베이스 만들기

  1. PostgreSQL용 Cloud SQL 인스턴스를 만듭니다. 이 명령어는 이 실습에 적합한 작은 인스턴스를 만듭니다.
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. 인스턴스가 준비되면 데이터베이스를 만듭니다.
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. 데이터베이스 사용자를 만듭니다.
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

pgvector 확장 프로그램 사용 설정

pgvector 확장 프로그램을 사용하면 PostgreSQL에서 벡터 임베딩을 저장하고 검색할 수 있습니다. 데이터베이스에서 명시적으로 사용 설정해야 합니다.

  1. enable_pgvector.py라는 스크립트를 만듭니다. 터미널에서 다음 코드를 실행합니다.
    cloudshell edit enable_pgvector.py
    
  2. 다음 코드를 enable_pgvector.py에 붙여넣습니다. 이 스크립트는 데이터베이스에 연결하고 CREATE EXTENSION IF NOT EXISTS vector;를 실행합니다.
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. 스크립트를 실행합니다.
    python enable_pgvector.py
    

7. 1부: 청크 처리 전략

RAG 파이프라인의 첫 번째 단계는 문서를 LLM이 이해할 수 있는 형식(청크)으로 변환하는 것입니다.

LLM에는 컨텍스트 윈도우 제한 (한 번에 처리할 수 있는 텍스트 양)이 있습니다. 또한 특정 질문에 답하기 위해 50페이지 문서를 검색하면 정보가 희석됩니다. Google은 관련 정보를 분리하기 위해 문서를 더 작은 '청크'로 분할합니다.

하지만 텍스트를 분할하는 방법이 매우 중요합니다.

  • 문자 분할기: 문자 수를 기준으로 엄격하게 분할합니다. 이 방법은 빠르지만 위험합니다. 단어나 문장을 절반으로 잘라 의미를 파괴할 수 있습니다.
  • 재귀 분할기: 먼저 단락으로 분할한 다음 문장, 단어 순으로 분할합니다. 의미 단위가 함께 유지되도록 시도합니다.
  • 토큰 분할기: LLM의 자체 어휘 (토큰)를 기반으로 분할합니다. 이렇게 하면 청크가 컨텍스트 창에 완벽하게 맞지만 생성하는 데 계산 비용이 더 많이 들 수 있습니다.

이 섹션에서는 세 가지 전략을 모두 사용하여 동일한 데이터를 수집하고 이를 비교합니다.

수집 스크립트 만들기

해리포터 데이터 세트를 다운로드하고 Character, Recursive, Token 전략을 사용하여 분할하고 Cloud SQL의 세 가지 별도 테이블에 삽입을 업로드하는 스크립트를 사용합니다.

  1. ingest_data.py 파일 만들기
    cloudshell edit ingest_data.py
    
  2. 다음 수정된 코드를 ingest_data.py에 붙여넣습니다. 이 버전은 데이터 세트의 JSON 구조를 올바르게 파싱합니다.
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. 수집 스크립트를 실행합니다. 이렇게 하면 데이터베이스가 세 개의 서로 다른 테이블 (컬렉션)로 채워집니다.
    python ingest_data.py
    

청크 결과 비교

이제 데이터가 로드되었으므로 세 컬렉션 모두에 대해 쿼리를 실행하여 청킹 전략이 결과에 미치는 영향을 확인해 보겠습니다.

  1. query_chunking.py 만들기:
    cloudshell edit query_chunking.py
    
  2. 다음 코드를 query_chunking.py에 붙여넣습니다.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. 쿼리 스크립트를 실행합니다.
    python query_chunking.py
    

출력을 확인합니다.

문자 분할은 생각 중간에 문장을 끊을 수 있지만 재귀는 단락 경계를 존중하려고 합니다. 토큰 분할은 청크가 LLM 컨텍스트 윈도우에 완벽하게 맞도록 하지만 시맨틱 구조를 무시할 수 있습니다.

8. 2부: 재순위 지정

벡터 검색 (검색)은 압축된 수학적 표현 (임베딩)을 사용하므로 매우 빠릅니다. 재현율 (잠재적으로 관련성이 있는 모든 항목 찾기)을 보장하기 위해 광범위한 검색을 수행하지만 정밀도 (이러한 항목의 순위가 완벽하지 않을 수 있음)가 낮은 경우가 많습니다.

관련 문서가 결과 목록의 중간에 '사라지는' 경우가 많습니다. 상위 5개 결과에만 주의를 기울이는 LLM은 7번째에 있는 중요한 답변을 놓칠 수 있습니다.

재순위 지정은 두 번째 단계를 추가하여 이 문제를 해결합니다.

  1. 리트리버: 빠른 벡터 검색을 사용하여 더 큰 집합 (예: 상위 25개)을 가져옵니다.
  2. 리랭커: 전문 모델 (예: 교차 인코더)을 사용하여 질문과 문서 쌍의 전체 텍스트를 검사합니다. 속도는 느리지만 훨씬 더 정확합니다. 상위 25개에 다시 점수를 매기고 가장 좋은 3개를 반환합니다.

이 작업에서는 1부에서 만든 recursive 컬렉션을 검색하지만 이번에는 Vertex AI Reranker를 적용하여 결과를 개선합니다.

  1. query_reranking.py 만들기:
    cloudshell edit query_reranking.py
    
  2. 다음 코드를 붙여넣습니다. _recursive 컬렉션을 명시적으로 타겟팅하고 ContextualCompressionRetriever를 사용하는 방법을 참고하세요.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. 재순위 지정 쿼리를 실행합니다.
    python query_reranking.py
    

관찰

원시 벡터 검색과 비교했을 때 관련성 점수가 더 높거나 순서가 다를 수 있습니다. 이렇게 하면 LLM이 최대한 정확한 컨텍스트를 수신할 수 있습니다.

9. 3부: 쿼리 변환

RAG에서 가장 큰 병목 현상은 사용자입니다. 사용자 질문은 모호하거나, 불완전하거나, 표현이 잘못된 경우가 많습니다. 쿼리 임베딩이 문서 임베딩과 수학적으로 일치하지 않으면 검색이 실패합니다.

쿼리 변환은 LLM을 사용하여 데이터베이스에 도달하기 에 쿼리를 다시 작성하거나 확장합니다. 다음 두 가지 기법을 구현합니다.

  • HyDE (가상 문서 삽입): 질문과 답변 간의 벡터 유사성은 답변과 가상 답변 간의 유사성보다 낮은 경우가 많습니다. HyDE는 LLM에 완벽한 답변을 환각하도록 요청하고, 이를 삽입하고, 환각과 유사한 문서를 검색합니다.
  • Step-back 프롬프팅: 사용자가 구체적인 질문을 하면 시스템이 더 넓은 맥락을 놓칠 수 있습니다. Step-back 프롬프트는 LLM에 구체적인 세부정보와 함께 기본적인 정보를 가져오기 위해 상위 수준의 추상적인 질문 ('이 가족의 역사는 뭐야?')을 생성하도록 요청합니다.
  1. query_transformation.py 만들기:
    cloudshell edit query_transformation.py
    
  2. 다음 코드를 붙여넣습니다.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. 변환 스크립트를 실행합니다.
    python query_transformation.py
    

출력을 확인합니다.

Step-back 질문은 더즐리 가족사에 관한 광범위한 맥락을 가져오는 반면 HyDE는 가상 답변에서 생성된 구체적인 세부정보에 초점을 맞춥니다.

10. 4부: 엔드 투 엔드 생성

데이터를 잘라내고, 검색을 개선하고, 사용자의 쿼리를 다듬었습니다. 이제 RAG의 'G'인 생성을 살펴보겠습니다.

지금까지는 정보를 찾는 데에만 집중했습니다. 진정한 AI 어시스턴트를 빌드하려면 이러한 고품질의 재순위 지정된 문서를 LLM (Gemini)에 입력하여 자연어 답변을 합성해야 합니다.

프로덕션 파이프라인에서는 다음과 같은 특정 흐름이 포함됩니다.

  1. 검색: 광범위한 후보 집합을 가져옵니다 (예: 상위 10개)을 빠르게 찾을 수 있습니다.
  2. 재순위 지정: 절대적으로 가장 좋은 항목 (예: 상위 3개) Vertex AI Reranker 사용
  3. 컨텍스트 구성: 상위 3개 문서의 콘텐츠를 하나의 문자열로 연결합니다.
  4. 그라운딩된 프롬프트: LLM이 해당 정보 사용하도록 강제하는 엄격한 프롬프트 템플릿에 컨텍스트 문자열을 삽입합니다.

생성 스크립트 만들기

생성 단계에서는 gemini-2.5-flash을 사용합니다. 이 모델은 긴 컨텍스트 윈도우와 짧은 지연 시간을 갖추고 있어 검색된 여러 문서를 빠르게 처리할 수 있으므로 RAG에 적합합니다.

  1. end_to_end_rag.py 만들기:
cloudshell edit end_to_end_rag.py
  1. 다음 코드를 붙여넣습니다. template 변수에 유의하세요. 여기에 제공된 컨텍스트에 바인딩하여 모델에 '환각' (사실이 아닌 것을 사실인 것처럼 꾸며내는 것)을 피하도록 엄격하게 지시합니다.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. 최종 애플리케이션을 실행합니다.
python end_to_end_rag.py

출력 이해하기

이 스크립트를 실행하면 이전 단계에서 본 원시 검색된 청크와 최종 답변의 차이를 확인할 수 있습니다. LLM은 신시사이저 역할을 합니다. Reranker가 제공하는 텍스트의 조각난 '청크'를 읽고 이를 일관되고 사람이 읽을 수 있는 문장으로 매끄럽게 만듭니다.

이러한 구성요소를 연결하면 확률적 '추측'에서 결정적이고 근거가 있는 워크플로로 이동할 수 있습니다. 리트리버는 그물을 던지고, 리랭커는 최고의 물고기를 선택하고, 생성기는 식사를 준비합니다.

11. 결론

축하합니다. 기본 벡터 검색을 훨씬 뛰어넘는 고급 RAG 파이프라인을 빌드했습니다.

요약

  • 확장 가능한 벡터 스토리지를 위해 pgvector를 사용하여 Cloud SQL을 구성했습니다.
  • 청킹 전략을 비교하여 데이터 준비가 검색에 미치는 영향을 파악했습니다.
  • Vertex AI로 재순위 지정을 구현하여 결과의 정확도를 개선했습니다.
  • 질문 변환 (HyDE, Step-back)을 활용하여 사용자 의도를 데이터와 일치시켰습니다.

자세히 알아보기

프로토타입에서 프로덕션으로

이 실습은 Google Cloud를 사용한 프로덕션 레디 AI 학습 과정에 포함되어 있습니다.

  • 전체 커리큘럼 살펴보기를 통해 프로토타입에서 프로덕션으로 나아가는 데 필요한 지식을 쌓으세요.
  • #ProductionReadyAI 해시태그를 사용하여 진행 상황을 공유하세요.