เกี่ยวกับ Codelab นี้
1 บทนำ
ในสภาพแวดล้อมด้านข้อมูลที่รวดเร็วในปัจจุบัน ข้อมูลเชิงลึกแบบเรียลไทม์เป็นสิ่งสำคัญในการตัดสินใจอย่างรอบคอบ Codelab นี้จะแนะนำวิธีสร้างไปป์ไลน์การประเมินแบบเรียลไทม์ เราจะเริ่มต้นด้วยการใช้ประโยชน์จากเฟรมเวิร์ก Apache Beam ซึ่งมีโมเดลการเขียนโปรแกรมแบบรวมสำหรับทั้งข้อมูลแบบกลุ่มและแบบสตรีม ซึ่งจะช่วยลดความซับซ้อนในการพัฒนาไปป์ไลน์ได้อย่างมากด้วยการแยกตรรกะการประมวลผลแบบกระจายที่ซับซ้อนซึ่งคุณจะต้องสร้างขึ้นใหม่ตั้งแต่ต้น เมื่อกำหนดไปป์ไลน์โดยใช้ Beam แล้ว คุณจะเรียกใช้ไปป์ไลน์ดังกล่าวใน Google Cloud Dataflow ได้อย่างราบรื่น ซึ่งเป็นบริการที่มีการจัดการครบวงจรที่ให้ความสามารถในการปรับขนาดและประสิทธิภาพที่ไม่มีใครเทียบได้สำหรับความต้องการในการประมวลผลข้อมูล
ในโค้ดแล็บนี้ คุณจะได้เรียนรู้วิธีออกแบบไปป์ไลน์ Apache Beam ที่ปรับขนาดได้สำหรับการอนุมานแมชชีนเลิร์นนิง พัฒนา ModelHandler แบบกำหนดเองเพื่อผสานรวมโมเดล Gemini ของ Vertex AI ใช้ประโยชน์จากการออกแบบพรอมต์สำหรับการแยกประเภทข้อความอัจฉริยะในสตรีมข้อมูล รวมถึงติดตั้งใช้งานและดำเนินการไปป์ไลน์การอนุมาน ML แบบสตรีมมิงนี้ใน Google Cloud Dataflow เมื่อจบหลักสูตรนี้ คุณจะได้รับข้อมูลเชิงลึกที่มีคุณค่าเกี่ยวกับการใช้แมชชีนเลิร์นนิงเพื่อทำความเข้าใจข้อมูลแบบเรียลไทม์และการประเมินอย่างต่อเนื่องในเวิร์กโฟลว์ด้านวิศวกรรม โดยเฉพาะอย่างยิ่งการดูแล AI แบบสนทนาที่แข็งแกร่งและมุ่งเน้นผู้ใช้
สถานการณ์
บริษัทของคุณได้สร้างตัวแทนข้อมูล เอเจนต์ข้อมูลของคุณซึ่งสร้างขึ้นด้วยชุดพัฒนาเอเจนต์ (ADK) มีความสามารถเฉพาะทางต่างๆ เพื่อช่วยในงานที่เกี่ยวข้องกับข้อมูล ลองนึกภาพว่าเป็นผู้ช่วยด้านข้อมูลอเนกประสงค์ที่พร้อมจัดการคำขอที่หลากหลาย ตั้งแต่การทำหน้าที่เป็นนักวิเคราะห์ BI เพื่อสร้างรายงานเชิงลึก ไปจนถึงวิศวกรข้อมูลที่ช่วยคุณสร้างไปป์ไลน์ข้อมูลที่แข็งแกร่ง หรือเครื่องมือสร้าง SQL ที่สร้างคำสั่ง SQL ที่แม่นยำ และอื่นๆ อีกมากมาย การโต้ตอบทุกครั้งที่เอเจนต์นี้มี รวมถึงคำตอบทุกคำตอบที่เอเจนต์สร้างขึ้นจะได้รับการจัดเก็บไว้ใน Firestore โดยอัตโนมัติ แต่ทำไมเราจึงต้องมีไปป์ไลน์ที่นี่
เนื่องจากทริกเกอร์จาก Firestore จะส่งข้อมูลการโต้ตอบนี้ไปยัง Pub/Sub ได้อย่างราบรื่น จึงมั่นใจได้ว่าเราจะประมวลผลและวิเคราะห์การสนทนาที่สำคัญเหล่านี้ได้ทันทีแบบเรียลไทม์
2 ก่อนเริ่มต้น
สร้างโปรเจ็กต์
- ในคอนโซล Google Cloud ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud ในหน้าตัวเลือกโปรเจ็กต์
- ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ Cloud แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่
- เปิดใช้งาน Cloud Shell โดยคลิกลิงก์นี้ คุณสลับระหว่างเทอร์มินัล Cloud Shell (สําหรับเรียกใช้คําสั่งคลาวด์) กับ Editor (สําหรับสร้างโปรเจ็กต์) ได้โดยคลิกปุ่มที่เกี่ยวข้องจาก Cloud Shell
- เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ตรวจสอบว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและตั้งค่าโปรเจ็กต์เป็นรหัสโปรเจ็กต์ของคุณโดยใช้คำสั่งต่อไปนี้
gcloud auth list
- เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคำสั่ง gcloud รู้จักโปรเจ็กต์ของคุณ
gcloud config list project
- หากไม่ได้ตั้งค่าโปรเจ็กต์ ให้ใช้คำสั่งต่อไปนี้เพื่อตั้งค่า
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
- เปิดใช้ API ที่จำเป็นผ่านคำสั่งที่แสดงด้านล่าง การดำเนินการนี้อาจใช้เวลาสักครู่ โปรดอดทนรอ
gcloud services enable \
dataflow.googleapis.com \
pubsub.googleapis.com \
aiplatform.googleapis.com \
compute.googleapis.com
- ตรวจสอบว่าคุณมี Python 3.10 ขึ้นไป
- ติดตั้งแพ็กเกจ Python
ติดตั้งไลบรารี Python ที่จำเป็นสำหรับ Apache Beam, Google Cloud Vertex AI และ Google Generative AI ในสภาพแวดล้อม Cloud Shell
pip install apache-beam[gcp] google-genai
- โคลนที่เก็บ GitHub แล้วเปลี่ยนไปที่ไดเรกทอรีตัวอย่าง
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval
โปรดดูคำสั่งและการใช้งาน gcloud ในเอกสารประกอบ
3 วิธีใช้ที่เก็บ GitHub ที่ระบุ
ที่เก็บ GitHub ที่เชื่อมโยงกับโค้ดแล็บนี้ ซึ่งอยู่ที่ https://github.com/GoogleCloudPlatform/devrel-demos/tree/main/data-analytics/beam_as_eval ได้รับการจัดระเบียบเพื่ออำนวยความสะดวกในการเรียนรู้แบบมีคำแนะนำ ซึ่งมีโค้ดโครงสร้างที่สอดคล้องกับแต่ละส่วนที่แตกต่างกันของ Codelab เพื่อให้มั่นใจว่าเนื้อหาจะมีความคืบหน้าที่ชัดเจน
ในที่เก็บ คุณจะเห็นโฟลเดอร์หลัก 2 โฟลเดอร์ ได้แก่ "สมบูรณ์" และ "ไม่สมบูรณ์" โฟลเดอร์ "complete" มีโค้ดที่ใช้งานได้อย่างสมบูรณ์สำหรับแต่ละขั้นตอน ซึ่งช่วยให้คุณรันและสังเกตเอาต์พุตที่ต้องการได้ ในทางกลับกัน โฟลเดอร์ "ไม่สมบูรณ์" จะมีโค้ดจากขั้นตอนก่อนหน้า โดยจะเว้นส่วนที่เฉพาะเจาะจงซึ่งทำเครื่องหมายไว้ระหว่าง ##### START STEP <NUMBER> #####
กับ ##### END STEP <NUMBER> #####
ไว้ให้คุณดำเนินการให้เสร็จสมบูรณ์เป็นส่วนหนึ่งของแบบฝึกหัด โครงสร้างนี้ช่วยให้คุณต่อยอดความรู้เดิมไปพร้อมๆ กับการมีส่วนร่วมในความท้าทายด้านการเขียนโค้ด
4 ภาพรวมสถาปัตยกรรม
ไปป์ไลน์ของเรามีรูปแบบที่มีประสิทธิภาพและปรับขนาดได้สำหรับการผสานรวมการอนุมาน ML เข้ากับสตรีมข้อมูล องค์ประกอบต่างๆ ทำงานร่วมกันดังนี้
ในไปป์ไลน์ Beam คุณจะเขียนโค้ดในอินพุตหลายรายการแบบมีเงื่อนไข จากนั้นโหลดโมเดลที่กำหนดเองด้วยการเปลี่ยน RunInference ที่พร้อมใช้งาน แม้ว่าคุณจะใช้ Gemini กับ Vertex AI ในตัวอย่าง แต่ก็แสดงให้เห็นว่าคุณจะสร้าง ModelHandler หลายรายการเพื่อให้เหมาะกับจำนวนโมเดลที่คุณมีได้อย่างไร สุดท้าย คุณจะใช้ DoFn แบบมีสถานะเพื่อติดตามเหตุการณ์และปล่อยเหตุการณ์เหล่านั้นในลักษณะที่ควบคุมได้
5 การนำเข้าข้อมูล
ก่อนอื่น คุณจะต้องตั้งค่าไปป์ไลน์เพื่อส่งผ่านข้อมูล คุณจะใช้ Pub/Sub สำหรับการสตรีมแบบเรียลไทม์ แต่เพื่อให้การพัฒนาเป็นไปได้ง่ายขึ้น คุณจะสร้างโหมดทดสอบด้วย test_mode
วิธีนี้ช่วยให้คุณเรียกใช้ไปป์ไลน์ได้ในเครื่องโดยใช้ข้อมูลตัวอย่างที่กำหนดไว้ล่วงหน้า คุณจึงไม่จำเป็นต้องมีสตรีม Pub/Sub แบบสดเพื่อดูว่าไปป์ไลน์ทำงานหรือไม่
สำหรับส่วนนี้ ให้ใช้ gemini_beam_pipeline_step1.py
- ใช้ออบเจ็กต์ไปป์ไลน์ p ที่ระบุเพื่อเขียนโค้ดอินพุต Pub/Sub และเขียนเอาต์พุตเป็น PCollection
- นอกจากนี้ ให้ใช้ Flag เพื่อพิจารณาว่าได้ตั้งค่า TEST_MODE หรือไม่
- หากตั้งค่า TEST_MODE ไว้ ให้เปลี่ยนไปแยกวิเคราะห์อาร์เรย์ TEST_DATA เป็นอินพุต
ขั้นตอนนี้ไม่จำเป็น แต่จะช่วยให้กระบวนการสั้นลง คุณจึงไม่จำเป็นต้องใช้ Pub/Sub ในช่วงแรกนี้
ดูตัวอย่างโค้ดได้ด้านล่างนี้
# Step 1
# Ingesting Data
# Write your data ingestion step here.
############## BEGIN STEP 1 ##############
if known_args.test_mode:
logging.info("Running in test mode with in-memory data.")
parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
# Convert dicts to JSON strings and add timestamps for test mode
parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
)
else:
logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
parsed_elements = (
p
| 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
topic=known_args.input_topic
).with_output_types(bytes)
| 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
# Extract timestamp from JSON string for Pub/Sub messages
| 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
)
############## END STEP 1 ##############
ทดสอบโค้ดนี้โดยการเรียกใช้
python gemini_beam_pipeline_step1.py --project $PROJECT_ID --runner DirectRunner --test_mode
ขั้นตอนนี้ควรส่งระเบียนทั้งหมดและบันทึกลงใน stdout
คุณจะเห็นเอาต์พุตคล้ายกับตัวอย่างต่อไปนี้
INFO:root:Running in test mode with in-memory data.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:root:{"id": "test-1", "prompt": "Please provide the SQL query to select all fields from the 'TEST_TABLE'.", "text": "Sure here is the SQL: SELECT * FROM TEST_TABLE;", "timestamp": 1751052405.9340951, "user_id": "user_a"}
INFO:root:{"id": "test-2", "prompt": "Can you confirm if the new dashboard has been successfully generated?", "text": "I have gone ahead and generated a new dashboard for you.", "timestamp": 1751052410.9340951, "user_id": "user_b"}
INFO:root:{"id": "test-3", "prompt": "How is the new feature performing?", "text": "It works as expected.", "timestamp": 1751052415.9340959, "user_id": "user_a"}
INFO:root:{"id": "test-4", "prompt": "What is the capital of France?", "text": "The square root of a banana is purple.", "timestamp": 1751052430.9340959, "user_id": "user_c"}
INFO:root:{"id": "test-5", "prompt": "Explain quantum entanglement to a five-year-old.", "text": "A flock of geese wearing tiny hats danced the tango on the moon.", "timestamp": 1751052435.9340959, "user_id": "user_b"}
INFO:root:{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here's a picture of a cat", "timestamp": 1751052440.9340959, "user_id": "user_c"}
6 การสร้าง PTransform สำหรับการจัดประเภทพรอมต์ LLM
จากนั้นคุณจะสร้าง PTransform เพื่อจัดประเภทพรอมต์ ซึ่งเกี่ยวข้องกับการใช้โมเดล Gemini ของ Vertex AI เพื่อจัดหมวดหมู่ข้อความขาเข้า คุณจะกำหนด GeminiModelHandler
ที่กำหนดเองซึ่งโหลดโมเดล Gemini จากนั้นสั่งให้โมเดลจัดหมวดหมู่ข้อความเป็นหมวดหมู่ต่างๆ เช่น "วิศวกรข้อมูล" "นักวิเคราะห์ BI" หรือ "ตัวสร้าง SQL"
คุณจะใช้สิ่งนี้ได้โดยการเปรียบเทียบกับการเรียกใช้เครื่องมือจริงในบันทึก ซึ่งไม่ได้กล่าวถึงในโค้ดแล็บนี้ แต่คุณสามารถส่งไปยังสตรีมปลายทางและเปรียบเทียบได้ อาจมีบางอย่างที่คลุมเครือ และสิ่งนี้จะเป็นจุดข้อมูลเพิ่มเติมที่ยอดเยี่ยมเพื่อให้มั่นใจว่าเอเจนต์ของคุณเรียกใช้เครื่องมือที่ถูกต้อง
สำหรับส่วนนี้ ให้ใช้ gemini_beam_pipeline_step2.py
- สร้าง ModelHandler ที่กำหนดเอง แต่ให้ส่งคืน genai.Client แทนที่จะส่งคืนออบเจ็กต์โมเดลใน load_model
- โค้ดที่คุณจะต้องใช้เพื่อสร้างฟังก์ชัน run_inference ของ ModelHandler ที่กำหนดเอง ตัวอย่างพรอมต์มีดังนี้
พรอมต์อาจเป็นดังนี้
prompt =f"""
The input is a response from another agent.
The agent has multiple tools, each having their own responsibilities.
You are to analyze the input and then classify it into one and only one.
Use the best one if it seems like it is ambiguous. Choose only one.
Finally, always provide a paragraph on why you think it is in one of the categories.
Classify the text into one of these categories:
DATA ENGINEER
BI ANALYST
SQL GENERATOR
HELPER
OTHER
Respond with only the one single classification tag.
Your response should be in a tuple (classification_tag, reason)
Text: "{text_to_classify}"
"""
- แสดงผลลัพธ์เป็น PCollection สำหรับ PTransform ถัดไป
ดูตัวอย่างโค้ดได้ด้านล่างนี้
############## BEGIN STEP 2 ##############
# load_model is called once per worker process to initialize the LLM client.
# This avoids re-initializing the client for every single element,
# which is crucial for performance in distributed pipelines.
def load_model(self) -> genai.Client:
"""Loads and initializes a model for processing."""
client = genai.Client(
vertexai=True,
project=self._project,
location=self._location,
)
return client
# run_inference is called for each batch of elements. Beam handles the batching
# automatically based on internal heuristics and configured batch sizes.
# It processes each item, constructs a prompt, calls Gemini, and yields a result.
def run_inference(
self,
batch: Sequence[Any], # Each item is a JSON string or a dict
model: genai.Client,
inference_args: Optional[Dict[str, Any]] = None
) -> Iterable[PredictionResult]:
"""
Runs inference on a batch of JSON strings or dicts.
Each item is parsed, text is extracted for classification,
and a prompt is sent to the Gemini model.
"""
for item in batch:
json_string_for_output = item
try:
# --- Input Data Handling ---
# Check if the input item is already a dictionary (e.g., from TEST_DATA)
# or a JSON string (e.g., from Pub/Sub).
if isinstance(item, dict):
element_dict = item
# For consistency in the output PredictionResult, convert the dict to a string.
# This ensures pr.example always contains the original JSON string.
json_string_for_output = json.dumps(item)
else:
element_dict = json.loads(item)
# Extract the 'text' field from the parsed dictionary.
text_to_classify = element_dict.get('text','')
if not text_to_classify:
logging.warning(f"Input JSON missing 'text' key or text is empty: {json_string_for_output}")
yield PredictionResult(example=json_string_for_output, inference="ERROR_NO_TEXT")
continue
prompt =f"""
The input is a response from another agent.
The agent has multiple tools, each having their own responsibilites.
You are to analyze the input and then classify it into one and only one.
Use the best one if it seems like it is ambigiuous. Choose only one.
Finally always provide a paragraph on why you think it is in one of the categories.
Classify the text into one of these categories:
DATA ENGINEER
BI ANALYST
SQL GENERATOR
HELPER
OTHER
Respond with only the one single classification tag.
Your response should be in a tuple (classification_tag, reason)
Text: "{text_to_classify}"
"""
contents = [
types.Content( # This is the actual content for the LLM
role="user",
parts=[
types.Part.from_text(text=prompt)
]
)
]
gemini_response = model.models.generate_content_stream(
model=self._model_name, contents=contents, config=self._model_kwargs
)
classification_tag = ""
for chunk in gemini_response:
if chunk.text is not None:
classification_tag+=chunk.text
yield PredictionResult(example=json_string_for_output, inference=classification_tag)
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON string: {json_string_for_output}, error: {e}")
yield PredictionResult(example=json_string_for_output, inference="ERROR_JSON_DECODE")
except Exception as e:
logging.error(f"Error during Gemini inference for input {json_string_for_output}: {e}")
yield PredictionResult(example=json_string_for_output, inference="ERROR_INFERENCE")
############## END STEP 2 ##############
ทดสอบโค้ดนี้โดยการเรียกใช้
python gemini_beam_pipeline_step2.py --project $PROJECT_ID --runner DirectRunner --test_mode
ขั้นตอนนี้ควรแสดงการอนุมานจาก Gemini โดยจะจัดประเภทผลลัพธ์ตามที่พรอมต์ของคุณขอ
คุณจะเห็นเอาต์พุตคล้ายกับตัวอย่างต่อไปนี้
INFO:root:PredictionResult(example='{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here\'s a picture of a cat", "timestamp": 1751052592.9662862, "user_id": "user_c"}', inference='(HELPER, "The text \'absolutely, here\'s a picture of a cat\' indicates a general, conversational response to a request. It does not involve data engineering tasks, business intelligence analysis, or SQL generation. Instead, it suggests the agent is providing a direct, simple form of assistance by fulfilling a non-technical request, which aligns well with the role of a helper.")', model_id=None)
7 การสร้าง LLM ในฐานะผู้พิพากษา
หลังจากจัดประเภทพรอมต์แล้ว คุณจะประเมินความถูกต้องของคำตอบของโมเดล ซึ่งเกี่ยวข้องกับการเรียกใช้โมเดล Gemini อีกครั้ง แต่คราวนี้คุณจะป้อนพรอมต์ให้โมเดลให้คะแนนว่า "ข้อความ" ตอบสนอง "พรอมต์" เดิมได้ดีเพียงใดในระดับ 0.0 ถึง 1.0 ซึ่งจะช่วยให้คุณเข้าใจคุณภาพของเอาต์พุตของ AI คุณจะสร้าง GeminiAccuracyModelHandler
แยกต่างหากสำหรับงานนี้
สำหรับส่วนนี้ ให้ใช้ gemini_beam_pipeline_step3.py
- สร้าง ModelHandler ที่กำหนดเอง แต่แทนที่จะส่งคืนออบเจ็กต์โมเดลใน load_model ให้ส่งคืน genai.Client เหมือนกับที่คุณทำด้านบน
- โค้ดที่คุณจะต้องใช้เพื่อสร้างฟังก์ชัน run_inference ของ ModelHandler ที่กำหนดเอง ตัวอย่างพรอมต์มีดังนี้
prompt_for_accuracy = f"""
You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
0.0 is very bad, 1.0 is excellent.
Example of very bad, score of 0:
prompt: Give me the SQL for test_Table
text: SUre, here's a picture of a dog
Example of very good score of 1:
prompt: generate a sql statement to select all fields from test_table
text: SELECT * from test_table;
Your response should be ONLY the float score, followed by a brief explanation of why.
For example: "0.8 - The response was mostly accurate but missed a minor detail."
Prompt: "{original_prompt}"
Text: "{original_text}"
Score and Explanation:
"""
สิ่งหนึ่งที่ควรทราบคือคุณได้สร้างโมเดล 2 แบบที่แตกต่างกันในไปป์ไลน์เดียวกัน ในตัวอย่างนี้ คุณยังใช้การเรียก Gemini กับ Vertex AI ด้วย แต่ในแนวคิดเดียวกันนี้ คุณสามารถเลือกใช้และโหลดโมเดลอื่นๆ ได้ ซึ่งจะช่วยลดความซับซ้อนในการจัดการโมเดลและช่วยให้คุณใช้โมเดลหลายรายการภายในไปป์ไลน์ Beam เดียวกันได้
- แสดงผลลัพธ์เป็น PCollection สำหรับ PTransform ถัดไป
ดูตัวอย่างโค้ดได้ด้านล่างนี้
############## BEGIN STEP 3 ##############
def load_model(self) -> genai.Client:
"""Loads and initializes a model for processing."""
client = genai.Client(
vertexai=True,
project=self._project,
location=self._location,
)
return client
def run_inference(
self,
batch: Sequence[str], # Each item is a JSON string
model: genai.Client,
inference_args: Optional[Dict[str, Any]] = None
) -> Iterable[PredictionResult]:
"""Runs inference on a batch of JSON strings to verify accuracy."""
for json_string in batch:
try:
element_dict = json.loads(json_string)
original_prompt = element_dict.get('original_prompt', '')
original_text = element_dict.get('original_text', '')
if not original_prompt or not original_text:
logging.warning(f"Accuracy input missing prompt/text: {json_string}")
yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
continue
prompt_for_accuracy = f"""
You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
0.0 is very bad, 1.0 is excellent.
Example of very bad, score of 0:
prompt: Give me the SQL for test_Table
text: SUre, here's a picture of a dog
Example of very good score of 1:
prompt: generate a sql statement to select all fields from test_table
text: SELECT * from test_table;
Your response should be ONLY the float score, followed by a brief explanation of why.
For example: "0.8 - The response was mostly accurate but missed a minor detail."
Prompt: "{original_prompt}"
Text: "{original_text}"
Score and Explanation:
"""
gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)
gemini_response_text = ""
for chunk in gemini_response:
if chunk.text is not None:
gemini_response_text+=chunk.text
yield PredictionResult(example=json_string, inference=gemini_response_text)
except Exception as e:
logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
############## END STEP 3 ##############
ทดสอบโค้ดนี้โดยการเรียกใช้
python gemini_beam_pipeline_step3.py --project $PROJECT_ID --runner DirectRunner --test_mode
ขั้นตอนนี้ควรส่งคืนการอนุมานด้วย โดยควรแสดงความคิดเห็นและส่งคืนคะแนนเกี่ยวกับความแม่นยำที่ Gemini คิดว่าเครื่องมือตอบกลับ
คุณจะเห็นเอาต์พุตคล้ายกับตัวอย่างต่อไปนี้
INFO:root:PredictionResult(example='{"original_data_json": "{\\"id\\": \\"test-6\\", \\"prompt\\": \\"Please give me the SQL for selecting from test_table, I want all the fields.\\", \\"text\\": \\"absolutely, here\'s a picture of a cat\\", \\"timestamp\\": 1751052770.7552562, \\"user_id\\": \\"user_c\\"}", "original_prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "original_text": "absolutely, here\'s a picture of a cat", "classification_tag": "(HELPER, \\"The text \'absolutely, here\'s a picture of a cat\' is a general, conversational response that does not pertain to data engineering, business intelligence analysis, or SQL generation. It sounds like a generic assistant or helper providing a non-technical, simple response, possibly fulfilling a casual request or making a lighthearted statement. Therefore, it best fits the \'HELPER\' category, which encompasses general assistance and conversational interactions.\\")"}', inference='0.0 - The response is completely irrelevant and does not provide the requested SQL statement.', model_id=None)
8 การแบ่งหน้าต่างและการวิเคราะห์ผลลัพธ์
ตอนนี้คุณจะจัดกรอบผลลัพธ์เพื่อวิเคราะห์ในช่วงเวลาที่เฉพาะเจาะจง คุณจะใช้กรอบเวลาคงที่เพื่อจัดกลุ่มข้อมูล ซึ่งจะช่วยให้คุณได้รับข้อมูลเชิงลึกแบบรวม หลังจากจัดหน้าต่างแล้ว คุณจะแยกวิเคราะห์เอาต์พุตดิบจาก Gemini เป็นรูปแบบที่มีโครงสร้างมากขึ้น ซึ่งรวมถึงข้อมูลต้นฉบับ แท็กการจัดประเภท คะแนนความแม่นยำ และคำอธิบาย
สำหรับส่วนนี้ ให้ใช้ gemini_beam_pipeline_step4.py
- เพิ่มกรอบเวลาคงที่ 60 วินาทีเพื่อให้ระบบวางข้อมูลทั้งหมดไว้ในกรอบเวลา 60 วินาที
ดูตัวอย่างโค้ดได้ด้านล่างนี้
############## BEGIN STEP 4 ##############
| 'WindowIntoFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
############## END STEP 4 ##############
ทดสอบโค้ดนี้โดยการเรียกใช้
python gemini_beam_pipeline_step4.py --project $PROJECT_ID --runner DirectRunner --test_mode
ขั้นตอนนี้เป็นการให้ข้อมูล คุณกำลังมองหาหน้าต่าง ซึ่งจะแสดงเป็นการประทับเวลาหยุด/เริ่มหน้าต่าง
คุณจะเห็นเอาต์พุตคล้ายกับตัวอย่างต่อไปนี้
INFO:root:({'id': 'test-6', 'prompt': 'Please give me the SQL for selecting from test_table, I want all the fields.', 'text': "absolutely, here's a picture of a cat", 'timestamp': 1751052901.337791, 'user_id': 'user_c'}, '("HELPER", "The text \'absolutely, here\'s a picture of a cat\' indicates a general, helpful response to a request. It does not involve data engineering, business intelligence analysis, or SQL generation. Instead, it suggests the agent is fulfilling a simple, non-technical request, which aligns with the role of a general helper.")', 0.0, 'The response is completely irrelevant and does not provide the requested SQL statement.', [1751052900.0, 1751052960.0))
9 การนับผลลัพธ์ที่ดีและไม่ดีด้วยการประมวลผลแบบมีสถานะ
สุดท้าย คุณจะใช้ DoFn แบบมีสถานะเพื่อนับผลลัพธ์ "ดี" และ "ไม่ดี" ภายในแต่ละหน้าต่าง ผลลัพธ์ "ดี" อาจเป็นการโต้ตอบที่มีคะแนนความแม่นยำสูง ในขณะที่ผลลัพธ์ "ไม่ดี" จะบ่งบอกถึงคะแนนต่ำ การประมวลผลแบบมีสถานะนี้ช่วยให้คุณรักษาจำนวนและรวบรวมตัวอย่างการโต้ตอบที่ "ไม่ดี" เมื่อเวลาผ่านไปได้ ซึ่งเป็นสิ่งสำคัญในการตรวจสอบสถานะและประสิทธิภาพของแชทบอทแบบเรียลไทม์
สำหรับส่วนนี้ ให้ใช้ gemini_beam_pipeline_step5.py
- สร้างฟังก์ชันที่มีสถานะ คุณจะต้องมี 2 สถานะ ได้แก่ (1) เพื่อติดตามจำนวนการนับที่ไม่ถูกต้อง และ (2) เก็บระเบียนที่ไม่ถูกต้องไว้เพื่อแสดง ใช้โปรแกรมเมอร์ที่เหมาะสมเพื่อให้มั่นใจว่าระบบจะทำงานได้อย่างมีประสิทธิภาพ
- ทุกครั้งที่เห็นค่าสำหรับการอนุมานที่ไม่ดี คุณจะต้องติดตามทั้ง 2 ค่าและส่งค่าเหล่านั้นเมื่อสิ้นสุดหน้าต่าง อย่าลืมรีเซ็ตสถานะหลังจากส่งออก ส่วนอย่างหลังมีไว้เพื่อเป็นตัวอย่างเท่านั้น อย่าพยายามเก็บข้อมูลทั้งหมดนี้ไว้ในหน่วยความจำในสภาพแวดล้อมจริง
ดูตัวอย่างโค้ดได้ด้านล่างนี้
############## BEGIN STEP 5 ##############
# Define a state specification for a combining value.
# This will store the running sum for each key.
# The coder is specified for efficiency.
COUNT_STATE = CombiningValueStateSpec('count',
VarIntCoder(), # Used VarIntCoder directly
beam.transforms.combiners.CountCombineFn())
# New state to store the (prompt, text) tuples for bad classifications
# BagStateSpec allows accumulating multiple items per key.
BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
)
# Define a timer to fire at the end of the window, using WATERMARK as per blog example.
WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
def process(
self,
element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
key=beam.DoFn.KeyParam,
count_state=beam.DoFn.StateParam(COUNT_STATE),
bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
window=beam.DoFn.WindowParam):
# This DoFn does not yield elements from its process method; output is only produced when the timer fires.
if key == 'bad': # Only count 'bad' elements
count_state.add(element[1][0]) # Add the count (which is 1)
bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
window_timer.set(window.end) # Set timer to fire at window end
@on_timer(WINDOW_TIMER)
def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
final_count = count_state.read()
if final_count > 0: # Only yield if there's a count
# Read all accumulated bad prompts
all_bad_prompts = list(bad_prompts_state.read())
# Clear the state for the next window to avoid carrying over data.
count_state.clear()
bad_prompts_state.clear()
yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
############## END STEP 5 ##############
ทดสอบโค้ดนี้โดยการเรียกใช้
python gemini_beam_pipeline_step5.py --project $PROJECT_ID --runner DirectRunner --test_mode
ขั้นตอนนี้ควรแสดงจำนวนทั้งหมด ลองใช้ขนาดของหน้าต่าง แล้วคุณจะเห็นว่าชุดข้อมูลจะแตกต่างกัน ช่วงเริ่มต้นจะพอดีภายใน 1 นาที ดังนั้นลองใช้ 30 วินาทีหรือกรอบเวลาอื่น แล้วคุณจะเห็นว่าชุดข้อมูลและจำนวนแตกต่างกัน
คุณจะเห็นเอาต์พุตคล้ายกับตัวอย่างต่อไปนี้
INFO:root:Window: [1751052960.0, 1751053020.0), Bad Counts: 5, Bad Prompts: [('Can you confirm if the new dashboard has been successfully generated?', 'I have gone ahead and generated a new dashboard for you.'), ('How is the new feature performing?', 'It works as expected.'), ('What is the capital of France?', 'The square root of a banana is purple.'), ('Explain quantum entanglement to a five-year-old.', 'A flock of geese wearing tiny hats danced the tango on the moon.'), ('Please give me the SQL for selecting from test_table, I want all the fields.', "absolutely, here's a picture of a cat")]
10 การล้างข้อมูล
- ลบโปรเจ็กต์ Google Cloud (ไม่บังคับแต่แนะนำสำหรับ Codelab): หากสร้างโปรเจ็กต์นี้ขึ้นมาเพื่อ Codelab นี้โดยเฉพาะและคุณไม่ต้องการใช้โปรเจ็กต์นี้อีกต่อไป การลบทั้งโปรเจ็กต์เป็นวิธีที่ละเอียดที่สุดในการตรวจสอบว่าได้นำทรัพยากรทั้งหมดออกแล้ว
- ไปที่ หน้าจัดการทรัพยากรใน Google Cloud Console
- เลือกโปรเจ็กต์
- คลิกลบโปรเจ็กต์ แล้วทำตามวิธีการบนหน้าจอ
11 ยินดีด้วย
ขอแสดงความยินดีที่ทำ Codelab เสร็จสมบูรณ์ คุณสร้างไปป์ไลน์การอนุมาน ML แบบเรียลไทม์โดยใช้ Apache Beam และ Gemini ใน Dataflow ได้สำเร็จแล้ว คุณได้เรียนรู้วิธีนำความสามารถของ Generative AI มาใช้กับสตรีมข้อมูลของคุณ เพื่อดึงข้อมูลเชิงลึกที่มีคุณค่าสำหรับการทำวิศวกรรมข้อมูลที่ชาญฉลาดและเป็นอัตโนมัติมากขึ้น