Apache Beam এবং Dataflow সহ রিয়েল-টাইম AI/ML মূল্যায়ন

1. ভূমিকা

আজকের দ্রুত-গতির ডেটা ল্যান্ডস্কেপে, জ্ঞাত সিদ্ধান্ত নেওয়ার জন্য রিয়েল-টাইম অন্তর্দৃষ্টি অত্যন্ত গুরুত্বপূর্ণ। এই কোডল্যাব একটি রিয়েল-টাইম মূল্যায়ন পাইপলাইন তৈরির মাধ্যমে আপনাকে গাইড করবে। আমরা Apache Beam ফ্রেমওয়ার্কের সুবিধা দিয়ে শুরু করব, যা ব্যাচ এবং স্ট্রিমিং ডেটা উভয়ের জন্য একটি ইউনিফাইড প্রোগ্রামিং মডেল অফার করে। এটি জটিল বিতরণকৃত কম্পিউটিং লজিককে বিমূর্ত করে পাইপলাইন উন্নয়নকে উল্লেখযোগ্যভাবে সহজ করে যা অন্যথায় আপনাকে স্ক্র্যাচ থেকে তৈরি করতে হবে। একবার বীম ব্যবহার করে আপনার পাইপলাইনটি সংজ্ঞায়িত হয়ে গেলে, আপনি এটিকে Google ক্লাউড ডেটাফ্লোতে নির্বিঘ্নে চালাবেন, একটি সম্পূর্ণরূপে পরিচালিত পরিষেবা যা আপনার ডেটা প্রক্রিয়াকরণের প্রয়োজনের জন্য অতুলনীয় স্কেল এবং কর্মক্ষমতা প্রদান করে৷

এই কোডল্যাবে, আপনি শিখবেন কিভাবে মেশিন লার্নিং ইনফারেন্সের জন্য একটি স্কেলযোগ্য Apache Beam পাইপলাইন আর্কিটেক্ট করতে হয়, Vertex AI এর জেমিনি মডেলকে একীভূত করার জন্য একটি কাস্টম মডেলহ্যান্ডলার ডেভেলপ করতে হয়, ডেটা স্ট্রীমে ইন্টেলিজেন্ট টেক্সট ক্লাসিফিকেশনের জন্য লিভারেজ প্রম্পট ইঞ্জিনিয়ারিং এবং Google-এ এই স্ট্রিমিং ML Da Inference পাইপলাইন স্থাপন ও পরিচালনা করতে হয়। শেষ পর্যন্ত, আপনি রিয়েল-টাইম ডেটা বোঝার জন্য এবং ইঞ্জিনিয়ারিং ওয়ার্কফ্লোতে অবিচ্ছিন্ন মূল্যায়নের জন্য মেশিন লার্নিং প্রয়োগ করার জন্য মূল্যবান অন্তর্দৃষ্টি অর্জন করবেন, বিশেষ করে শক্তিশালী এবং ব্যবহারকারী-কেন্দ্রিক কথোপকথন AI বজায় রাখার জন্য।

দৃশ্যকল্প

আপনার কোম্পানি একটি ডেটা এজেন্ট তৈরি করেছে। আপনার ডেটা এজেন্ট, এজেন্ট ডেভেলপমেন্ট কিট (ADK) দিয়ে তৈরি, ডেটা-সম্পর্কিত কাজগুলিতে সহায়তা করার জন্য বিভিন্ন বিশেষ ক্ষমতা দিয়ে সজ্জিত। এটিকে একটি বহুমুখী ডেটা সহকারী হিসাবে কল্পনা করুন, বিভিন্ন অনুরোধগুলি পরিচালনা করার জন্য প্রস্তুত, অন্তর্দৃষ্টিপূর্ণ প্রতিবেদন তৈরি করার জন্য একজন BI বিশ্লেষক হিসাবে কাজ করা থেকে, একজন ডেটা ইঞ্জিনিয়ার হিসাবে আপনাকে শক্তিশালী ডেটা পাইপলাইন তৈরি করতে সাহায্য করে, অথবা একটি SQL জেনারেটর যা সুনির্দিষ্ট SQL স্টেটমেন্ট তৈরি করে এবং আরও অনেক কিছু। এই এজেন্টের প্রতিটি মিথস্ক্রিয়া, এটি তৈরি করা প্রতিটি প্রতিক্রিয়া স্বয়ংক্রিয়ভাবে Firestore-এ সংরক্ষিত হয়। কিন্তু কেন আমাদের এখানে পাইপলাইন দরকার?

591df0e9110b9f86.png

কারণ Firestore থেকে, একটি ট্রিগার নির্বিঘ্নে এই ইন্টারঅ্যাকশন ডেটা পাব/সাব-এ পাঠায়, নিশ্চিত করে যে আমরা রিয়েল-টাইমে এই সমালোচনামূলক কথোপকথনগুলি অবিলম্বে প্রক্রিয়া করতে এবং বিশ্লেষণ করতে পারি।

4577e473831fbb87.png

2. আপনি শুরু করার আগে

একটি প্রকল্প তৈরি করুন

  1. Google ক্লাউড কনসোলে , প্রকল্প নির্বাচক পৃষ্ঠায়, একটি Google ক্লাউড প্রকল্প নির্বাচন করুন বা তৈরি করুন।
  2. নিশ্চিত করুন যে আপনার ক্লাউড প্রকল্পের জন্য বিলিং সক্ষম করা আছে৷ একটি প্রকল্পে বিলিং সক্ষম কিনা তা পরীক্ষা করতে শিখুন।
  3. এই লিঙ্কে ক্লিক করে ক্লাউড শেল সক্রিয় করুন। আপনি ক্লাউড শেল থেকে সংশ্লিষ্ট বোতামে ক্লিক করে ক্লাউড শেল টার্মিনাল (ক্লাউড কমান্ড চালানোর জন্য) এবং সম্পাদক (প্রকল্প নির্মাণের জন্য) এর মধ্যে টগল করতে পারেন।
  4. একবার ক্লাউড শেলের সাথে সংযুক্ত হয়ে গেলে, আপনি পরীক্ষা করে দেখুন যে আপনি ইতিমধ্যেই প্রমাণীকৃত হয়েছেন এবং নিম্নলিখিত কমান্ডটি ব্যবহার করে প্রকল্পটি আপনার প্রকল্প আইডিতে সেট করা আছে:
gcloud auth list
  1. gcloud কমান্ড আপনার প্রকল্প সম্পর্কে জানে তা নিশ্চিত করতে ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান।
gcloud config list project
  1. যদি আপনার প্রজেক্ট সেট করা না থাকে, তাহলে এটি সেট করতে নিম্নলিখিত কমান্ডটি ব্যবহার করুন:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
  1. নীচে দেখানো কমান্ডের মাধ্যমে প্রয়োজনীয় API গুলি সক্ষম করুন৷ এটি কয়েক মিনিট সময় নিতে পারে, তাই ধৈর্য ধরুন।
gcloud services enable \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    aiplatform.googleapis.com \
    compute.googleapis.com
  1. পাইথন 3.10+ আছে তা নিশ্চিত করুন
  2. পাইথন প্যাকেজ ইনস্টল করুন

আপনার ক্লাউড শেল পরিবেশে Apache Beam, Google Cloud Vertex AI, এবং Google Generative AI-এর জন্য প্রয়োজনীয় পাইথন লাইব্রেরিগুলি ইনস্টল করুন৷

pip install apache-beam[gcp] google-genai
  1. গিথুব রেপো ক্লোন করুন এবং ডেমো ডিরেক্টরিতে স্যুইচ করুন।
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval

gcloud কমান্ড এবং ব্যবহারের জন্য ডকুমেন্টেশন পড়ুন।

3. প্রদত্ত Github সংগ্রহস্থল কিভাবে ব্যবহার করবেন

এই কোডল্যাবের সাথে যুক্ত GitHub সংগ্রহস্থল, যা https://github.com/GoogleCloudPlatform/devrel-demos/tree/main/data-analytics/beam_as_eval এ পাওয়া যায়, একটি নির্দেশিত শেখার অভিজ্ঞতার সুবিধার্থে সংগঠিত হয়েছে। এটিতে কঙ্কাল কোড রয়েছে যা কোডল্যাবের প্রতিটি স্বতন্ত্র অংশের সাথে সারিবদ্ধ করে, উপাদানটির মাধ্যমে একটি স্পষ্ট অগ্রগতি নিশ্চিত করে।

সংগ্রহস্থলের মধ্যে, আপনি দুটি প্রাথমিক ফোল্ডার আবিষ্কার করবেন: "সম্পূর্ণ" এবং "অসম্পূর্ণ।" "সম্পূর্ণ" ফোল্ডারে প্রতিটি ধাপের জন্য সম্পূর্ণ কার্যকরী কোড রয়েছে, যা আপনাকে উদ্দেশ্যমূলক আউটপুট চালানো এবং পর্যবেক্ষণ করতে দেয়। বিপরীতভাবে, "অসম্পূর্ণ" ফোল্ডারটি পূর্ববর্তী ধাপগুলি থেকে কোড প্রদান করে, অনুশীলনের অংশ হিসাবে আপনার সম্পূর্ণ করার জন্য নির্দিষ্ট বিভাগগুলিকে ##### START STEP <NUMBER> ##### এবং ##### END STEP <NUMBER> ##### এর মধ্যে চিহ্নিত করে রাখে। কোডিং চ্যালেঞ্জে সক্রিয়ভাবে অংশগ্রহণ করার সময় এই কাঠামো আপনাকে পূর্বের জ্ঞানের উপর ভিত্তি করে গড়ে তুলতে সক্ষম করে।

42015376afc03a0b.png

4. আর্কিটেকচারাল ওভারভিউ

আমাদের পাইপলাইন ডেটা স্ট্রিমগুলিতে ML অনুমানকে একীভূত করার জন্য একটি শক্তিশালী এবং মাপযোগ্য প্যাটার্ন প্রদান করে। এখানে কিভাবে টুকরা একসাথে মাপসই করা হয়:

335470916fedd7af.png

আপনার বীম পাইপলাইনে, আপনি শর্তসাপেক্ষে একাধিক ইনপুট কোড করবেন এবং তারপর RunInference টার্নকি ট্রান্সফর্মের সাথে কাস্টম মডেল লোড করবেন। যদিও আপনি উদাহরণে VertexAI-এর সাথে Gemini ব্যবহার করেন, তবুও এটি দেখায় যে আপনার কাছে থাকা মডেলের সংখ্যার সাথে মানানসই করার জন্য আপনি মূলত একাধিক মডেলহ্যান্ডলার তৈরি করবেন। অবশেষে, আপনি ইভেন্টগুলির ট্র্যাক রাখতে এবং নিয়ন্ত্রিত পদ্ধতিতে নির্গত করতে একটি রাষ্ট্রীয় DoFn ব্যবহার করবেন।

ece1725721653b80.png

5. ডেটা গ্রহণ করা

প্রথমে, আপনি ডেটা ইনজেস্ট করার জন্য আপনার পাইপলাইন সেট আপ করবেন। আপনি রিয়েল-টাইম স্ট্রিমিংয়ের জন্য পাব/সাব ব্যবহার করবেন, তবে বিকাশকে সহজ করতে, আপনি একটি পরীক্ষা মোডও তৈরি করবেন। এই test_mode আপনাকে পূর্বনির্ধারিত নমুনা ডেটা ব্যবহার করে স্থানীয়ভাবে পাইপলাইন চালানোর অনুমতি দেয়, তাই আপনার পাইপলাইন কাজ করে কিনা তা দেখার জন্য আপনার লাইভ পাব/সাব স্ট্রিমের প্রয়োজন নেই।

4153613f05f28c78.png

এই বিভাগের জন্য gemini_beam_pipeline_step1.py ব্যবহার করুন।

  1. প্রদত্ত পাইপলাইন অবজেক্ট p ব্যবহার করে একটি Pub/Sub ইনপুট কোড করুন এবং আউটপুটটিকে pCollection হিসাবে লিখুন।
  2. TEST_MODE সেট করা হয়েছে কিনা তা নির্ধারণ করতে অতিরিক্তভাবে একটি পতাকা ব্যবহার করুন।
  3. যদি TEST_MODE সেট করা থাকে, তাহলে একটি ইনপুট হিসাবে TEST_DATA অ্যারে পার্সিং এ স্যুইচ করুন।

এটি প্রয়োজনীয় নয় তবে এটি প্রক্রিয়াটিকে সংক্ষিপ্ত করতে সহায়তা করে তাই আপনাকে এই তাড়াতাড়ি পাব/সাবকে জড়িত করার দরকার নেই।

এখানে নীচের কোডের একটি উদাহরণ:

        # Step 1
        # Ingesting Data
        # Write your data ingestion step here.
        ############## BEGIN STEP 1 ##############
        if known_args.test_mode:  
            logging.info("Running in test mode with in-memory data.")
            parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
            # Convert dicts to JSON strings and add timestamps for test mode
            parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
            )
        else:
            logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
            parsed_elements = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
                    topic=known_args.input_topic
                    ).with_output_types(bytes)
                | 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
                # Extract timestamp from JSON string for Pub/Sub messages
                | 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
            )
        ############## END STEP 1 ##############

এই কোডটি সম্পাদন করে পরীক্ষা করুন:

python gemini_beam_pipeline_step1.py --project $PROJECT_ID --runner DirectRunner --test_mode

এই ধাপে সমস্ত রেকর্ড নির্গত করা উচিত, তাদের stdout এ লগ করা।

আপনি নিম্নলিখিত মত একটি আউটপুট আশা করা উচিত.

INFO:root:Running in test mode with in-memory data.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:root:{"id": "test-1", "prompt": "Please provide the SQL query to select all fields from the 'TEST_TABLE'.", "text": "Sure here is the SQL: SELECT * FROM TEST_TABLE;", "timestamp": 1751052405.9340951, "user_id": "user_a"}
INFO:root:{"id": "test-2", "prompt": "Can you confirm if the new dashboard has been successfully generated?", "text": "I have gone ahead and generated a new dashboard for you.", "timestamp": 1751052410.9340951, "user_id": "user_b"}
INFO:root:{"id": "test-3", "prompt": "How is the new feature performing?", "text": "It works as expected.", "timestamp": 1751052415.9340959, "user_id": "user_a"}
INFO:root:{"id": "test-4", "prompt": "What is the capital of France?", "text": "The square root of a banana is purple.", "timestamp": 1751052430.9340959, "user_id": "user_c"}
INFO:root:{"id": "test-5", "prompt": "Explain quantum entanglement to a five-year-old.", "text": "A flock of geese wearing tiny hats danced the tango on the moon.", "timestamp": 1751052435.9340959, "user_id": "user_b"}
INFO:root:{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here's a picture of a cat", "timestamp": 1751052440.9340959, "user_id": "user_c"}

6. LLM প্রম্পট শ্রেণীবিভাগের জন্য একটি PTransform তৈরি করা

পরবর্তী, আপনি প্রম্পট শ্রেণীবদ্ধ করতে একটি PTransform তৈরি করবেন। এর মধ্যে আগত পাঠ্যকে শ্রেণীবদ্ধ করার জন্য Vertex AI এর জেমিনি মডেল ব্যবহার করা জড়িত। আপনি একটি কাস্টম GeminiModelHandler সংজ্ঞায়িত করবেন যা জেমিনি মডেল লোড করে এবং তারপরে মডেলটিকে নির্দেশ দেয় কিভাবে পাঠ্যটিকে "ডেটা ইঞ্জিনিয়ার," "BI বিশ্লেষক," বা "SQL জেনারেটর" এর মতো বিভাগে শ্রেণিবদ্ধ করতে হয়।

আপনি লগের প্রকৃত টুল কলের সাথে তুলনা করে এটি ব্যবহার করবেন। এটি এই কোডল্যাবে কভার করা হয়নি তবে আপনি এটিকে ডাউনস্ট্রিম পাঠাতে পারেন এবং এটি তুলনা করতে পারেন। এমন কিছু হতে পারে যা অস্পষ্ট এবং এটি একটি দুর্দান্ত অতিরিক্ত ডেটাপয়েন্ট হিসাবে কাজ করে তা নিশ্চিত করতে আপনার এজেন্ট সঠিক সরঞ্জামগুলিকে কল করছে।

9840f3fb26b88138.png

এই বিভাগের জন্য gemini_beam_pipeline_step2.py ব্যবহার করুন।

  1. আপনার কাস্টম মডেলহ্যান্ডলার তৈরি করুন; যাইহোক, load_model এ একটি মডেল অবজেক্ট ফেরত না দিয়ে, genai.Client ফেরত দিন।
  2. কোডটি আপনাকে কাস্টম মডেলহ্যান্ডলারের রান_ইনফারেন্স ফাংশন তৈরি করতে হবে। একটি নমুনা প্রম্পট প্রদান করা হয়েছে:

প্রম্পটটি নিম্নরূপ কিছু হতে পারে:

prompt =f"""
            The input is a response from another agent.
            The agent has multiple tools, each having their own responsibilities.
            You are to analyze the input and then classify it into one and only one.
            Use the best one if it seems like it is ambiguous. Choose only one.

            Finally, always provide a paragraph on why you think it is in one of the categories.

            Classify the text into one of these categories:
            DATA ENGINEER
            BI ANALYST
            SQL GENERATOR
            HELPER
            OTHER
            Respond with only the one single classification tag.
            Your response should be in a tuple (classification_tag, reason)

            Text: "{text_to_classify}"
            """
  1. পরবর্তী pTransform এর জন্য একটি pCollection হিসাবে ফলাফল প্রদান করুন।

এখানে নীচের কোডের একটি উদাহরণ:

    ############## BEGIN STEP 2 ##############
    # load_model is called once per worker process to initialize the LLM client.
    # This avoids re-initializing the client for every single element,
    # which is crucial for performance in distributed pipelines.
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client
    
    # run_inference is called for each batch of elements. Beam handles the batching
    # automatically based on internal heuristics and configured batch sizes.
    # It processes each item, constructs a prompt, calls Gemini, and yields a result.
    def run_inference(
        self,
        batch: Sequence[Any],  # Each item is a JSON string or a dict
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """
        Runs inference on a batch of JSON strings or dicts.
        Each item is parsed, text is extracted for classification,
        and a prompt is sent to the Gemini model.
        """
        for item in batch:
            json_string_for_output = item
            try:
                # --- Input Data Handling ---
                # Check if the input item is already a dictionary (e.g., from TEST_DATA)
                # or a JSON string (e.g., from Pub/Sub).
                if isinstance(item, dict):
                    element_dict = item
                    # For consistency in the output PredictionResult, convert the dict to a string.
                    # This ensures pr.example always contains the original JSON string.
                    json_string_for_output = json.dumps(item)
                else:
                    element_dict = json.loads(item)

                # Extract the 'text' field from the parsed dictionary.
                text_to_classify = element_dict.get('text','')

                if not text_to_classify:
                    logging.warning(f"Input JSON missing 'text' key or text is empty: {json_string_for_output}")
                    yield PredictionResult(example=json_string_for_output, inference="ERROR_NO_TEXT")
                    continue

                prompt =f"""
                The input is a response from another agent.
                The agent has multiple tools, each having their own responsibilites.
                You are to analyze the input and then classify it into one and only one.
                Use the best one if it seems like it is ambigiuous. Choose only one.

                Finally always provide a paragraph on why you think it is in one of the categories.

                Classify the text into one of these categories:
                DATA ENGINEER
                BI ANALYST
                SQL GENERATOR
                HELPER
                OTHER
                Respond with only the one single classification tag.
                Your response should be in a tuple (classification_tag, reason)

                Text: "{text_to_classify}"
                """

                contents = [
                    types.Content( # This is the actual content for the LLM
                    role="user",
                    parts=[
                        types.Part.from_text(text=prompt)
                    ]
                    )
                ]


                gemini_response = model.models.generate_content_stream(
                    model=self._model_name, contents=contents, config=self._model_kwargs
                )
                classification_tag = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        classification_tag+=chunk.text

                yield PredictionResult(example=json_string_for_output, inference=classification_tag)

            except json.JSONDecodeError as e:
                logging.error(f"Error decoding JSON string: {json_string_for_output}, error: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_JSON_DECODE")
            except Exception as e:
                logging.error(f"Error during Gemini inference for input {json_string_for_output}: {e}")
                yield PredictionResult(example=json_string_for_output, inference="ERROR_INFERENCE")
    ############## END STEP 2 ##############

এই কোডটি সম্পাদন করে পরীক্ষা করুন:

python gemini_beam_pipeline_step2.py --project $PROJECT_ID --runner DirectRunner --test_mode

এই পদক্ষেপটি মিথুন থেকে একটি অনুমান ফেরত দেবে। এটি আপনার প্রম্পটের অনুরোধ অনুসারে ফলাফলগুলিকে শ্রেণীবদ্ধ করবে।

আপনি নিম্নলিখিত মত একটি আউটপুট আশা করা উচিত.

INFO:root:PredictionResult(example='{"id": "test-6", "prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "text": "absolutely, here\'s a picture of a cat", "timestamp": 1751052592.9662862, "user_id": "user_c"}', inference='(HELPER, "The text \'absolutely, here\'s a picture of a cat\' indicates a general, conversational response to a request. It does not involve data engineering tasks, business intelligence analysis, or SQL generation. Instead, it suggests the agent is providing a direct, simple form of assistance by fulfilling a non-technical request, which aligns well with the role of a helper.")', model_id=None)

7. একজন বিচারক হিসেবে এলএলএম তৈরি করা

প্রম্পট শ্রেণীবদ্ধ করার পরে, আপনি মডেলের প্রতিক্রিয়াগুলির যথার্থতা মূল্যায়ন করবেন। এটি জেমিনি মডেলের সাথে আরেকটি কল জড়িত, কিন্তু এই সময়, আপনি 0.0 থেকে 1.0 স্কেলে "টেক্সট" কতটা ভালভাবে মূল "প্রম্পট" পূরণ করে তা স্কোর করতে অনুরোধ করবেন। এটি আপনাকে AI এর আউটপুটের গুণমান বুঝতে সাহায্য করে। আপনি এই কাজের জন্য একটি পৃথক GeminiAccuracyModelHandler তৈরি করবেন।

70ef07fca17ba385.png

এই বিভাগের জন্য gemini_beam_pipeline_step3.py ব্যবহার করুন।

  1. আপনার কাস্টম মডেলহ্যান্ডলার তৈরি করুন; যাইহোক, load_model-এ একটি মডেল অবজেক্ট ফেরত দেওয়ার পরিবর্তে, আপনি উপরে যেমন করেছেন ঠিক তেমনি genai.Client ফেরত দিন।
  2. কোডটি আপনাকে কাস্টম মডেলহ্যান্ডলারের রান_ইনফারেন্স ফাংশন তৈরি করতে হবে। একটি নমুনা প্রম্পট প্রদান করা হয়েছে:
            prompt_for_accuracy = f"""
            You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
            Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
            0.0 is very bad, 1.0 is excellent.

            Example of very bad, score of 0:
            prompt: Give me the SQL for test_Table
            text: SUre, here's a picture of a dog

            Example of very good score of 1:
            prompt: generate a sql statement to select all fields from test_table
            text: SELECT * from test_table;

            Your response should be ONLY the float score, followed by a brief explanation of why.
            For example: "0.8 - The response was mostly accurate but missed a minor detail."

            Prompt: "{original_prompt}"
            Text: "{original_text}"
            Score and Explanation:
            """

এখানে উল্লেখ্য একটি জিনিস হল আপনি মূলত একই পাইপলাইনে দুটি ভিন্ন মডেল তৈরি করেছেন। এই বিশেষ উদাহরণে, আপনি VertexAI-এর সাথে একটি জেমিনি কলও ব্যবহার করছেন কিন্তু একই ধারণায় আপনি অন্যান্য মডেল ব্যবহার এবং লোড করতে বেছে নিতে পারেন। এটি আপনার মডেল পরিচালনাকে সহজ করে এবং আপনাকে একই বীম পাইপলাইনের মধ্যে একাধিক মডেল ব্যবহার করার অনুমতি দেয়।

  1. পরবর্তী pTransform এর জন্য একটি pCollection হিসাবে ফলাফল প্রদান করুন।

এখানে নীচের কোডের একটি উদাহরণ:

    ############## BEGIN STEP 3 ##############
    def load_model(self) -> genai.Client:
        """Loads and initializes a model for processing."""
        client = genai.Client(
            vertexai=True,
            project=self._project,
            location=self._location,
        )
        return client

    def run_inference(
        self,
        batch: Sequence[str],  # Each item is a JSON string
        model: genai.Client,
        inference_args: Optional[Dict[str, Any]] = None
    ) -> Iterable[PredictionResult]:
        """Runs inference on a batch of JSON strings to verify accuracy."""
        for json_string in batch:
            try:
                element_dict = json.loads(json_string)
                original_prompt = element_dict.get('original_prompt', '')
                original_text = element_dict.get('original_text', '')

                if not original_prompt or not original_text:
                    logging.warning(f"Accuracy input missing prompt/text: {json_string}")
                    yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
                    continue

                prompt_for_accuracy = f"""
                You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
                Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
                0.0 is very bad, 1.0 is excellent.

                Example of very bad, score of 0:
                prompt: Give me the SQL for test_Table
                text: SUre, here's a picture of a dog

                Example of very good score of 1:
                prompt: generate a sql statement to select all fields from test_table
                text: SELECT * from test_table;

                Your response should be ONLY the float score, followed by a brief explanation of why.
                For example: "0.8 - The response was mostly accurate but missed a minor detail."

                Prompt: "{original_prompt}"
                Text: "{original_text}"
                Score and Explanation:
                """
                gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)

                gemini_response_text = ""
                for chunk in gemini_response:
                    if chunk.text is not None:
                        gemini_response_text+=chunk.text

                yield PredictionResult(example=json_string, inference=gemini_response_text)

            except Exception as e:
                logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
                yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
    ############## END STEP 3 ##############

এই কোডটি সম্পাদন করে পরীক্ষা করুন:

python gemini_beam_pipeline_step3.py --project $PROJECT_ID --runner DirectRunner --test_mode

এই পদক্ষেপটি একটি অনুমানও প্রদান করবে, এটি মন্তব্য করবে এবং মিথুন কতটা সঠিক ভাবে টুলটি সাড়া দিয়েছে তার উপর একটি স্কোর প্রদান করবে।

আপনি নিম্নলিখিত মত একটি আউটপুট আশা করা উচিত.

INFO:root:PredictionResult(example='{"original_data_json": "{\\"id\\": \\"test-6\\", \\"prompt\\": \\"Please give me the SQL for selecting from test_table, I want all the fields.\\", \\"text\\": \\"absolutely, here\'s a picture of a cat\\", \\"timestamp\\": 1751052770.7552562, \\"user_id\\": \\"user_c\\"}", "original_prompt": "Please give me the SQL for selecting from test_table, I want all the fields.", "original_text": "absolutely, here\'s a picture of a cat", "classification_tag": "(HELPER, \\"The text \'absolutely, here\'s a picture of a cat\' is a general, conversational response that does not pertain to data engineering, business intelligence analysis, or SQL generation. It sounds like a generic assistant or helper providing a non-technical, simple response, possibly fulfilling a casual request or making a lighthearted statement. Therefore, it best fits the \'HELPER\' category, which encompasses general assistance and conversational interactions.\\")"}', inference='0.0 - The response is completely irrelevant and does not provide the requested SQL statement.', model_id=None)

8. জানালা এবং বিশ্লেষণ ফলাফল

এখন, আপনি নির্দিষ্ট সময়ের ব্যবধানে আপনার ফলাফলগুলি বিশ্লেষণ করতে উইন্ডো করবেন। আপনি স্থির উইন্ডোগুলি ব্যবহার করবেন ডেটা গ্রুপ করার জন্য, আপনাকে সমষ্টিগত অন্তর্দৃষ্টি পেতে অনুমতি দেবে। উইন্ডো করার পরে, আপনি মূল ডেটা, শ্রেণিবিন্যাস ট্যাগ, নির্ভুলতা স্কোর এবং ব্যাখ্যা সহ আরও কাঠামোগত বিন্যাসে জেমিনি থেকে কাঁচা আউটপুটগুলিকে পার্স করবেন।

ea486c5961e560fb.png

এই বিভাগের জন্য gemini_beam_pipeline_step4.py ব্যবহার করুন।

  1. 60 সেকেন্ডের একটি নির্দিষ্ট সময়ের উইন্ডোতে যোগ করুন যাতে সমস্ত ডেটা 60 সেকেন্ডের উইন্ডোর মধ্যে স্থাপন করা হয়।

এখানে নীচের কোডের একটি উদাহরণ:

            ############## BEGIN STEP 4 ##############
            | 'WindowIntoFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
            ############## END STEP 4 ##############

এই কোডটি সম্পাদন করে পরীক্ষা করুন:

python gemini_beam_pipeline_step4.py --project $PROJECT_ID --runner DirectRunner --test_mode

এই ধাপটি তথ্যপূর্ণ, আপনি আপনার উইন্ডো খুঁজছেন. এটি একটি উইন্ডো স্টপ/স্টার্ট টাইমস্ট্যাম্প হিসাবে দেখানো হবে।

আপনি নিম্নলিখিত মত একটি আউটপুট আশা করা উচিত.

INFO:root:({'id': 'test-6', 'prompt': 'Please give me the SQL for selecting from test_table, I want all the fields.', 'text': "absolutely, here's a picture of a cat", 'timestamp': 1751052901.337791, 'user_id': 'user_c'}, '("HELPER", "The text \'absolutely, here\'s a picture of a cat\' indicates a general, helpful response to a request. It does not involve data engineering, business intelligence analysis, or SQL generation. Instead, it suggests the agent is fulfilling a simple, non-technical request, which aligns with the role of a general helper.")', 0.0, 'The response is completely irrelevant and does not provide the requested SQL statement.', [1751052900.0, 1751052960.0))

9. রাষ্ট্রীয় প্রক্রিয়াকরণের সাথে ভাল এবং খারাপ ফলাফল গণনা করা

অবশেষে, আপনি প্রতিটি উইন্ডোর মধ্যে "ভাল" এবং "খারাপ" ফলাফল গণনা করতে একটি রাষ্ট্রীয় DoFn ব্যবহার করবেন। একটি "ভাল" ফলাফল একটি উচ্চ নির্ভুলতা স্কোরের সাথে একটি মিথস্ক্রিয়া হতে পারে, যখন একটি "খারাপ" ফলাফল কম স্কোর নির্দেশ করে। এই স্টেটফুল প্রসেসিং আপনাকে গণনা বজায় রাখতে এবং এমনকি সময়ের সাথে "খারাপ" মিথস্ক্রিয়াগুলির উদাহরণ সংগ্রহ করতে দেয়, যা রিয়েল টাইমে আপনার চ্যাটবটের স্বাস্থ্য এবং কর্মক্ষমতা নিরীক্ষণের জন্য অত্যন্ত গুরুত্বপূর্ণ।

6cd4cbef2846c4b5.png

এই বিভাগের জন্য gemini_beam_pipeline_step5.py ব্যবহার করুন।

  1. একটি রাষ্ট্রীয় ফাংশন তৈরি করুন। আপনার দুটি অবস্থার প্রয়োজন হবে: (1) খারাপ গণনার সংখ্যা ট্র্যাক রাখতে এবং (2) প্রদর্শনের জন্য খারাপ রেকর্ডগুলি রাখুন। সিস্টেমটি কার্যকরী হতে পারে তা নিশ্চিত করতে সঠিক কোডার ব্যবহার করুন।
  2. প্রতিবার যখন আপনি একটি খারাপ অনুমানের জন্য মান দেখেন, আপনি উভয়ের ট্র্যাক রাখতে চান এবং উইন্ডোর শেষে সেগুলি নির্গত করতে চান। নির্গত করার পরে রাজ্যগুলি পুনরায় সেট করতে মনে রাখবেন। পরেরটি শুধুমাত্র দৃষ্টান্তমূলক উদ্দেশ্যে, একটি বাস্তব পরিবেশে এই সব স্মৃতিতে রাখার চেষ্টা করবেন না।

এখানে নীচের কোডের একটি উদাহরণ:

    ############## BEGIN STEP 5 ##############
    # Define a state specification for a combining value.
    # This will store the running sum for each key.
    # The coder is specified for efficiency.
    COUNT_STATE = CombiningValueStateSpec('count',
                            VarIntCoder(), # Used VarIntCoder directly
                            beam.transforms.combiners.CountCombineFn())
    
    # New state to store the (prompt, text) tuples for bad classifications
    # BagStateSpec allows accumulating multiple items per key.
    BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
        'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
    )

    # Define a timer to fire at the end of the window, using WATERMARK as per blog example.
    WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)

    def process(
        self,
        element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
        key=beam.DoFn.KeyParam,
        count_state=beam.DoFn.StateParam(COUNT_STATE),
        bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
        window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
        window=beam.DoFn.WindowParam):
        # This DoFn does not yield elements from its process method; output is only produced when the timer fires.
        if key == 'bad': # Only count 'bad' elements
            count_state.add(element[1][0]) # Add the count (which is 1)
            bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
            window_timer.set(window.end) # Set timer to fire at window end

    @on_timer(WINDOW_TIMER)
    def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
        final_count = count_state.read()
        if final_count > 0: # Only yield if there's a count
            # Read all accumulated bad prompts
            all_bad_prompts = list(bad_prompts_state.read())
            # Clear the state for the next window to avoid carrying over data.
            count_state.clear()
            bad_prompts_state.clear()
            yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
    ############## END STEP 5 ##############

এই কোডটি সম্পাদন করে পরীক্ষা করুন:

python gemini_beam_pipeline_step5.py --project $PROJECT_ID --runner DirectRunner --test_mode

এই ধাপটি সমস্ত গণনা আউটপুট করা উচিত, উইন্ডোর আকারের সাথে খেলতে হবে এবং আপনি দেখতে হবে যে ব্যাচগুলি ভিন্ন হবে। ডিফল্ট উইন্ডোটি এক মিনিটের মধ্যে ফিট হয়ে যাবে, তাই 30 সেকেন্ড বা অন্য সময় ফ্রেম ব্যবহার করার চেষ্টা করুন এবং আপনি দেখতে পাবেন ব্যাচ এবং গণনা ভিন্ন।

আপনি নিম্নলিখিত মত একটি আউটপুট আশা করা উচিত.

INFO:root:Window: [1751052960.0, 1751053020.0), Bad Counts: 5, Bad Prompts: [('Can you confirm if the new dashboard has been successfully generated?', 'I have gone ahead and generated a new dashboard for you.'), ('How is the new feature performing?', 'It works as expected.'), ('What is the capital of France?', 'The square root of a banana is purple.'), ('Explain quantum entanglement to a five-year-old.', 'A flock of geese wearing tiny hats danced the tango on the moon.'), ('Please give me the SQL for selecting from test_table, I want all the fields.', "absolutely, here's a picture of a cat")]

10. পরিষ্কার করা

  1. Google ক্লাউড প্রকল্প মুছুন (ঐচ্ছিক কিন্তু কোডল্যাবগুলির জন্য প্রস্তাবিত): যদি এই প্রকল্পটি শুধুমাত্র এই কোডল্যাবের জন্য তৈরি করা হয় এবং আপনার আর এটির প্রয়োজন না হয়, তাহলে সমস্ত সংস্থান মুছে ফেলা হয়েছে তা নিশ্চিত করার জন্য সম্পূর্ণ প্রকল্পটি মুছে ফেলা হল সবচেয়ে পুঙ্খানুপুঙ্খ উপায়৷
  • Google ক্লাউড কনসোলে সম্পদ পরিচালনা পৃষ্ঠাতে যান।
  • আপনার প্রকল্প নির্বাচন করুন.
  • প্রকল্প মুছুন ক্লিক করুন এবং অন-স্ক্রীন নির্দেশাবলী অনুসরণ করুন।

11. অভিনন্দন!

কোডল্যাব সম্পূর্ণ করার জন্য অভিনন্দন! আপনি Dataflow-এ Apache Beam এবং Gemini ব্যবহার করে সফলভাবে একটি রিয়েল-টাইম এমএল ইনফারেন্স পাইপলাইন তৈরি করেছেন। আপনি আরও বুদ্ধিমান এবং স্বয়ংক্রিয় ডেটা ইঞ্জিনিয়ারিংয়ের জন্য মূল্যবান অন্তর্দৃষ্টিগুলি বের করে আপনার ডেটা স্ট্রিমগুলিতে কীভাবে জেনারেটিভ এআই-এর শক্তি আনতে হয় তা শিখেছেন।