ارزیابی بلادرنگ AI/ML با پرتو Apache و Dataflow

1. مقدمه

در چشم انداز داده های سریع امروزی، بینش های زمان واقعی برای تصمیم گیری آگاهانه بسیار مهم است. این آزمایشگاه کد شما را از طریق ایجاد یک خط لوله ارزیابی بلادرنگ راهنمایی می کند. ما با استفاده از چارچوب Apache Beam ، که یک مدل برنامه‌نویسی یکپارچه را برای داده‌های دسته‌ای و جریانی ارائه می‌دهد، شروع می‌کنیم. این به طور قابل توجهی توسعه خط لوله را با انتزاع کردن منطق پیچیده محاسباتی توزیع شده که در غیر این صورت باید از ابتدا بسازید، ساده می کند. هنگامی که خط لوله شما با استفاده از Beam تعریف شد، آن را به طور یکپارچه در Google Cloud Dataflow اجرا می کنید، یک سرویس کاملاً مدیریت شده که مقیاس و عملکرد بی نظیری را برای نیازهای پردازش داده شما ارائه می دهد.

در این نرم‌افزار، یاد می‌گیرید که چگونه یک خط لوله Apache Beam مقیاس‌پذیر برای استنتاج یادگیری ماشین طراحی کنید، یک ModelHandler سفارشی برای ادغام مدل Gemini Vertex AI ایجاد کنید، از مهندسی سریع برای طبقه‌بندی متن هوشمند در جریان‌های داده استفاده کنید، و این خط لوله استنتاج ML جریانی را در Google Cloud Data استقرار و اجرا کنید. در پایان، بینش‌های ارزشمندی در مورد استفاده از یادگیری ماشینی برای درک بی‌درنگ داده‌ها و ارزیابی مستمر در جریان‌های کاری مهندسی، به‌ویژه برای حفظ هوش مصنوعی محاوره‌ای قوی و کاربر محور به دست خواهید آورد.

سناریو

شرکت شما یک عامل داده ساخته است. عامل داده شما که با Agent Development Kit (ADK) ساخته شده است، مجهز به قابلیت های تخصصی مختلف برای کمک به وظایف مربوط به داده است. آن را به عنوان یک دستیار داده همه کاره، آماده رسیدگی به درخواست های مختلف، از عمل به عنوان یک تحلیلگر BI برای تولید گزارش های روشنگر، تا مهندس داده که به شما کمک می کند خطوط لوله داده قوی بسازید، یا یک SQL Generator که بیانیه های SQL دقیق را ایجاد می کند، و موارد دیگر را تصور کنید. هر تعاملی که این عامل دارد، هر پاسخی که ایجاد می کند، به طور خودکار در Firestore ذخیره می شود. اما چرا در اینجا به یک خط لوله نیاز داریم؟

591df0e9110b9f86.png

زیرا از Firestore، یک ماشه به طور یکپارچه این داده‌های تعامل را به Pub/Sub ارسال می‌کند و تضمین می‌کند که می‌توانیم فوراً این مکالمات مهم را در زمان واقعی پردازش و تجزیه و تحلیل کنیم.

4577e473831fbb87.png

2. قبل از شروع

یک پروژه ایجاد کنید

  1. در Google Cloud Console ، در صفحه انتخاب پروژه، یک پروژه Google Cloud را انتخاب یا ایجاد کنید.
  2. مطمئن شوید که صورتحساب برای پروژه Cloud شما فعال است. با نحوه بررسی فعال بودن صورت‌حساب در پروژه آشنا شوید.
  3. Cloud Shell را با کلیک بر روی این پیوند فعال کنید. می توانید با کلیک بر روی دکمه مربوطه از Cloud Shell بین Cloud Shell Terminal (برای اجرای دستورات ابری) و Editor (برای ساخت پروژه ها) جابه جا شوید.
  4. پس از اتصال به Cloud Shell، با استفاده از دستور زیر بررسی می‌کنید که قبلاً احراز هویت شده‌اید و پروژه به ID پروژه شما تنظیم شده است:
gcloud auth list
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد.
gcloud config list project
  1. اگر پروژه شما تنظیم نشده است، از دستور زیر برای تنظیم آن استفاده کنید:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
  1. API های مورد نیاز را از طریق دستور زیر فعال کنید. این ممکن است چند دقیقه طول بکشد، پس لطفا صبور باشید.
gcloud services enable \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com \
    compute.googleapis.com
  1. حتما پایتون 3.10+ را داشته باشید
  2. بسته های پایتون را نصب کنید

کتابخانه های Python مورد نیاز را برای Apache Beam، Google Cloud Vertex AI و Google Generative AI در محیط Cloud Shell خود نصب کنید.

pip install apache-beam[gcp] google-genai
  1. مخزن github را کلون کنید و به دایرکتوری دمو بروید.
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval

برای دستورات و استفاده از gcloud به مستندات مراجعه کنید.

3. نحوه استفاده از مخزن ارائه شده Github

مخزن GitHub مرتبط با این آزمایشگاه کد، که در https://github.com/GoogleCloudPlatform/devrel-demos/tree/main/data-analytics/beam_as_eval یافت می‌شود، برای تسهیل تجربه یادگیری هدایت‌شده سازمان‌دهی شده است. این شامل کد اسکلتی است که با هر قسمت مجزای از Codelab همسو می شود و از پیشرفت واضح در مواد اطمینان می دهد.

در مخزن، دو پوشه اصلی را خواهید دید: "کامل" و "ناقص". پوشه "کامل" کد کاملاً کاربردی برای هر مرحله را در خود جای می دهد و به شما امکان می دهد خروجی مورد نظر را اجرا و مشاهده کنید. برعکس، پوشه "ناقص" کدی را از مراحل قبل ارائه می‌کند و بخش‌های خاصی را بین ##### START STEP <NUMBER> ##### و ##### END STEP <NUMBER> ##### علامت‌گذاری شده است تا به عنوان بخشی از تمرین‌ها تکمیل شود. این ساختار شما را قادر می‌سازد تا ضمن مشارکت فعال در چالش‌های کدنویسی، دانش قبلی را بسازید.

42015376afc03a0b.png

4. نمای کلی معماری

خط لوله ما یک الگوی قدرتمند و مقیاس پذیر برای ادغام استنتاج ML در جریان های داده ارائه می دهد. در اینجا نحوه قرار گرفتن قطعات با هم آمده است:

335470916fedd7af.png

در خط لوله Beam خود، در چندین ورودی به صورت مشروط کدنویسی می‌کنید و سپس مدل‌های سفارشی را با تبدیل RunInference بارگذاری می‌کنید. حتی اگر در مثال از Gemini با VertexAI استفاده می‌کنید، نشان می‌دهد که چگونه می‌توانید اساساً چندین ModelHandler را متناسب با تعداد مدل‌هایی که دارید ایجاد کنید. در نهایت، از یک DoFn حالت دار برای پیگیری رویدادها و انتشار آنها به صورت کنترل شده استفاده خواهید کرد.

ece1725721653b80.png

5. بلع داده ها

ابتدا، خط لوله خود را برای دریافت داده ها تنظیم می کنید. از Pub/Sub برای پخش هم‌زمان استفاده می‌کنید، اما برای آسان‌تر کردن توسعه، یک حالت آزمایشی نیز ایجاد خواهید کرد. این test_mode به شما امکان می‌دهد خط لوله را به صورت محلی با استفاده از داده‌های نمونه از پیش تعریف‌شده اجرا کنید، بنابراین برای دیدن اینکه آیا خط لوله شما کار می‌کند نیازی به پخش مستقیم Pub/Sub ندارید.

4153613f05f28c78.png

برای این بخش از gemini_beam_pipeline_step1.py استفاده کنید.

  1. با استفاده از شیء خط لوله ارائه شده p، یک ورودی Pub/Sub را کدگذاری کنید و خروجی را به صورت pCollection بنویسید.
  2. همچنین از یک پرچم برای تعیین اینکه آیا TEST_MODE تنظیم شده است استفاده کنید.
  3. اگر TEST_MODE تنظیم شده بود، سپس به تجزیه آرایه TEST_DATA به عنوان ورودی بروید.

این ضروری نیست، اما به کوتاه کردن روند کمک می کند، بنابراین نیازی به درگیر کردن Pub/Sub اینقدر زود هنگام ندارید.

در اینجا نمونه ای از کد زیر آمده است:

        # Step 1
        # Ingesting Data
        # Write your data ingestion step here.
        ############## BEGIN STEP 1 ##############
        if known_args.test_mode:  
            logging.info("Running in test mode with in-memory data.")
            parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
            # Convert dicts to JSON strings and add timestamps for test mode
            parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
            )
        else:
            logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
            parsed_elements = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
                    topic=known_args.input_topic
                    ).with_output_types(bytes)
                | 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
                # Extract timestamp from JSON string for Pub/Sub messages
                | 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
            )
        ############## END STEP 1 ##############

این کد را با اجرای:

python gemini_beam_pipeline_step1.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید همه رکوردها را منتشر کند و آنها را در stdout ثبت کند.

شما باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:Running in test mode with in-memory data.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:root:{"id": "test-1", "prompt": "Please provide the SQL query to select all fields from the 'TEST_TABLE'.", "text": "Sure here is the SQL: SELECT * FROM TEST_TABLE;", "timestamp": 1751052405.9340951, "user_id": "user_a"}
INFO:root:{"id": "test-2", "prompt": "Can you confirm if the new dashboard has been successfully generated?", "text": "I have gone ahead and generated a new dashboard for you.", "timestamp": 1751052410.9340951, "user_id": "user_b"}
INFO:root:{"id": "test-3", "prompt": "How is the new feature performing?", "text": "It works as expected.", "timestamp": 1751052415.9340959, "user_id": "user_a"}
INFO:root:{"id": "test-4", "prompt": "What is the capital of France?", "text": "The square root of a banana is purple.", "timestamp": 1751052430.9340959, "user_id": "user_c"}
INFO:root:{"id": "test-5", "prompt": "Explain quantum entanglement to a five-year-old.", "text": "A flock of geese wearing tiny hats danced the tango on the moon.", "timestamp": 1751052435.9340959, "user_id": "user_b"}
INFO:root:{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here's a picture of a cat", "timestamp": 1751052440.9340959, "user_id": "user_c"}

6. ایجاد یک PTtransform برای LLM Prompt Classification

در مرحله بعد، یک PTtransform برای طبقه‌بندی درخواست‌ها می‌سازید. این شامل استفاده از مدل Gemini Vertex AI برای دسته بندی متن ورودی است. شما یک GeminiModelHandler سفارشی تعریف می‌کنید که مدل Gemini را بارگیری می‌کند و سپس به مدل آموزش می‌دهد که چگونه متن را به دسته‌هایی مانند "DATA ENGINEER"، "BI ANALYST" یا "SQL GENERATOR" طبقه‌بندی کند.

شما می توانید از این با مقایسه با تماس های واقعی ابزار در گزارش استفاده کنید. این در این نرم افزار کد پوشش داده نشده است، اما می توانید آن را به پایین دست ارسال کرده و مقایسه کنید. ممکن است مواردی مبهم باشند و این به عنوان یک نقطه داده اضافی عالی عمل می کند تا اطمینان حاصل شود که نماینده شما ابزارهای مناسب را فراخوانی می کند.

9840f3fb26b88138.png

برای این بخش از gemini_beam_pipeline_step2.py استفاده کنید.

  1. ModelHandler سفارشی خود را بسازید. با این حال، به جای برگرداندن یک شی مدل در load_model، genai.Client را برگردانید.
  2. برای ایجاد تابع run_inference از ModelHandler سفارشی به کد نیاز دارید. یک نمونه درخواست ارائه شده است:

اعلان می تواند چیزی به شرح زیر باشد:

prompt =f"""
            The input is a response from another agent.
            The agent has multiple tools, each having their own responsibilities.
            You are to analyze the input and then classify it into one and only one.
            Use the best one if it seems like it is ambiguous. Choose only one.

            Finally, always provide a paragraph on why you think it is in one of the categories.

            Classify the text into one of these categories:
            DATA ENGINEER
            BI ANALYST
            SQL GENERATOR
            HELPER
            OTHER
            Respond with only the one single classification tag.
            Your response should be in a tuple (classification_tag, reason)

            Text: "{text_to_classify}"
            """
  1. نتایج را به عنوان pCollection برای pTransform بعدی ارائه دهید.

در اینجا نمونه ای از کد زیر آمده است:

    ############## BEGIN STEP 2 ##############
    # load_model is called once per worker process to initialize the LLM client.
    # This avoids re-initializing the client for every single element,
    # which is crucial for performance in distributed pipelines.
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client
    
    # run_inference is called for each batch of elements. Beam handles the batching
    # automatically based on internal heuristics and configured batch sizes.
    # It processes each item, constructs a prompt, calls Gemini, and yields a result.
    def run_inference(
        self,
        batch: Sequence[Any],  # Each item is a JSON string or a dict
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """
        Runs inference on a batch of JSON strings or dicts.
        Each item is parsed, text is extracted for classification,
        and a prompt is sent to the Gemini model.
        """
        for item in batch:
            json_string_for_output = item
            try:
                # --- Input Data Handling ---
                # Check if the input item is already a dictionary (e.g., from TEST_DATA)
                # or a JSON string (e.g., from Pub/Sub).
                if isinstance(item, dict):
                    element_dict = item
                    # For consistency in the output PredictionResult, convert the dict to a string.
                    # This ensures pr.example always contains the original JSON string.
                    json_string_for_output = json.dumps(item)
                else:
                    element_dict = json.loads(item)

                # Extract the 'text' field from the parsed dictionary.
                text_to_classify = element_dict.get('text','')

                if not text_to_classify:
                    logging.warning(f"Input JSON missing 'text' key or text is empty: {json_string_for_output}")
                    yield PredictionResult(example=json_string_for_output, inference="ERROR_NO_TEXT")
                    continue

                prompt =f"""
                The input is a response from another agent.
                The agent has multiple tools, each having their own responsibilites.
                You are to analyze the input and then classify it into one and only one.
                Use the best one if it seems like it is ambigiuous. Choose only one.

                Finally always provide a paragraph on why you think it is in one of the categories.

                Classify the text into one of these categories:
                DATA ENGINEER
                BI ANALYST
                SQL GENERATOR
                HELPER
                OTHER
                Respond with only the one single classification tag.
                Your response should be in a tuple (classification_tag, reason)

                Text: "{text_to_classify}"
                """

                contents = [
                    types.Content( # This is the actual content for the LLM
                    role="user",
                    parts=[
                        types.Part.from_text(text=prompt)
                    ]
                    )
                ]


                gemini_response = model.models.generate_content_stream(
                    model=self._model_name, contents=contents, config=self._model_kwargs
                )
                classification_tag = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        classification_tag+=chunk.text

                yield PredictionResult(example=json_string_for_output, inference=classification_tag)

            except json.JSONDecodeError as e:
                logging.error(f"Error decoding JSON string: {json_string_for_output}, error: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_JSON_DECODE")
            except Exception as e:
                logging.error(f"Error during Gemini inference for input {json_string_for_output}: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_INFERENCE")
    ############## END STEP 2 ##############

این کد را با اجرای:

python gemini_beam_pipeline_step2.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید یک استنتاج از Gemini را برگرداند. نتایج را طبق درخواست شما طبقه بندی می کند.

شما باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:PredictionResult(example='{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here\'s a picture of a cat", "timestamp": 1751052592.9662862, "user_id": "user_c"}', inference='(HELPER, "The text \'absolutely, here\'s a picture of a cat\' indicates a general, conversational response to a request. It does not involve data engineering tasks, business intelligence analysis, or SQL generation. Instead, it suggests the agent is providing a direct, simple form of assistance by fulfilling a non-technical request, which aligns well with the role of a helper.")', model_id=None)

7. ساخت LLM-به عنوان یک قاضی

پس از طبقه بندی اعلان ها، دقت پاسخ های مدل را ارزیابی خواهید کرد. این شامل تماس دیگری با مدل Gemini است، اما این بار، از آن می‌خواهید تا میزان پاسخگویی «متن» به «اعلان» اصلی را در مقیاسی از 0.0 تا 1.0 به ثمر برساند. این به شما کمک می کند کیفیت خروجی هوش مصنوعی را درک کنید. شما یک GeminiAccuracyModelHandler جداگانه برای این کار ایجاد خواهید کرد.

70ef07fca17ba385.png

برای این بخش از gemini_beam_pipeline_step3.py استفاده کنید.

  1. ModelHandler سفارشی خود را بسازید. با این حال، به جای برگرداندن یک شی مدل در load_model، genai.Client را همانطور که در بالا انجام دادید برگردانید.
  2. برای ایجاد تابع run_inference از ModelHandler سفارشی به کد نیاز دارید. یک نمونه درخواست ارائه شده است:
            prompt_for_accuracy = f"""
            You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
            Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
            0.0 is very bad, 1.0 is excellent.

            Example of very bad, score of 0:
            prompt: Give me the SQL for test_Table
            text: SUre, here's a picture of a dog

            Example of very good score of 1:
            prompt: generate a sql statement to select all fields from test_table
            text: SELECT * from test_table;

            Your response should be ONLY the float score, followed by a brief explanation of why.
            For example: "0.8 - The response was mostly accurate but missed a minor detail."

            Prompt: "{original_prompt}"
            Text: "{original_text}"
            Score and Explanation:
            """

نکته ای که در اینجا باید به آن توجه کرد این است که شما اساساً دو مدل مختلف را در یک خط لوله ایجاد کرده اید. در این مثال خاص، شما همچنین از تماس Gemini با VertexAI استفاده می‌کنید، اما در همان مفهوم می‌توانید مدل‌های دیگر را استفاده و بارگذاری کنید. این امر مدیریت مدل شما را ساده می کند و به شما امکان می دهد از چندین مدل در یک خط لوله Beam استفاده کنید.

  1. نتایج را به عنوان pCollection برای pTransform بعدی ارائه دهید.

در اینجا نمونه ای از کد زیر آمده است:

    ############## BEGIN STEP 3 ##############
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client

    def run_inference(
        self,
        batch: Sequence[str],  # Each item is a JSON string
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """Runs inference on a batch of JSON strings to verify accuracy."""
        for json_string in batch:
            try:
                element_dict = json.loads(json_string)
                original_prompt = element_dict.get('original_prompt', '')
                original_text = element_dict.get('original_text', '')

                if not original_prompt or not original_text:
                    logging.warning(f"Accuracy input missing prompt/text: {json_string}")
                    yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
                    continue

                prompt_for_accuracy = f"""
                You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
                Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
                0.0 is very bad, 1.0 is excellent.

                Example of very bad, score of 0:
                prompt: Give me the SQL for test_Table
                text: SUre, here's a picture of a dog

                Example of very good score of 1:
                prompt: generate a sql statement to select all fields from test_table
                text: SELECT * from test_table;

                Your response should be ONLY the float score, followed by a brief explanation of why.
                For example: "0.8 - The response was mostly accurate but missed a minor detail."

                Prompt: "{original_prompt}"
                Text: "{original_text}"
                Score and Explanation:
                """
                gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)

                gemini_response_text = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        gemini_response_text+=chunk.text

                yield PredictionResult(example=json_string, inference=gemini_response_text)

            except Exception as e:
                logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
                yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
    ############## END STEP 3 ##############

این کد را با اجرای:

python gemini_beam_pipeline_step3.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله همچنین باید یک استنباط را برگرداند، باید نظر دهد و امتیازی را در مورد اینکه Gemini فکر می‌کند ابزار دقیق پاسخ داده است، بازگرداند.

شما باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:PredictionResult(example='{"original_data_json": "{\\"id\\": \\"test-6\\", \\"prompt\\": \\"Please give me the SQL for selecting from test_table, I want all the fields.\\", \\"text\\": \\"absolutely, here\'s a picture of a cat\\", \\"timestamp\\": 1751052770.7552562, \\"user_id\\": \\"user_c\\"}", "original_prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "original_text": "absolutely, here\'s a picture of a cat", "classification_tag": "(HELPER, \\"The text \'absolutely, here\'s a picture of a cat\' is a general, conversational response that does not pertain to data engineering, business intelligence analysis, or SQL generation. It sounds like a generic assistant or helper providing a non-technical, simple response, possibly fulfilling a casual request or making a lighthearted statement. Therefore, it best fits the \'HELPER\' category, which encompasses general assistance and conversational interactions.\\")"}', inference='0.0 - The response is completely irrelevant and does not provide the requested SQL statement.', model_id=None)

8. پنجره و تجزیه و تحلیل نتایج

اکنون، نتایج خود را برای تجزیه و تحلیل آنها در بازه های زمانی مشخص، پنجره خواهید کرد. از پنجره‌های ثابت برای گروه‌بندی داده‌ها استفاده می‌کنید که به شما امکان می‌دهد بینش‌های کلی را دریافت کنید. پس از پنجره، خروجی‌های خام Gemini را در قالبی ساختاریافته‌تر، از جمله داده‌های اصلی، برچسب طبقه‌بندی، امتیاز دقت و توضیح تجزیه و تحلیل می‌کنید.

ea486c5961e560fb.png

برای این بخش از gemini_beam_pipeline_step4.py استفاده کنید.

  1. یک پنجره زمانی ثابت 60 ثانیه ای اضافه کنید تا همه داده ها در یک پنجره 60 ثانیه ای قرار بگیرند.

در اینجا نمونه ای از کد زیر آمده است:

            ############## BEGIN STEP 4 ##############
            | 'WindowIntoFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
            ############## END STEP 4 ##############

این کد را با اجرای:

python gemini_beam_pipeline_step4.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله آموزنده است، شما به دنبال پنجره خود هستید. این به عنوان مهر زمانی توقف/شروع پنجره نشان داده خواهد شد.

شما باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:({'id': 'test-6', 'prompt': 'Please give me the SQL for selecting from test_table, I want all the fields.', 'text': "absolutely, here's a picture of a cat", 'timestamp': 1751052901.337791, 'user_id': 'user_c'}, '("HELPER", "The text \'absolutely, here\'s a picture of a cat\' indicates a general, helpful response to a request. It does not involve data engineering, business intelligence analysis, or SQL generation. Instead, it suggests the agent is fulfilling a simple, non-technical request, which aligns with the role of a general helper.")', 0.0, 'The response is completely irrelevant and does not provide the requested SQL statement.', [1751052900.0, 1751052960.0))

9. شمارش نتایج خوب و بد با پردازش وضعیتی

در نهایت، از یک DoFn حالتی برای شمارش نتایج «خوب» و «بد» در هر پنجره استفاده خواهید کرد. یک نتیجه "خوب" ممکن است یک تعامل با نمره دقت بالا باشد، در حالی که یک نتیجه "بد" نشان دهنده نمره پایین است. این پردازش حالتی به شما امکان می‌دهد شمارش‌ها را حفظ کنید و حتی نمونه‌هایی از تعاملات «بد» را در طول زمان جمع‌آوری کنید، که برای نظارت بر سلامت و عملکرد ربات چت شما در زمان واقعی بسیار مهم است.

6cd4cbef2846c4b5.png

برای این بخش از gemini_beam_pipeline_step5.py استفاده کنید.

  1. یک تابع حالت ایجاد کنید. شما به دو حالت نیاز دارید: (1) برای پیگیری تعداد شمارش بد و (2) نگهداری سوابق بد برای نمایش. از کدگذارهای مناسب برای اطمینان از عملکرد سیستم استفاده کنید.
  2. هر بار که مقادیر یک استنتاج بد را می بینید، می خواهید هر دو را پیگیری کرده و آنها را در انتهای پنجره منتشر کنید. به یاد داشته باشید که حالت ها را پس از انتشار مجدد تنظیم کنید. مورد دوم فقط برای اهداف تصویری است، سعی نکنید همه اینها را در یک محیط واقعی در حافظه نگه دارید.

در اینجا نمونه ای از کد زیر آمده است:

    ############## BEGIN STEP 5 ##############
    # Define a state specification for a combining value.
    # This will store the running sum for each key.
    # The coder is specified for efficiency.
    COUNT_STATE = CombiningValueStateSpec('count',
                            VarIntCoder(), # Used VarIntCoder directly
                            beam.transforms.combiners.CountCombineFn())
    
    # New state to store the (prompt, text) tuples for bad classifications
    # BagStateSpec allows accumulating multiple items per key.
    BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
        'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
    )

    # Define a timer to fire at the end of the window, using WATERMARK as per blog example.
    WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)

    def process(
        self,
        element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
        key=beam.DoFn.KeyParam,
        count_state=beam.DoFn.StateParam(COUNT_STATE),
        bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
        window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
        window=beam.DoFn.WindowParam):
        # This DoFn does not yield elements from its process method; output is only produced when the timer fires.
        if key == 'bad': # Only count 'bad' elements
            count_state.add(element[1][0]) # Add the count (which is 1)
            bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
            window_timer.set(window.end) # Set timer to fire at window end

    @on_timer(WINDOW_TIMER)
    def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
        final_count = count_state.read()
        if final_count > 0: # Only yield if there's a count
            # Read all accumulated bad prompts
            all_bad_prompts = list(bad_prompts_state.read())
            # Clear the state for the next window to avoid carrying over data.
            count_state.clear()
            bad_prompts_state.clear()
            yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
    ############## END STEP 5 ##############

این کد را با اجرای:

python gemini_beam_pipeline_step5.py --project $PROJECT_ID --runner DirectRunner --test_mode

این مرحله باید همه شمارش ها را خروجی دهد، با اندازه پنجره بازی کنید و باید ببینید که دسته ها متفاوت خواهند بود. پنجره پیش‌فرض در عرض یک دقیقه جا می‌گیرد، بنابراین سعی کنید از 30 ثانیه یا بازه زمانی دیگری استفاده کنید و باید ببینید که دسته‌ها و تعداد آن‌ها متفاوت است.

شما باید انتظار خروجی مانند زیر را داشته باشید.

INFO:root:Window: [1751052960.0, 1751053020.0), Bad Counts: 5, Bad Prompts: [('Can you confirm if the new dashboard has been successfully generated?', 'I have gone ahead and generated a new dashboard for you.'), ('How is the new feature performing?', 'It works as expected.'), ('What is the capital of France?', 'The square root of a banana is purple.'), ('Explain quantum entanglement to a five-year-old.', 'A flock of geese wearing tiny hats danced the tango on the moon.'), ('Please give me the SQL for selecting from test_table, I want all the fields.', "absolutely, here's a picture of a cat")]

10. تمیز کردن

  1. پروژه Google Cloud را حذف کنید (اختیاری اما برای Codelabs توصیه می شود): اگر این پروژه فقط برای این کد لبه ایجاد شده است و دیگر به آن نیاز ندارید، حذف کل پروژه کامل ترین راه برای اطمینان از حذف همه منابع است.
  • به صفحه مدیریت منابع در Google Cloud Console بروید.
  • پروژه خود را انتخاب کنید
  • روی حذف پروژه کلیک کنید و دستورالعمل های روی صفحه را دنبال کنید.

11. تبریک!

برای تکمیل کد لبه تبریک می گویم! شما با موفقیت یک خط لوله استنتاج ML بلادرنگ با استفاده از Apache Beam و Gemini در Dataflow ساخته اید. شما یاد گرفته اید که چگونه قدرت هوش مصنوعی مولد را به جریان داده های خود بیاورید و بینش های ارزشمندی را برای مهندسی داده هوشمندتر و خودکارتر استخراج کنید.