Оценка AI/ML в реальном времени с помощью Apache Beam и Dataflow

1. Введение

В сегодняшнем быстро меняющемся ландшафте данных, понимание в реальном времени имеет решающее значение для принятия обоснованных решений. Эта кодовая лаборатория проведет вас через создание конвейера оценки в реальном времени. Мы начнем с использования фреймворка Apache Beam , который предлагает унифицированную модель программирования как для пакетных, так и для потоковых данных. Это значительно упрощает разработку конвейера, абстрагируясь от сложной распределенной вычислительной логики, которую вам в противном случае пришлось бы создавать с нуля. После того, как ваш конвейер будет определен с помощью Beam, вы затем беспрепятственно запустите его в Google Cloud Dataflow , полностью управляемом сервисе, который обеспечивает непревзойденный масштаб и производительность для ваших потребностей в обработке данных.

В этой кодовой лаборатории вы узнаете, как спроектировать масштабируемый конвейер Apache Beam для вывода машинного обучения, разработать пользовательский ModelHandler для интеграции модели Gemini Vertex AI, использовать оперативную разработку для интеллектуальной классификации текста в потоках данных, а также развернуть и эксплуатировать этот потоковый конвейер вывода машинного обучения в Google Cloud Dataflow. К концу вы получите ценные знания о применении машинного обучения для понимания данных в реальном времени и непрерывной оценки в рабочих процессах проектирования, в частности, для поддержания надежного и ориентированного на пользователя разговорного ИИ.

Сценарий

Ваша компания создала агента данных. Ваш агент данных, созданный с помощью Agent Development Kit (ADK), оснащен различными специализированными возможностями для помощи в задачах, связанных с данными. Представьте себе его как универсального помощника по данным, готового обрабатывать разнообразные запросы, от работы в качестве аналитика BI для создания содержательных отчетов до инженера данных, помогающего вам создавать надежные конвейеры данных, или генератора SQL, создающего точные SQL-выражения, и многого другого. Каждое взаимодействие этого агента, каждый ответ, который он генерирует, автоматически сохраняется в Firestore. Но зачем нам здесь конвейер?

591df0e9110b9f86.png

Потому что из Firestore триггер беспрепятственно отправляет эти данные о взаимодействии в Pub/Sub, гарантируя, что мы можем немедленно обрабатывать и анализировать эти критически важные разговоры в режиме реального времени.

4577e473831fbb87.png

2. Прежде чем начать

Создать проект

  1. В Google Cloud Console на странице выбора проекта выберите или создайте проект Google Cloud.
  2. Убедитесь, что для вашего проекта Cloud включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .
  3. Активируйте Cloud Shell, нажав на эту ссылку . Вы можете переключаться между Cloud Shell Terminal (для запуска облачных команд) и Editor (для создания проектов), нажав на соответствующую кнопку в Cloud Shell.
  4. После подключения к Cloud Shell вы проверяете, что вы уже аутентифицированы и что проекту присвоен ваш идентификатор проекта, с помощью следующей команды:
gcloud auth list
  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что команда gcloud знает о вашем проекте.
gcloud config list project
  1. Если ваш проект не настроен, используйте следующую команду для его настройки:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
  1. Включите требуемые API с помощью команды, показанной ниже. Это может занять несколько минут, поэтому, пожалуйста, будьте терпеливы.
gcloud services enable \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com \
    compute.googleapis.com
  1. Убедитесь, что у вас установлен Python 3.10+.
  2. Установить пакеты Python

Установите необходимые библиотеки Python для Apache Beam, Google Cloud Vertex AI и Google Generative AI в вашей среде Cloud Shell.

pip install apache-beam[gcp] google-genai
  1. Клонируйте репозиторий github и переключитесь в каталог demo.
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval

Информацию о командах и использовании gcloud см. в документации .

3. Как использовать предоставленный репозиторий Github

Репозиторий GitHub, связанный с этой codelab, который находится по адресу https://github.com/GoogleCloudPlatform/devrel-demos/tree/main/data-analytics/beam_as_eval , организован для облегчения процесса обучения. Он содержит скелет кода, который соответствует каждой отдельной части codelab, обеспечивая четкое продвижение по материалу.

В репозитории вы обнаружите две основные папки: "complete" и "incomplete". В папке "complete" находится полностью функциональный код для каждого шага, что позволяет вам запускать и наблюдать предполагаемый вывод. Наоборот, папка "incomplete" предоставляет код из предыдущих шагов, оставляя определенные разделы, отмеченные между ##### START STEP <NUMBER> ##### и ##### END STEP <NUMBER> ##### для вас, чтобы завершить в рамках упражнений. Эта структура позволяет вам опираться на предыдущие знания, активно участвуя в задачах по кодированию.

42015376afc03a0b.png

4. Архитектурный обзор

Наш конвейер обеспечивает мощный и масштабируемый шаблон для интеграции вывода ML в потоки данных. Вот как части соединяются вместе:

335470916fedd7af.png

В вашем конвейере Beam вы будете условно кодировать несколько входов, а затем загружать пользовательские модели с помощью готового преобразования RunInference . Несмотря на то, что в этом примере вы используете Gemini с VertexAI, он демонстрирует, как вы по сути создаете несколько ModelHandlers, чтобы соответствовать количеству имеющихся у вас моделей. Наконец, вы будете использовать DoFn с сохранением состояния для отслеживания событий и их контролируемой отправки.

ece1725721653b80.png

5. Прием данных

Сначала вы настроите свой конвейер для приема данных. Вы будете использовать Pub/Sub для потоковой передачи в реальном времени, но чтобы упростить разработку, вы также создадите тестовый режим. Этот test_mode позволяет вам запускать конвейер локально, используя предопределенные образцы данных, поэтому вам не нужен живой поток Pub/Sub, чтобы проверить, работает ли ваш конвейер.

4153613f05f28c78.png

Для этого раздела используйте gemini_beam_pipeline_step1.py .

  1. Используя предоставленный объект конвейера p, закодируйте вход Pub/Sub и запишите выход как pCollection.
  2. Дополнительно используйте флаг, чтобы определить, установлен ли TEST_MODE.
  3. Если установлен TEST_MODE, то переключиться на анализ массива TEST_DATA в качестве входных данных.

Это не обязательно, но помогает сократить процесс, так как вам не придется привлекать Pub/Sub на таком раннем этапе.

Вот пример кода ниже:

        # Step 1
        # Ingesting Data
        # Write your data ingestion step here.
        ############## BEGIN STEP 1 ##############
        if known_args.test_mode:  
            logging.info("Running in test mode with in-memory data.")
            parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
            # Convert dicts to JSON strings and add timestamps for test mode
            parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
            )
        else:
            logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
            parsed_elements = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
                    topic=known_args.input_topic
                    ).with_output_types(bytes)
                | 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
                # Extract timestamp from JSON string for Pub/Sub messages
                | 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
            )
        ############## END STEP 1 ##############

Проверьте этот код, выполнив:

python gemini_beam_pipeline_step1.py --project $PROJECT_ID --runner DirectRunner --test_mode

Этот шаг должен вывести все записи, выведя их на стандартный вывод.

Вы должны ожидать примерно следующий результат.

INFO:root:Running in test mode with in-memory data.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:root:{"id": "test-1", "prompt": "Please provide the SQL query to select all fields from the 'TEST_TABLE'.", "text": "Sure here is the SQL: SELECT * FROM TEST_TABLE;", "timestamp": 1751052405.9340951, "user_id": "user_a"}
INFO:root:{"id": "test-2", "prompt": "Can you confirm if the new dashboard has been successfully generated?", "text": "I have gone ahead and generated a new dashboard for you.", "timestamp": 1751052410.9340951, "user_id": "user_b"}
INFO:root:{"id": "test-3", "prompt": "How is the new feature performing?", "text": "It works as expected.", "timestamp": 1751052415.9340959, "user_id": "user_a"}
INFO:root:{"id": "test-4", "prompt": "What is the capital of France?", "text": "The square root of a banana is purple.", "timestamp": 1751052430.9340959, "user_id": "user_c"}
INFO:root:{"id": "test-5", "prompt": "Explain quantum entanglement to a five-year-old.", "text": "A flock of geese wearing tiny hats danced the tango on the moon.", "timestamp": 1751052435.9340959, "user_id": "user_b"}
INFO:root:{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here's a picture of a cat", "timestamp": 1751052440.9340959, "user_id": "user_c"}

6. Создание PTransform для классификации подсказок LLM

Далее вы создадите PTransform для классификации подсказок. Это включает использование модели Gemini Vertex AI для категоризации входящего текста. Вы определите пользовательский GeminiModelHandler , который загружает модель Gemini, а затем инструктирует модель о том, как классифицировать текст по категориям, таким как «ИНЖЕНЕР ДАННЫХ», «АНАЛИТИК BI» или «ГЕНЕРАТОР SQL».

Вы бы использовали это, сравнивая с реальными вызовами инструментов в журнале. Это не рассматривается в этой кодовой лаборатории, но вы могли бы отправить его вниз по течению и сравнить. Некоторые могут быть неоднозначными, и это служит отличной дополнительной точкой данных, чтобы убедиться, что ваш агент вызывает правильные инструменты.

9840f3fb26b88138.png

Для этого раздела используйте gemini_beam_pipeline_step2.py .

  1. Создайте свой собственный ModelHandler; однако вместо возврата объекта модели в load_model верните genai.Client.
  2. Код, который вам понадобится для создания функции run_inference пользовательского ModelHandler. Пример приглашения был предоставлен:

Подсказка может быть примерно следующей:

prompt =f"""
            The input is a response from another agent.
            The agent has multiple tools, each having their own responsibilities.
            You are to analyze the input and then classify it into one and only one.
            Use the best one if it seems like it is ambiguous. Choose only one.

            Finally, always provide a paragraph on why you think it is in one of the categories.

            Classify the text into one of these categories:
            DATA ENGINEER
            BI ANALYST
            SQL GENERATOR
            HELPER
            OTHER
            Respond with only the one single classification tag.
            Your response should be in a tuple (classification_tag, reason)

            Text: "{text_to_classify}"
            """
  1. Выдать результаты в виде pCollection для следующего pTransform.

Вот пример кода ниже:

    ############## BEGIN STEP 2 ##############
    # load_model is called once per worker process to initialize the LLM client.
    # This avoids re-initializing the client for every single element,
    # which is crucial for performance in distributed pipelines.
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client
    
    # run_inference is called for each batch of elements. Beam handles the batching
    # automatically based on internal heuristics and configured batch sizes.
    # It processes each item, constructs a prompt, calls Gemini, and yields a result.
    def run_inference(
        self,
        batch: Sequence[Any],  # Each item is a JSON string or a dict
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """
        Runs inference on a batch of JSON strings or dicts.
        Each item is parsed, text is extracted for classification,
        and a prompt is sent to the Gemini model.
        """
        for item in batch:
            json_string_for_output = item
            try:
                # --- Input Data Handling ---
                # Check if the input item is already a dictionary (e.g., from TEST_DATA)
                # or a JSON string (e.g., from Pub/Sub).
                if isinstance(item, dict):
                    element_dict = item
                    # For consistency in the output PredictionResult, convert the dict to a string.
                    # This ensures pr.example always contains the original JSON string.
                    json_string_for_output = json.dumps(item)
                else:
                    element_dict = json.loads(item)

                # Extract the 'text' field from the parsed dictionary.
                text_to_classify = element_dict.get('text','')

                if not text_to_classify:
                    logging.warning(f"Input JSON missing 'text' key or text is empty: {json_string_for_output}")
                    yield PredictionResult(example=json_string_for_output, inference="ERROR_NO_TEXT")
                    continue

                prompt =f"""
                The input is a response from another agent.
                The agent has multiple tools, each having their own responsibilites.
                You are to analyze the input and then classify it into one and only one.
                Use the best one if it seems like it is ambigiuous. Choose only one.

                Finally always provide a paragraph on why you think it is in one of the categories.

                Classify the text into one of these categories:
                DATA ENGINEER
                BI ANALYST
                SQL GENERATOR
                HELPER
                OTHER
                Respond with only the one single classification tag.
                Your response should be in a tuple (classification_tag, reason)

                Text: "{text_to_classify}"
                """

                contents = [
                    types.Content( # This is the actual content for the LLM
                    role="user",
                    parts=[
                        types.Part.from_text(text=prompt)
                    ]
                    )
                ]


                gemini_response = model.models.generate_content_stream(
                    model=self._model_name, contents=contents, config=self._model_kwargs
                )
                classification_tag = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        classification_tag+=chunk.text

                yield PredictionResult(example=json_string_for_output, inference=classification_tag)

            except json.JSONDecodeError as e:
                logging.error(f"Error decoding JSON string: {json_string_for_output}, error: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_JSON_DECODE")
            except Exception as e:
                logging.error(f"Error during Gemini inference for input {json_string_for_output}: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_INFERENCE")
    ############## END STEP 2 ##############

Проверьте этот код, выполнив:

python gemini_beam_pipeline_step2.py --project $PROJECT_ID --runner DirectRunner --test_mode

Этот шаг должен вернуть вывод от Gemini. Он классифицирует результаты, как запрошено в вашей подсказке.

Вы должны ожидать примерно следующий результат.

INFO:root:PredictionResult(example='{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here\'s a picture of a cat", "timestamp": 1751052592.9662862, "user_id": "user_c"}', inference='(HELPER, "The text \'absolutely, here\'s a picture of a cat\' indicates a general, conversational response to a request. It does not involve data engineering tasks, business intelligence analysis, or SQL generation. Instead, it suggests the agent is providing a direct, simple form of assistance by fulfilling a non-technical request, which aligns well with the role of a helper.")', model_id=None)

7. Создание магистра права в качестве судьи

После классификации подсказок вы оцените точность ответов модели. Это включает в себя еще один вызов модели Gemini, но на этот раз вы попросите ее оценить, насколько хорошо «текст» соответствует исходной «подсказке» по шкале от 0,0 до 1,0. Это поможет вам оценить качество вывода ИИ. Вы создадите отдельный GeminiAccuracyModelHandler для этой задачи.

70ef07fca17ba385.png

Для этого раздела используйте gemini_beam_pipeline_step3.py .

  1. Создайте свой собственный ModelHandler; однако вместо возврата объекта модели в load_model верните genai.Client, как вы делали выше.
  2. Код, который вам понадобится для создания функции run_inference пользовательского ModelHandler. Пример приглашения был предоставлен:
            prompt_for_accuracy = f"""
            You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
            Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
            0.0 is very bad, 1.0 is excellent.

            Example of very bad, score of 0:
            prompt: Give me the SQL for test_Table
            text: SUre, here's a picture of a dog

            Example of very good score of 1:
            prompt: generate a sql statement to select all fields from test_table
            text: SELECT * from test_table;

            Your response should be ONLY the float score, followed by a brief explanation of why.
            For example: "0.8 - The response was mostly accurate but missed a minor detail."

            Prompt: "{original_prompt}"
            Text: "{original_text}"
            Score and Explanation:
            """

Здесь следует отметить, что вы по сути создали две разные модели в одном конвейере. В этом конкретном примере вы также используете вызов Gemini с VertexAI, но в той же концепции вы можете выбрать использование и загрузку других моделей. Это упрощает управление моделями и позволяет использовать несколько моделей в одном конвейере Beam.

  1. Выдать результаты в виде pCollection для следующего pTransform.

Вот пример кода ниже:

    ############## BEGIN STEP 3 ##############
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client

    def run_inference(
        self,
        batch: Sequence[str],  # Each item is a JSON string
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """Runs inference on a batch of JSON strings to verify accuracy."""
        for json_string in batch:
            try:
                element_dict = json.loads(json_string)
                original_prompt = element_dict.get('original_prompt', '')
                original_text = element_dict.get('original_text', '')

                if not original_prompt or not original_text:
                    logging.warning(f"Accuracy input missing prompt/text: {json_string}")
                    yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
                    continue

                prompt_for_accuracy = f"""
                You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
                Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
                0.0 is very bad, 1.0 is excellent.

                Example of very bad, score of 0:
                prompt: Give me the SQL for test_Table
                text: SUre, here's a picture of a dog

                Example of very good score of 1:
                prompt: generate a sql statement to select all fields from test_table
                text: SELECT * from test_table;

                Your response should be ONLY the float score, followed by a brief explanation of why.
                For example: "0.8 - The response was mostly accurate but missed a minor detail."

                Prompt: "{original_prompt}"
                Text: "{original_text}"
                Score and Explanation:
                """
                gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)

                gemini_response_text = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        gemini_response_text+=chunk.text

                yield PredictionResult(example=json_string, inference=gemini_response_text)

            except Exception as e:
                logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
                yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
    ############## END STEP 3 ##############

Проверьте этот код, выполнив:

python gemini_beam_pipeline_step3.py --project $PROJECT_ID --runner DirectRunner --test_mode

Этот шаг также должен возвращать вывод, комментарий и оценку того, насколько точным, по мнению Gemini, был ответ инструмента.

Вы должны ожидать примерно следующий результат.

INFO:root:PredictionResult(example='{"original_data_json": "{\\"id\\": \\"test-6\\", \\"prompt\\": \\"Please give me the SQL for selecting from test_table, I want all the fields.\\", \\"text\\": \\"absolutely, here\'s a picture of a cat\\", \\"timestamp\\": 1751052770.7552562, \\"user_id\\": \\"user_c\\"}", "original_prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "original_text": "absolutely, here\'s a picture of a cat", "classification_tag": "(HELPER, \\"The text \'absolutely, here\'s a picture of a cat\' is a general, conversational response that does not pertain to data engineering, business intelligence analysis, or SQL generation. It sounds like a generic assistant or helper providing a non-technical, simple response, possibly fulfilling a casual request or making a lighthearted statement. Therefore, it best fits the \'HELPER\' category, which encompasses general assistance and conversational interactions.\\")"}', inference='0.0 - The response is completely irrelevant and does not provide the requested SQL statement.', model_id=None)

8. Окно и анализ результатов

Теперь вы будете разбивать результаты на окна, чтобы анализировать их в течение определенных временных интервалов. Вы будете использовать фиксированные окна для группировки данных, что позволит вам получить совокупные идеи. После разбивки на окна вы будете анализировать необработанные выходные данные Gemini в более структурированном формате, включая исходные данные, тег классификации, оценку точности и пояснение.

ea486c5961e560fb.png

Для этого раздела используйте gemini_beam_pipeline_step4.py .

  1. Добавьте фиксированное временное окно в 60 секунд, чтобы все данные были помещены в 60-секундное окно.

Вот пример кода ниже:

            ############## BEGIN STEP 4 ##############
            | 'WindowIntoFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
            ############## END STEP 4 ##############

Проверьте этот код, выполнив:

python gemini_beam_pipeline_step4.py --project $PROJECT_ID --runner DirectRunner --test_mode

Этот шаг информативен, вы ищете свое окно. Это будет показано как временная метка остановки/запуска окна.

Вы должны ожидать примерно следующий результат.

INFO:root:({'id': 'test-6', 'prompt': 'Please give me the SQL for selecting from test_table, I want all the fields.', 'text': "absolutely, here's a picture of a cat", 'timestamp': 1751052901.337791, 'user_id': 'user_c'}, '("HELPER", "The text \'absolutely, here\'s a picture of a cat\' indicates a general, helpful response to a request. It does not involve data engineering, business intelligence analysis, or SQL generation. Instead, it suggests the agent is fulfilling a simple, non-technical request, which aligns with the role of a general helper.")', 0.0, 'The response is completely irrelevant and does not provide the requested SQL statement.', [1751052900.0, 1751052960.0))

9. Подсчет хороших и плохих результатов с помощью обработки с отслеживанием состояния

Наконец, вы будете использовать DoFn с отслеживанием состояния для подсчета «хороших» и «плохих» результатов в каждом окне. «Хороший» результат может быть взаимодействием с высокой оценкой точности, в то время как «плохой» результат указывает на низкую оценку. Эта обработка с отслеживанием состояния позволяет вам поддерживать подсчеты и даже собирать примеры «плохих» взаимодействий с течением времени, что имеет решающее значение для мониторинга работоспособности и производительности вашего чат-бота в режиме реального времени.

6cd4cbef2846c4b5.png

Для этого раздела используйте gemini_beam_pipeline_step5.py .

  1. Создайте функцию с сохранением состояния. Вам понадобятся два состояния: (1) для отслеживания количества плохих подсчетов и (2) для сохранения плохих записей для отображения. Используйте правильные кодеры, чтобы гарантировать, что система может быть производительной.
  2. Каждый раз, когда вы видите значения для плохого вывода, вы хотите отслеживать оба и выдавать их в конце окна. Не забудьте сбросить состояния после выдачи. Последнее предназначено только для иллюстративных целей, не пытайтесь хранить все это в памяти в реальной среде.

Вот пример кода ниже:

    ############## BEGIN STEP 5 ##############
    # Define a state specification for a combining value.
    # This will store the running sum for each key.
    # The coder is specified for efficiency.
    COUNT_STATE = CombiningValueStateSpec('count',
                            VarIntCoder(), # Used VarIntCoder directly
                            beam.transforms.combiners.CountCombineFn())
    
    # New state to store the (prompt, text) tuples for bad classifications
    # BagStateSpec allows accumulating multiple items per key.
    BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
        'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
    )

    # Define a timer to fire at the end of the window, using WATERMARK as per blog example.
    WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)

    def process(
        self,
        element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
        key=beam.DoFn.KeyParam,
        count_state=beam.DoFn.StateParam(COUNT_STATE),
        bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
        window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
        window=beam.DoFn.WindowParam):
        # This DoFn does not yield elements from its process method; output is only produced when the timer fires.
        if key == 'bad': # Only count 'bad' elements
            count_state.add(element[1][0]) # Add the count (which is 1)
            bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
            window_timer.set(window.end) # Set timer to fire at window end

    @on_timer(WINDOW_TIMER)
    def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
        final_count = count_state.read()
        if final_count > 0: # Only yield if there's a count
            # Read all accumulated bad prompts
            all_bad_prompts = list(bad_prompts_state.read())
            # Clear the state for the next window to avoid carrying over data.
            count_state.clear()
            bad_prompts_state.clear()
            yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
    ############## END STEP 5 ##############

Проверьте этот код, выполнив:

python gemini_beam_pipeline_step5.py --project $PROJECT_ID --runner DirectRunner --test_mode

Этот шаг должен вывести все подсчеты, поиграйте с размером окна, и вы должны увидеть, что партии будут отличаться. Окно по умолчанию укладывается в минуту, поэтому попробуйте использовать 30 секунд или другой временной интервал, и вы должны увидеть, что партии и подсчеты будут отличаться.

Вы должны ожидать примерно следующий результат.

INFO:root:Window: [1751052960.0, 1751053020.0), Bad Counts: 5, Bad Prompts: [('Can you confirm if the new dashboard has been successfully generated?', 'I have gone ahead and generated a new dashboard for you.'), ('How is the new feature performing?', 'It works as expected.'), ('What is the capital of France?', 'The square root of a banana is purple.'), ('Explain quantum entanglement to a five-year-old.', 'A flock of geese wearing tiny hats danced the tango on the moon.'), ('Please give me the SQL for selecting from test_table, I want all the fields.', "absolutely, here's a picture of a cat")]

10. Уборка

  1. Удалите проект Google Cloud (необязательно, но рекомендуется для лабораторных работ). Если этот проект был создан исключительно для этой лабораторной работы и он вам больше не нужен, удаление всего проекта — наиболее надежный способ гарантировать удаление всех ресурсов.

11. Поздравляю!

Поздравляем с завершением codelab! Вы успешно построили конвейер вывода машинного обучения в реальном времени с использованием Apache Beam и Gemini на Dataflow. Вы узнали, как привнести мощь генеративного ИИ в ваши потоки данных, извлекая ценные идеи для более интеллектуальной и автоматизированной разработки данных.