ارزیابی بلادرنگ AI/ML با پرتو Apache و Dataflow

۱. مقدمه

در چشم‌انداز پرشتاب داده‌های امروزی، بینش‌های بلادرنگ برای تصمیم‌گیری‌های آگاهانه بسیار مهم هستند. این آزمایشگاه کد شما را در ساخت یک خط لوله ارزیابی بلادرنگ راهنمایی می‌کند. ما با استفاده از چارچوب Apache Beam شروع خواهیم کرد، که یک مدل برنامه‌نویسی یکپارچه برای داده‌های دسته‌ای و جریانی ارائه می‌دهد. این امر با حذف منطق محاسبات توزیع‌شده پیچیده‌ای که در غیر این صورت باید از ابتدا بسازید، توسعه خط لوله را به طور قابل توجهی ساده می‌کند. پس از تعریف خط لوله شما با استفاده از Beam، می‌توانید آن را به طور یکپارچه در Google Cloud Dataflow اجرا کنید، یک سرویس کاملاً مدیریت‌شده که مقیاس و عملکرد بی‌نظیری را برای نیازهای پردازش داده‌های شما ارائه می‌دهد.

در این آزمایشگاه کد، شما یاد خواهید گرفت که چگونه یک خط لوله مقیاس‌پذیر Apache Beam را برای استنتاج یادگیری ماشین معماری کنید، یک ModelHandler سفارشی برای ادغام مدل Gemini در Vertex AI توسعه دهید، از مهندسی سریع برای طبقه‌بندی هوشمند متن در جریان‌های داده استفاده کنید و این خط لوله استنتاج یادگیری ماشینی در حال جریان را در Google Cloud Dataflow مستقر و راه‌اندازی کنید. در پایان، بینش‌های ارزشمندی در مورد به‌کارگیری یادگیری ماشین برای درک داده‌های بلادرنگ و ارزیابی مداوم در گردش‌های کاری مهندسی، به‌ویژه برای حفظ هوش مصنوعی مکالمه‌ای قوی و کاربر محور، به دست خواهید آورد.

سناریو

شرکت شما یک عامل داده ساخته است. عامل داده شما که با کیت توسعه عامل (ADK) ساخته شده است، به قابلیت‌های تخصصی مختلفی برای کمک به وظایف مرتبط با داده مجهز شده است. آن را به عنوان یک دستیار داده همه کاره تصور کنید که آماده رسیدگی به درخواست‌های متنوع است، از ایفای نقش به عنوان تحلیلگر BI برای تولید گزارش‌های دقیق گرفته تا یک مهندس داده که به شما در ساخت خطوط داده قوی کمک می‌کند، یا یک مولد SQL که دستورات SQL دقیقی را ایجاد می‌کند و موارد دیگر. هر تعاملی که این عامل دارد، هر پاسخی که تولید می‌کند، به طور خودکار در Firestore ذخیره می‌شود. اما چرا ما در اینجا به یک خط لوله نیاز داریم؟

591df0e9110b9f86.png

زیرا از طریق Firestore، یک trigger به طور یکپارچه این داده‌های تعاملی را به Pub/Sub ارسال می‌کند و تضمین می‌کند که ما می‌توانیم بلافاصله این مکالمات حیاتی را در زمان واقعی پردازش و تجزیه و تحلیل کنیم.

4577e473831fbb87.png

۲. قبل از شروع

ایجاد یک پروژه

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید.
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .
  3. با کلیک روی این لینک، Cloud Shell را فعال کنید. می‌توانید با کلیک روی دکمه مربوطه از Cloud Shell، بین Cloud Shell Terminal (برای اجرای دستورات ابری) و Editor (برای ساخت پروژه‌ها) جابجا شوید.
  4. پس از اتصال به Cloud Shell، با استفاده از دستور زیر بررسی می‌کنید که آیا از قبل احراز هویت شده‌اید و پروژه روی شناسه پروژه شما تنظیم شده است یا خیر:
gcloud auth list
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید شود که دستور gcloud از پروژه شما اطلاع دارد.
gcloud config list project
  1. اگر پروژه شما تنظیم نشده است، از دستور زیر برای تنظیم آن استفاده کنید:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
  1. API های مورد نیاز را از طریق دستور زیر فعال کنید. این کار ممکن است چند دقیقه طول بکشد، پس لطفاً صبور باشید.
gcloud services enable \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com \
    compute.googleapis.com
  1. حتماً پایتون ۳.۱۰+ داشته باشید
  2. نصب بسته‌های پایتون

کتابخانه‌های پایتون مورد نیاز برای Apache Beam، Google Cloud Vertex AI و Google Generative AI را در محیط Cloud Shell خود نصب کنید.

pip install apache-beam[gcp] google-genai
  1. مخزن گیت‌هاب را کلون کنید و به دایرکتوری demo بروید.
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval

برای دستورات و نحوه‌ی استفاده از gcloud به مستندات آن مراجعه کنید.

۳. نحوه استفاده از مخزن گیت‌هاب ارائه شده

مخزن گیت‌هاب مرتبط با این آزمایشگاه کد، که در آدرس https://github.com/GoogleCloudPlatform/devrel-demos/tree/main/data-analytics/beam_as_eval قرار دارد، برای تسهیل یک تجربه یادگیری هدایت‌شده سازماندهی شده است. این مخزن شامل کدی است که با هر بخش مجزا از آزمایشگاه کد همسو است و پیشرفت واضحی را در طول مطالب تضمین می‌کند.

در مخزن، دو پوشه اصلی پیدا خواهید کرد: «کامل» و «ناقص». پوشه «کامل» کدهای کاملاً کاربردی هر مرحله را در خود جای داده و به شما امکان می‌دهد خروجی مورد نظر را اجرا و مشاهده کنید. برعکس، پوشه «ناقص» کدهای مراحل قبلی را ارائه می‌دهد و بخش‌های خاصی را بین ##### START STEP <NUMBER> ##### و ##### END STEP <NUMBER> ##### می‌کند تا بتوانید آنها را به عنوان بخشی از تمرین‌ها تکمیل کنید. این ساختار به شما امکان می‌دهد تا ضمن شرکت فعال در چالش‌های کدنویسی، بر دانش قبلی خود بیفزایید.

۴۲۰۱۵۳۷۶afc03a0b.png

۴. بررسی اجمالی معماری

خط لوله ما یک الگوی قدرتمند و مقیاس‌پذیر برای ادغام استنتاج یادگیری ماشینی در جریان‌های داده ارائه می‌دهد. در اینجا نحوه قرارگیری قطعات در کنار هم آورده شده است:

۳۳۵۴۷۰۹۱۶fedd7af.png

در خط تولید Beam خود، چندین ورودی را به صورت شرطی کدنویسی خواهید کرد و سپس مدل‌های سفارشی را با تبدیل RunInference بارگذاری خواهید کرد. اگرچه در مثال از Gemini با VertexAI استفاده می‌کنید، اما این نشان می‌دهد که چگونه اساساً چندین ModelHandler ایجاد می‌کنید تا با تعداد مدل‌هایی که دارید متناسب باشند. در نهایت، از یک DoFn با وضعیت (stateful DoFn) برای پیگیری رویدادها و انتشار آنها به شیوه‌ای کنترل‌شده استفاده خواهید کرد.

ece1725721653b80.png

۵. دریافت داده‌ها

ابتدا، شما خط لوله خود را برای دریافت داده‌ها تنظیم خواهید کرد. شما از Pub/Sub برای پخش زنده استفاده خواهید کرد، اما برای آسان‌تر کردن توسعه، یک حالت آزمایشی نیز ایجاد خواهید کرد. این test_mode به شما امکان می‌دهد خط لوله را به صورت محلی با استفاده از داده‌های نمونه از پیش تعریف شده اجرا کنید، بنابراین برای بررسی عملکرد خط لوله خود نیازی به یک جریان زنده Pub/Sub ندارید.

۴۱۵۳۶۱۳f05f28c78.png

برای این بخش از gemini_beam_pipeline_step1.py استفاده کنید.

  1. با استفاده از شیء خط لوله ارائه شده p، یک ورودی Pub/Sub را کدنویسی کنید و خروجی را به عنوان یک pCollection بنویسید.
  2. علاوه بر این، از یک پرچم برای تعیین اینکه آیا TEST_MODE تنظیم شده است یا خیر، استفاده کنید.
  3. اگر TEST_MODE تنظیم شده بود، به تجزیه آرایه TEST_DATA به عنوان ورودی بروید.

این کار ضروری نیست، اما به کوتاه شدن فرآیند کمک می‌کند، بنابراین نیازی نیست که Pub/Sub را اینقدر زود درگیر کنید.

در اینجا مثالی از کد زیر را مشاهده می‌کنید:

        # Step 1
        # Ingesting Data
        # Write your data ingestion step here.
        ############## BEGIN STEP 1 ##############
        if known_args.test_mode:  
            logging.info("Running in test mode with in-memory data.")
            parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
            # Convert dicts to JSON strings and add timestamps for test mode
            parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
            )
        else:
            logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
            parsed_elements = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
                    topic=known_args.input_topic
                    ).with_output_types(bytes)
                | 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
                # Extract timestamp from JSON string for Pub/Sub messages
                | 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
            )
        ############## END STEP 1 ##############

این کد را با اجرای دستور زیر آزمایش کنید:

python gemini_beam_pipeline_step1.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید تمام رکوردها را منتشر کند و آنها را در stdout ثبت کند.

باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:Running in test mode with in-memory data.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:root:{"id": "test-1", "prompt": "Please provide the SQL query to select all fields from the 'TEST_TABLE'.", "text": "Sure here is the SQL: SELECT * FROM TEST_TABLE;", "timestamp": 1751052405.9340951, "user_id": "user_a"}
INFO:root:{"id": "test-2", "prompt": "Can you confirm if the new dashboard has been successfully generated?", "text": "I have gone ahead and generated a new dashboard for you.", "timestamp": 1751052410.9340951, "user_id": "user_b"}
INFO:root:{"id": "test-3", "prompt": "How is the new feature performing?", "text": "It works as expected.", "timestamp": 1751052415.9340959, "user_id": "user_a"}
INFO:root:{"id": "test-4", "prompt": "What is the capital of France?", "text": "The square root of a banana is purple.", "timestamp": 1751052430.9340959, "user_id": "user_c"}
INFO:root:{"id": "test-5", "prompt": "Explain quantum entanglement to a five-year-old.", "text": "A flock of geese wearing tiny hats danced the tango on the moon.", "timestamp": 1751052435.9340959, "user_id": "user_b"}
INFO:root:{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here's a picture of a cat", "timestamp": 1751052440.9340959, "user_id": "user_c"}

۶. ساخت یک PTransform برای طبقه‌بندی سریع LLM

در مرحله بعد، یک PTransform برای طبقه‌بندی اعلان‌ها خواهید ساخت. این شامل استفاده از مدل Gemini از Vertex AI برای طبقه‌بندی متن ورودی است. شما یک GeminiModelHandler سفارشی تعریف خواهید کرد که مدل Gemini را بارگذاری می‌کند و سپس به مدل دستور می‌دهد که چگونه متن را در دسته‌هایی مانند "مهندس داده"، "تحلیلگر هوش تجاری" یا "مولد SQL" طبقه‌بندی کند.

شما می‌توانید این را با مقایسه با فراخوانی‌های ابزار واقعی در لاگ استفاده کنید. این موضوع در این آزمایشگاه کد پوشش داده نشده است، اما می‌توانید آن را به پایین‌دست ارسال کرده و مقایسه کنید. ممکن است برخی موارد مبهم باشند و این به عنوان یک نقطه داده اضافی عالی برای اطمینان از اینکه عامل شما ابزارهای صحیح را فراخوانی می‌کند، عمل می‌کند.

۹۸۴۰f۳fb۲۶b۸۸۱۳۸.png

برای این بخش از gemini_beam_pipeline_step2.py استفاده کنید.

  1. ModelHandler سفارشی خود را بسازید؛ با این حال، به جای بازگرداندن یک شیء مدل در load_model، genai.Client را برگردانید.
  2. کدی که برای ایجاد تابع run_inference از ModelHandler سفارشی نیاز دارید. یک نمونه prompt ارائه شده است:

این درخواست می‌تواند چیزی شبیه به موارد زیر باشد:

prompt =f"""
            The input is a response from another agent.
            The agent has multiple tools, each having their own responsibilities.
            You are to analyze the input and then classify it into one and only one.
            Use the best one if it seems like it is ambiguous. Choose only one.

            Finally, always provide a paragraph on why you think it is in one of the categories.

            Classify the text into one of these categories:
            DATA ENGINEER
            BI ANALYST
            SQL GENERATOR
            HELPER
            OTHER
            Respond with only the one single classification tag.
            Your response should be in a tuple (classification_tag, reason)

            Text: "{text_to_classify}"
            """
  1. نتایج را به عنوان یک pCollection برای pTransform بعدی ارائه دهید.

در اینجا مثالی از کد زیر را مشاهده می‌کنید:

    ############## BEGIN STEP 2 ##############
    # load_model is called once per worker process to initialize the LLM client.
    # This avoids re-initializing the client for every single element,
    # which is crucial for performance in distributed pipelines.
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client
    
    # run_inference is called for each batch of elements. Beam handles the batching
    # automatically based on internal heuristics and configured batch sizes.
    # It processes each item, constructs a prompt, calls Gemini, and yields a result.
    def run_inference(
        self,
        batch: Sequence[Any],  # Each item is a JSON string or a dict
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """
        Runs inference on a batch of JSON strings or dicts.
        Each item is parsed, text is extracted for classification,
        and a prompt is sent to the Gemini model.
        """
        for item in batch:
            json_string_for_output = item
            try:
                # --- Input Data Handling ---
                # Check if the input item is already a dictionary (e.g., from TEST_DATA)
                # or a JSON string (e.g., from Pub/Sub).
                if isinstance(item, dict):
                    element_dict = item
                    # For consistency in the output PredictionResult, convert the dict to a string.
                    # This ensures pr.example always contains the original JSON string.
                    json_string_for_output = json.dumps(item)
                else:
                    element_dict = json.loads(item)

                # Extract the 'text' field from the parsed dictionary.
                text_to_classify = element_dict.get('text','')

                if not text_to_classify:
                    logging.warning(f"Input JSON missing 'text' key or text is empty: {json_string_for_output}")
                    yield PredictionResult(example=json_string_for_output, inference="ERROR_NO_TEXT")
                    continue

                prompt =f"""
                The input is a response from another agent.
                The agent has multiple tools, each having their own responsibilites.
                You are to analyze the input and then classify it into one and only one.
                Use the best one if it seems like it is ambigiuous. Choose only one.

                Finally always provide a paragraph on why you think it is in one of the categories.

                Classify the text into one of these categories:
                DATA ENGINEER
                BI ANALYST
                SQL GENERATOR
                HELPER
                OTHER
                Respond with only the one single classification tag.
                Your response should be in a tuple (classification_tag, reason)

                Text: "{text_to_classify}"
                """

                contents = [
                    types.Content( # This is the actual content for the LLM
                    role="user",
                    parts=[
                        types.Part.from_text(text=prompt)
                    ]
                    )
                ]


                gemini_response = model.models.generate_content_stream(
                    model=self._model_name, contents=contents, config=self._model_kwargs
                )
                classification_tag = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        classification_tag+=chunk.text

                yield PredictionResult(example=json_string_for_output, inference=classification_tag)

            except json.JSONDecodeError as e:
                logging.error(f"Error decoding JSON string: {json_string_for_output}, error: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_JSON_DECODE")
            except Exception as e:
                logging.error(f"Error during Gemini inference for input {json_string_for_output}: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_INFERENCE")
    ############## END STEP 2 ##############

این کد را با اجرای دستور زیر آزمایش کنید:

python gemini_beam_pipeline_step2.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید یک استنتاج از Gemini را برگرداند. این استنتاج نتایج را طبق درخواست شما طبقه‌بندی می‌کند.

باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:PredictionResult(example='{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here\'s a picture of a cat", "timestamp": 1751052592.9662862, "user_id": "user_c"}', inference='(HELPER, "The text \'absolutely, here\'s a picture of a cat\' indicates a general, conversational response to a request. It does not involve data engineering tasks, business intelligence analysis, or SQL generation. Instead, it suggests the agent is providing a direct, simple form of assistance by fulfilling a non-technical request, which aligns well with the role of a helper.")', model_id=None)

۷. ساختن یک قاضی LLM

پس از طبقه‌بندی دستورات، شما دقت پاسخ‌های مدل را ارزیابی خواهید کرد. این شامل فراخوانی دیگری به مدل Gemini می‌شود، اما این بار، شما از آن می‌خواهید که امتیاز دهد که «متن» چقدر «دستور» اصلی را در مقیاسی از 0.0 تا 1.0 برآورده می‌کند. این به شما کمک می‌کند تا کیفیت خروجی هوش مصنوعی را درک کنید. شما یک GeminiAccuracyModelHandler جداگانه برای این کار ایجاد خواهید کرد.

70ef07fca17ba385.png

برای این بخش از gemini_beam_pipeline_step3.py استفاده کنید.

  1. ModelHandler سفارشی خود را بسازید؛ با این حال، به جای بازگرداندن یک شیء مدل در load_model، genai.Client را درست همانطور که در بالا انجام دادید، برگردانید.
  2. کدی که برای ایجاد تابع run_inference از ModelHandler سفارشی نیاز دارید. یک نمونه prompt ارائه شده است:
            prompt_for_accuracy = f"""
            You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
            Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
            0.0 is very bad, 1.0 is excellent.

            Example of very bad, score of 0:
            prompt: Give me the SQL for test_Table
            text: SUre, here's a picture of a dog

            Example of very good score of 1:
            prompt: generate a sql statement to select all fields from test_table
            text: SELECT * from test_table;

            Your response should be ONLY the float score, followed by a brief explanation of why.
            For example: "0.8 - The response was mostly accurate but missed a minor detail."

            Prompt: "{original_prompt}"
            Text: "{original_text}"
            Score and Explanation:
            """

نکته‌ای که باید در اینجا به آن توجه کنید این است که شما اساساً دو مدل مختلف را در یک خط تولید (pipeline) ایجاد کرده‌اید. در این مثال خاص، شما همچنین از فراخوانی Gemini با VertexAI استفاده می‌کنید، اما در همان مفهوم، می‌توانید از مدل‌های دیگر نیز استفاده و آنها را بارگذاری کنید. این امر مدیریت مدل شما را ساده می‌کند و به شما امکان می‌دهد از چندین مدل در یک خط تولید Beam استفاده کنید.

  1. نتایج را به عنوان یک pCollection برای pTransform بعدی ارائه دهید.

در اینجا مثالی از کد زیر را مشاهده می‌کنید:

    ############## BEGIN STEP 3 ##############
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client

    def run_inference(
        self,
        batch: Sequence[str],  # Each item is a JSON string
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """Runs inference on a batch of JSON strings to verify accuracy."""
        for json_string in batch:
            try:
                element_dict = json.loads(json_string)
                original_prompt = element_dict.get('original_prompt', '')
                original_text = element_dict.get('original_text', '')

                if not original_prompt or not original_text:
                    logging.warning(f"Accuracy input missing prompt/text: {json_string}")
                    yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
                    continue

                prompt_for_accuracy = f"""
                You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
                Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
                0.0 is very bad, 1.0 is excellent.

                Example of very bad, score of 0:
                prompt: Give me the SQL for test_Table
                text: SUre, here's a picture of a dog

                Example of very good score of 1:
                prompt: generate a sql statement to select all fields from test_table
                text: SELECT * from test_table;

                Your response should be ONLY the float score, followed by a brief explanation of why.
                For example: "0.8 - The response was mostly accurate but missed a minor detail."

                Prompt: "{original_prompt}"
                Text: "{original_text}"
                Score and Explanation:
                """
                gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)

                gemini_response_text = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        gemini_response_text+=chunk.text

                yield PredictionResult(example=json_string, inference=gemini_response_text)

            except Exception as e:
                logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
                yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
    ############## END STEP 3 ##############

این کد را با اجرای دستور زیر آزمایش کنید:

python gemini_beam_pipeline_step3.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله همچنین باید یک استنتاج را برگرداند، باید نظر بدهد و امتیازی در مورد میزان دقت پاسخ‌دهی ابزار از نظر Gemini ارائه دهد.

باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:PredictionResult(example='{"original_data_json": "{\\"id\\": \\"test-6\\", \\"prompt\\": \\"Please give me the SQL for selecting from test_table, I want all the fields.\\", \\"text\\": \\"absolutely, here\'s a picture of a cat\\", \\"timestamp\\": 1751052770.7552562, \\"user_id\\": \\"user_c\\"}", "original_prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "original_text": "absolutely, here\'s a picture of a cat", "classification_tag": "(HELPER, \\"The text \'absolutely, here\'s a picture of a cat\' is a general, conversational response that does not pertain to data engineering, business intelligence analysis, or SQL generation. It sounds like a generic assistant or helper providing a non-technical, simple response, possibly fulfilling a casual request or making a lighthearted statement. Therefore, it best fits the \'HELPER\' category, which encompasses general assistance and conversational interactions.\\")"}', inference='0.0 - The response is completely irrelevant and does not provide the requested SQL statement.', model_id=None)

۸. پنجره‌بندی و تحلیل نتایج

اکنون، نتایج خود را برای تجزیه و تحلیل آنها در فواصل زمانی خاص، پنجره‌بندی خواهید کرد. از پنجره‌های ثابت برای گروه‌بندی داده‌ها استفاده خواهید کرد که به شما امکان می‌دهد بینش‌های کلی دریافت کنید. پس از پنجره‌بندی، خروجی‌های خام Gemini را به قالبی ساختاریافته‌تر، شامل داده‌های اصلی، برچسب طبقه‌بندی، امتیاز دقت و توضیح، تجزیه خواهید کرد.

ea486c5961e560fb.png

برای این بخش از gemini_beam_pipeline_step4.py استفاده کنید.

  1. یک بازه زمانی ثابت ۶۰ ثانیه‌ای اضافه کنید تا تمام داده‌ها در یک بازه ۶۰ ثانیه‌ای قرار گیرند.

در اینجا مثالی از کد زیر را مشاهده می‌کنید:

            ############## BEGIN STEP 4 ##############
            | 'WindowIntoFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
            ############## END STEP 4 ##############

این کد را با اجرای دستور زیر آزمایش کنید:

python gemini_beam_pipeline_step4.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله آموزنده است، شما به دنبال پنجره خود هستید. این به عنوان مهر زمانی توقف/شروع پنجره نشان داده می‌شود.

باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:({'id': 'test-6', 'prompt': 'Please give me the SQL for selecting from test_table, I want all the fields.', 'text': "absolutely, here's a picture of a cat", 'timestamp': 1751052901.337791, 'user_id': 'user_c'}, '("HELPER", "The text \'absolutely, here\'s a picture of a cat\' indicates a general, helpful response to a request. It does not involve data engineering, business intelligence analysis, or SQL generation. Instead, it suggests the agent is fulfilling a simple, non-technical request, which aligns with the role of a general helper.")', 0.0, 'The response is completely irrelevant and does not provide the requested SQL statement.', [1751052900.0, 1751052960.0))

۹. شمارش نتایج خوب و بد با پردازش حالت‌مند

در نهایت، شما از یک DoFn با وضعیت (stateful DoFn) برای شمارش نتایج "خوب" و "بد" در هر پنجره استفاده خواهید کرد. یک نتیجه "خوب" ممکن است تعاملی با امتیاز دقت بالا باشد، در حالی که یک نتیجه "بد" نشان دهنده امتیاز پایین است. این پردازش با وضعیت به شما امکان می‌دهد تا شمارش‌ها را حفظ کرده و حتی نمونه‌هایی از تعاملات "بد" را در طول زمان جمع‌آوری کنید، که برای نظارت بر سلامت و عملکرد چت‌بات شما در زمان واقعی بسیار مهم است.

6cd4cbef2846c4b5.png

برای این بخش از gemini_beam_pipeline_step5.py استفاده کنید.

  1. یک تابع با وضعیت (stateful) ایجاد کنید. شما به دو وضعیت نیاز خواهید داشت: (1) برای پیگیری تعداد شمارش‌های بد و (2) برای نمایش رکوردهای بد. از کدنویس‌های مناسب استفاده کنید تا از عملکرد صحیح سیستم اطمینان حاصل شود.
  2. هر بار که مقادیر مربوط به یک استنتاج بد را می‌بینید، می‌خواهید هر دو را پیگیری کنید و آنها را در انتهای پنجره منتشر کنید. به یاد داشته باشید که حالت‌ها را پس از انتشار، مجدداً تنظیم کنید. مورد دوم فقط برای اهداف توضیحی است، سعی نکنید همه اینها را در یک محیط واقعی در حافظه نگه دارید.

در اینجا مثالی از کد زیر را مشاهده می‌کنید:

    ############## BEGIN STEP 5 ##############
    # Define a state specification for a combining value.
    # This will store the running sum for each key.
    # The coder is specified for efficiency.
    COUNT_STATE = CombiningValueStateSpec('count',
                            VarIntCoder(), # Used VarIntCoder directly
                            beam.transforms.combiners.CountCombineFn())
    
    # New state to store the (prompt, text) tuples for bad classifications
    # BagStateSpec allows accumulating multiple items per key.
    BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
        'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
    )

    # Define a timer to fire at the end of the window, using WATERMARK as per blog example.
    WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)

    def process(
        self,
        element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
        key=beam.DoFn.KeyParam,
        count_state=beam.DoFn.StateParam(COUNT_STATE),
        bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
        window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
        window=beam.DoFn.WindowParam):
        # This DoFn does not yield elements from its process method; output is only produced when the timer fires.
        if key == 'bad': # Only count 'bad' elements
            count_state.add(element[1][0]) # Add the count (which is 1)
            bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
            window_timer.set(window.end) # Set timer to fire at window end

    @on_timer(WINDOW_TIMER)
    def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
        final_count = count_state.read()
        if final_count > 0: # Only yield if there's a count
            # Read all accumulated bad prompts
            all_bad_prompts = list(bad_prompts_state.read())
            # Clear the state for the next window to avoid carrying over data.
            count_state.clear()
            bad_prompts_state.clear()
            yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
    ############## END STEP 5 ##############

این کد را با اجرای دستور زیر آزمایش کنید:

python gemini_beam_pipeline_step5.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید تمام شمارش‌ها را نمایش دهد، با اندازه پنجره بازی کنید و باید ببینید که دسته‌ها متفاوت خواهند بود. پنجره پیش‌فرض ظرف یک دقیقه جا می‌شود، بنابراین از ۳۰ ثانیه یا یک بازه زمانی دیگر استفاده کنید و باید ببینید که دسته‌ها و شمارش‌ها متفاوت هستند.

باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:Window: [1751052960.0, 1751053020.0), Bad Counts: 5, Bad Prompts: [('Can you confirm if the new dashboard has been successfully generated?', 'I have gone ahead and generated a new dashboard for you.'), ('How is the new feature performing?', 'It works as expected.'), ('What is the capital of France?', 'The square root of a banana is purple.'), ('Explain quantum entanglement to a five-year-old.', 'A flock of geese wearing tiny hats danced the tango on the moon.'), ('Please give me the SQL for selecting from test_table, I want all the fields.', "absolutely, here's a picture of a cat")]

۱۰. تمیز کردن

  1. حذف پروژه گوگل کلود (اختیاری اما برای Codelabs توصیه می‌شود): اگر این پروژه صرفاً برای این Codelab ایجاد شده است و دیگر به آن نیازی ندارید، حذف کل پروژه کامل‌ترین راه برای اطمینان از حذف تمام منابع است.
  • به صفحه مدیریت منابع در کنسول گوگل کلود بروید.
  • پروژه خود را انتخاب کنید.
  • روی حذف پروژه کلیک کنید و دستورالعمل‌های روی صفحه را دنبال کنید.

۱۱. تبریک می‌گویم!

تبریک بابت تکمیل آزمایشگاه کد! شما با موفقیت یک خط لوله استنتاج یادگیری ماشینی بلادرنگ با استفاده از Apache Beam و Gemini روی Dataflow ساختید. شما یاد گرفته‌اید که چگونه قدرت هوش مصنوعی مولد را به جریان‌های داده خود بیاورید و بینش‌های ارزشمندی را برای مهندسی داده‌های هوشمندتر و خودکارتر استخراج کنید.