gRPC-Rust দিয়ে শুরু করা - স্ট্রিমিং

1. ভূমিকা

এই কোডল্যাবে, আপনি একটি ক্লায়েন্ট এবং সার্ভার তৈরি করতে gRPC-Rust ব্যবহার করবেন যা Rust-এ লেখা একটি রুট-ম্যাপিং অ্যাপ্লিকেশনের ভিত্তি তৈরি করে।

টিউটোরিয়ালের শেষে, আপনার কাছে একটি ক্লায়েন্ট থাকবে যেটি একটি দূরবর্তী সার্ভারের সাথে সংযোগ করে একটি ক্লায়েন্টের রুটের বৈশিষ্ট্য সম্পর্কে তথ্য পেতে, একটি ক্লায়েন্টের রুটের একটি সারাংশ তৈরি করতে এবং সার্ভার এবং অন্যান্য ক্লায়েন্টদের সাথে ট্রাফিক আপডেটের মতো রুটের তথ্য বিনিময় করতে।

পরিষেবাটি একটি প্রোটোকল বাফার ফাইলে সংজ্ঞায়িত করা হয়েছে, যা ক্লায়েন্ট এবং সার্ভারের জন্য বয়লারপ্লেট কোড তৈরি করতে ব্যবহার করা হবে যাতে তারা একে অপরের সাথে যোগাযোগ করতে পারে, সেই কার্যকারিতা বাস্তবায়নে আপনার সময় এবং প্রচেষ্টা বাঁচাতে পারে।

এই তৈরি করা কোডটি সার্ভার এবং ক্লায়েন্টের মধ্যে যোগাযোগের জটিলতাই নয়, ডেটা সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশনেরও যত্ন নেয়।

আপনি কি শিখবেন

  • একটি পরিষেবা API সংজ্ঞায়িত করতে প্রোটোকল বাফারগুলি কীভাবে ব্যবহার করবেন।
  • স্বয়ংক্রিয় কোড জেনারেশন ব্যবহার করে প্রোটোকল বাফার সংজ্ঞা থেকে কীভাবে একটি জিআরপিসি-ভিত্তিক ক্লায়েন্ট এবং সার্ভার তৈরি করবেন।
  • gRPC-এর সাথে ক্লায়েন্ট-সার্ভার স্ট্রিমিং যোগাযোগের একটি বোঝাপড়া।

এই কোডল্যাবটি জিআরপিসি-তে নতুন বা জিআরপিসি-র রিফ্রেশার বা বিতরণ করা সিস্টেম তৈরিতে আগ্রহী অন্য যেকেউ মরিচা বিকাশকারীদের লক্ষ্য করে। কোন পূর্ব gRPC অভিজ্ঞতা প্রয়োজন নেই.

2. আপনি শুরু করার আগে

পূর্বশর্ত

নিশ্চিত করুন যে আপনি নিম্নলিখিত ইনস্টল করেছেন:

  • জিসিসি। এখানে নির্দেশাবলী অনুসরণ করুন
  • মরিচা , সর্বশেষ সংস্করণ। এখানে ইনস্টলেশন নির্দেশাবলী অনুসরণ করুন.

কোড পান

যাতে আপনাকে সম্পূর্ণরূপে স্ক্র্যাচ থেকে শুরু করতে না হয়, এই কোডল্যাবটি আপনাকে সম্পূর্ণ করার জন্য অ্যাপ্লিকেশনের সোর্স কোডের একটি স্ক্যাফোল্ড প্রদান করে। বয়লারপ্লেট জিআরপিসি কোড জেনারেট করতে প্রোটোকল বাফার কম্পাইলার প্লাগইনগুলি ব্যবহার করা সহ, নিম্নলিখিত পদক্ষেপগুলি আপনাকে কীভাবে অ্যাপ্লিকেশনটি শেষ করতে হবে তা দেখাবে।

প্রথমে, কোডল্যাব ওয়ার্কিং ডিরেক্টরি তৈরি করুন এবং এটিতে cd :

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

কোডল্যাব ডাউনলোড এবং এক্সট্রাক্ট করুন:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

বিকল্পভাবে, আপনি শুধুমাত্র কোডল্যাব ডিরেক্টরি ধারণকারী .zip ফাইলটি ডাউনলোড করতে পারেন এবং ম্যানুয়ালি আনজিপ করতে পারেন।

আপনি যদি বাস্তবায়নে টাইপ করা এড়িয়ে যেতে চান তবে সম্পূর্ণ সোর্স কোডটি GitHub-এ উপলব্ধ

3. বার্তা এবং পরিষেবা সংজ্ঞায়িত করুন

আপনার প্রথম পদক্ষেপ হল প্রোটোকল বাফার ব্যবহার করে অ্যাপ্লিকেশনটির gRPC পরিষেবা, এর RPC পদ্ধতি এবং এর অনুরোধ এবং প্রতিক্রিয়া বার্তার ধরনগুলি সংজ্ঞায়িত করা। আপনার পরিষেবা প্রদান করবে:

  • RPC পদ্ধতিগুলিকে ListFeatures , RecordRoute , এবং RouteChat বলা হয় যা সার্ভার প্রয়োগ করে এবং ক্লায়েন্ট কল করে।
  • বার্তার ধরন হল Point , Feature , Rectangle , RouteNote এবং RouteSummary , যা উপরের পদ্ধতিগুলি কল করার সময় ক্লায়েন্ট এবং সার্ভারের মধ্যে আদান-প্রদান করা হয়।

এই RPC পদ্ধতি এবং এর বার্তার ধরন সবই প্রদত্ত সোর্স কোডের proto/routeguide.proto ফাইলে সংজ্ঞায়িত করা হবে।

প্রোটোকল বাফারগুলি সাধারণত প্রোটোবাফ হিসাবে পরিচিত। gRPC পরিভাষা সম্পর্কে আরও তথ্যের জন্য, gRPC-এর মূল ধারণা, স্থাপত্য, এবং জীবনচক্র দেখুন।

বার্তার ধরন নির্ধারণ করুন

আসুন প্রথমে আমাদের বার্তাগুলিকে সংজ্ঞায়িত করি যা আমাদের RPC দ্বারা ব্যবহৃত হবে। সোর্স কোডের routeguide/route_guide.proto ফাইলে, প্রথমে Point বার্তার ধরনটি সংজ্ঞায়িত করুন। একটি Point একটি মানচিত্রে একটি অক্ষাংশ-দ্রাঘিমাংশ স্থানাঙ্ক জুটিকে প্রতিনিধিত্ব করে। এই কোডল্যাবের জন্য, স্থানাঙ্কের জন্য পূর্ণসংখ্যা ব্যবহার করুন:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

1 এবং 2 নম্বরগুলি message কাঠামোর প্রতিটি ক্ষেত্রের জন্য অনন্য আইডি নম্বর।

এর পরে, Feature বার্তার ধরন সংজ্ঞায়িত করুন। একটি Feature একটি Point দ্বারা নির্দিষ্ট একটি অবস্থানে কোনো কিছুর নাম বা ডাক ঠিকানার জন্য একটি string ক্ষেত্র ব্যবহার করে:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

এর পরে একটি Rectangle বার্তা যা একটি অক্ষাংশ-দ্রাঘিমাংশের আয়তক্ষেত্রকে প্রতিনিধিত্ব করে, দুটি তির্যক বিপরীত বিন্দু "lo" এবং "hi" হিসাবে উপস্থাপিত হয়।

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

এছাড়াও একটি RouteNote বার্তা যা একটি নির্দিষ্ট বিন্দুতে প্রেরিত একটি বার্তা প্রতিনিধিত্ব করে।

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

আমরা একটি RouteSummary বার্তা প্রয়োজন হবে. এই বার্তাটি একটি RecordRoute RPC এর প্রতিক্রিয়া হিসাবে গৃহীত হয়েছে যা পরবর্তী বিভাগে ব্যাখ্যা করা হয়েছে। এতে প্রাপ্ত পৃথক পয়েন্টের সংখ্যা, সনাক্ত করা বৈশিষ্ট্যের সংখ্যা এবং প্রতিটি বিন্দুর মধ্যে দূরত্বের ক্রমবর্ধমান সমষ্টি হিসাবে আচ্ছাদিত মোট দূরত্ব রয়েছে।

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

সেবা পদ্ধতি সংজ্ঞায়িত করুন

আসুন প্রথমে আমাদের পরিষেবাটি সংজ্ঞায়িত করি এবং তারপরে আমাদের বার্তাগুলিকে সংজ্ঞায়িত করি৷ একটি পরিষেবা সংজ্ঞায়িত করতে, আপনি আপনার .proto ফাইলে একটি নামযুক্ত পরিষেবা নির্দিষ্ট করুন৷ proto/routeguide.proto ফাইলটিতে RouteGuide নামে একটি service কাঠামো রয়েছে যা অ্যাপ্লিকেশনটির পরিষেবা দ্বারা প্রদত্ত এক বা একাধিক পদ্ধতিকে সংজ্ঞায়িত করে৷

আপনার পরিষেবার সংজ্ঞার মধ্যে RPC পদ্ধতিগুলি সংজ্ঞায়িত করুন, তাদের অনুরোধ এবং প্রতিক্রিয়ার ধরনগুলি নির্দিষ্ট করুন৷ কোডল্যাবের এই বিভাগে, আসুন সংজ্ঞায়িত করা যাক:

তালিকা বৈশিষ্ট্য

প্রদত্ত Rectangle মধ্যে উপলব্ধ Feature প্রাপ্ত করে৷ ফলাফলগুলি একবারে প্রত্যাবর্তনের পরিবর্তে স্ট্রিম করা হয় (যেমন একটি পুনরাবৃত্তি ক্ষেত্র সহ একটি প্রতিক্রিয়া বার্তায়), কারণ আয়তক্ষেত্রটি একটি বৃহৎ এলাকা কভার করতে পারে এবং এতে বিপুল সংখ্যক বৈশিষ্ট্য থাকতে পারে।

এই RPC-এর জন্য একটি উপযুক্ত প্রকার হল সার্ভার-সাইড স্ট্রিমিং RPC: ক্লায়েন্ট সার্ভারে একটি অনুরোধ পাঠায় এবং বার্তাগুলির একটি ক্রম পড়ার জন্য একটি স্ট্রিম পায়। ক্লায়েন্ট প্রত্যাবর্তিত স্ট্রীম থেকে পাঠ করে যতক্ষণ না আর কোনো বার্তা নেই। আপনি যেমন আমাদের উদাহরণে দেখতে পাচ্ছেন, আপনি প্রতিক্রিয়া প্রকারের আগে stream কীওয়ার্ড স্থাপন করে একটি সার্ভার-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করেন।

rpc ListFeatures(Rectangle) returns (stream Feature) {}

রেকর্ডরুট

ট্র্যাভার্সাল করা রুটে Point s-এর একটি স্ট্রীম গ্রহণ করে, ট্র্যাভার্সাল সম্পূর্ণ হলে একটি RouteSummary ফেরত দেয়।

একটি ক্লায়েন্ট-সাইড স্ট্রিমিং RPC এই ক্ষেত্রে উপযুক্ত বলে মনে হয়: ক্লায়েন্ট বার্তাগুলির একটি ক্রম লিখে এবং সার্ভারে পাঠায়, আবার একটি প্রদত্ত স্ট্রিম ব্যবহার করে। একবার ক্লায়েন্ট বার্তাগুলি লেখা শেষ করলে, এটি সার্ভারের জন্য সেগুলি পড়ার জন্য এবং তার প্রতিক্রিয়া ফেরত দেওয়ার জন্য অপেক্ষা করে। আপনি অনুরোধের প্রকারের আগে stream কীওয়ার্ড স্থাপন করে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করুন।

rpc RecordRoute(stream Point) returns (RouteSummary) {}

রুটচ্যাট

একটি রুট অতিক্রম করার সময় অন্যান্য RouteNote গুলি গ্রহণ করার সময় (যেমন অন্যান্য ব্যবহারকারীদের কাছ থেকে) প্রেরিত RouteNote এর একটি স্ট্রীম গ্রহণ করে৷

দ্বিমুখী স্ট্রিমিংয়ের ক্ষেত্রে এটি ঠিক একই ধরনের ব্যবহারের ক্ষেত্রে। একটি দ্বিমুখী স্ট্রিমিং RPC-তে উভয় পক্ষই একটি পাঠ-লেখা স্ট্রিম ব্যবহার করে বার্তাগুলির একটি ক্রম প্রেরণ করে। দুটি স্ট্রীম স্বাধীনভাবে কাজ করে, তাই ক্লায়েন্ট এবং সার্ভাররা তাদের পছন্দ অনুযায়ী পড়তে এবং লিখতে পারে।

উদাহরণস্বরূপ, সার্ভার তার প্রতিক্রিয়া লেখার আগে সমস্ত ক্লায়েন্ট বার্তা পাওয়ার জন্য অপেক্ষা করতে পারে, বা এটি বিকল্পভাবে একটি বার্তা পড়তে পারে তারপর একটি বার্তা লিখতে পারে, বা পঠিত এবং লেখার অন্য কিছু সংমিশ্রণ করতে পারে।

প্রতিটি প্রবাহে বার্তার ক্রম সংরক্ষিত হয়। আপনি অনুরোধ এবং প্রতিক্রিয়া উভয়ের আগে stream কীওয়ার্ড স্থাপন করে এই ধরনের পদ্ধতি নির্দিষ্ট করুন।

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. ক্লায়েন্ট এবং সার্ভার কোড তৈরি করুন

আমরা আপনাকে ইতিমধ্যেই জেনারেটেড ডিরেক্টরিতে .proto ফাইল থেকে জেনারেট করা কোড দিয়েছি।

আপনি যদি নিজে নিজে .proto ফাইল থেকে কোড জেনারেট করতে চান বা .proto ফাইলে কোনো পরিবর্তন করতে চান এবং সেগুলো পরীক্ষা করতে চান তাহলে এই নির্দেশাবলী পড়ুন।

উত্পন্ন কোড রয়েছে:

  • বার্তার ধরন Point , Feature , Rectangle , RouteNote এবং RouteSummary জন্য কাঠামোগত সংজ্ঞা।
  • একটি পরিষেবা বৈশিষ্ট্য যা আমাদের বাস্তবায়ন করতে হবে: route_guide_server::RouteGuide
  • একটি ক্লায়েন্ট প্রকার যা আমরা সার্ভারকে কল করতে ব্যবহার করব: route_guide_client::RouteGuideClient<T>

এর পরে, আমরা সার্ভার-সাইডে পদ্ধতিগুলি প্রয়োগ করব, যাতে ক্লায়েন্ট যখন একটি অনুরোধ পাঠায়, সার্ভার একটি উত্তর দিয়ে উত্তর দিতে পারে।

5. পরিষেবাটি বাস্তবায়ন করুন

প্রথমে দেখা যাক কিভাবে আমরা একটি RouteGuide সার্ভার তৈরি করি। আমাদের RouteGuide পরিষেবাটি তার কাজ করার জন্য দুটি অংশ রয়েছে:

  • আমাদের পরিষেবার সংজ্ঞা থেকে উত্পন্ন পরিষেবা ইন্টারফেস বাস্তবায়ন করা: আমাদের পরিষেবার প্রকৃত "কাজ" করা।
  • ক্লায়েন্টদের কাছ থেকে অনুরোধ শোনার জন্য একটি gRPC সার্ভার চালানো এবং তাদের সঠিক পদ্ধতি বাস্তবায়নে প্রেরণ করা।

src/server/server.rs include_generated_proto! ম্যাক্রো এবং RouteGuide বৈশিষ্ট্য এবং Point আমদানি করুন।

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

আমরা আমাদের পরিষেবার প্রতিনিধিত্ব করার জন্য একটি কাঠামো সংজ্ঞায়িত করে শুরু করতে পারি। আমরা আপাতত src/server/server.rs এ এটি করতে পারি:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

এখন, আমাদের তৈরি করা কোড থেকে route_guide_server::RouteGuide বৈশিষ্ট্যটি বাস্তবায়ন করতে হবে।

RouteGuide বাস্তবায়ন করুন

আমাদের তৈরি করা RouteGuide ইন্টারফেস বাস্তবায়ন করতে হবে। এইভাবে বাস্তবায়ন দেখতে হবে. এটি ইতিমধ্যেই টেমপ্লেটে রয়েছে৷

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

আসুন আমরা প্রতিটি RPC বাস্তবায়ন বিস্তারিতভাবে দেখি।

সার্ভার-সাইড স্ট্রিমিং RPC

চলুন শুরু করা যাক ListFeatures দিয়ে। এটি একটি সার্ভার-সাইড স্ট্রিমিং RPC, তাই আমাদের ক্লায়েন্টকে একাধিক Feature ফেরত পাঠাতে হবে।

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

আপনি দেখতে পাচ্ছেন, আমরা একটি অনুরোধ বস্তু পাই ( Rectangle যেখানে আমাদের ক্লায়েন্ট Features খুঁজে পেতে চায়)। এই সময়, আমরা মান একটি প্রবাহ ফিরে প্রয়োজন. আমরা একটি চ্যানেল তৈরি করি এবং একটি নতুন অ্যাসিঙ্ক্রোনাস টাস্ক তৈরি করি যেখানে আমরা একটি সন্ধান করি, চ্যানেলে আমাদের সীমাবদ্ধতাগুলিকে সন্তুষ্ট করে এমন বৈশিষ্ট্যগুলি প্রেরণ করি। চ্যানেলের স্ট্রিম অর্ধেক কলারকে ফিরিয়ে দেওয়া হয়, একটি tonic::Response মোড়ানো।

ক্লায়েন্ট-সাইড স্ট্রিমিং RPC

এখন আসুন একটু জটিল কিছু দেখি: ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি RecordRoute , যেখানে আমরা ক্লায়েন্টের কাছ থেকে Points একটি স্ট্রীম পাই এবং তাদের ট্রিপ সম্পর্কে তথ্য সহ একটি একক RouteSummary ফেরত দেই। এটি একটি ইনপুট হিসাবে একটি স্ট্রীম পায়, যা সার্ভার বার্তা পড়তে এবং লিখতে উভয়ই ব্যবহার করতে পারে। এটি তার next() পদ্ধতি ব্যবহার করে ক্লায়েন্ট বার্তাগুলির মাধ্যমে পুনরাবৃত্তি করতে পারে এবং তার একক প্রতিক্রিয়া ফিরিয়ে দিতে পারে।

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

মেথড বডিতে, আমরা আমাদের ক্লায়েন্টের রিকোয়েস্ট অবজেক্টে বারবার পড়ার জন্য স্ট্রীমের next() মেথড ব্যবহার করি (এই ক্ষেত্রে একটি Point ) যতক্ষণ না আর কোনো বার্তা না আসে। যদি এটি কিছুই না হয়, স্ট্রীম এখনও ভাল এবং এটি পড়া চালিয়ে যেতে পারে।

দ্বিমুখী স্ট্রিমিং RPC

অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখুন।

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

এইবার আমরা একটি স্ট্রীম পেয়েছি যা আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতো, বার্তাগুলি পড়তে এবং লিখতে ব্যবহার করা যেতে পারে। যাইহোক, এই সময় আমরা আমাদের পদ্ধতির স্ট্রীমের মাধ্যমে মান ফিরিয়ে দিই যখন ক্লায়েন্ট এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে। এখানে পড়া এবং লেখার জন্য সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং পদ্ধতির সাথে খুব মিল, সার্ভারটি একটি RouteChatStream প্রদান করে। যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷

আমরা try_stream! ব্যবহার করে output স্ট্রীম তৈরি করি! , যা নির্দেশ করে যে স্ট্রীম ত্রুটিগুলি ফেরত দিতে পারে৷

সার্ভার শুরু করুন

একবার আমরা এই পদ্ধতিটি প্রয়োগ করার পরে, আমাদের একটি gRPC সার্ভার শুরু করতে হবে যাতে ক্লায়েন্টরা আসলে আমাদের পরিষেবা ব্যবহার করতে পারে। main() পূরণ করুন।

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

এখানে main() তে কি ঘটছে, ধাপে ধাপে:

  1. ক্লায়েন্টের অনুরোধ শোনার জন্য আমরা যে পোর্ট ব্যবহার করতে চাই তা নির্দিষ্ট করুন
  2. লোড করা বৈশিষ্ট্য সহ একটি RouteGuideService তৈরি করুন৷
  3. আমাদের তৈরি পরিষেবা ব্যবহার করে RouteGuideServer::new() ব্যবহার করে gRPC সার্ভারের একটি উদাহরণ তৈরি করুন।
  4. gRPC সার্ভারের সাথে আমাদের পরিষেবা বাস্তবায়ন নিবন্ধন করুন।
  5. আমাদের পোর্টের বিবরণ সহ serve() কল করুন

6. ক্লায়েন্ট তৈরি করুন

এই বিভাগে, আমরা src/client/client.rs এ আমাদের RouteGuide পরিষেবার জন্য একটি মরিচা ক্লায়েন্ট তৈরি করার বিষয়ে দেখব।

প্রথমত, জেনারেট করা কোডটিকে স্কোপের মধ্যে আনুন।

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

কল পরিষেবা পদ্ধতি

এখন আসুন আমরা আমাদের পরিষেবা পদ্ধতিগুলিকে কীভাবে বলি তা দেখি। gRPC-Rust-এ, RPCগুলি একটি ব্লকিং/সিঙ্ক্রোনাস মোডে কাজ করে, যার অর্থ হল RPC কলটি সার্ভারের সাড়া দেওয়ার জন্য অপেক্ষা করে এবং হয় একটি প্রতিক্রিয়া বা ত্রুটি ফিরিয়ে দেবে।

সার্ভার-সাইড স্ট্রিমিং RPC

এখানে আমরা সার্ভার-সাইড স্ট্রিমিং পদ্ধতিকে বলি ListFeatures , যা ভৌগলিক Feature বস্তুর একটি স্ট্রিম প্রদান করে।

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

আমরা পদ্ধতিটি একটি অনুরোধ পাস করি এবং ListFeaturesStream এর একটি উদাহরণ ফিরে পাই। ক্লায়েন্ট সার্ভারের প্রতিক্রিয়া পড়তে ListFeaturesStream স্ট্রিম ব্যবহার করতে পারে। আমরা ListFeaturesStream এর message() পদ্ধতিটি ব্যবহার করি সার্ভারের প্রতিক্রিয়া প্রোটোকল বাফার অবজেক্টে বারবার পড়ার জন্য (এই ক্ষেত্রে একটি Feature ) যতক্ষণ না আর কোনও বার্তা না আসে।

ক্লায়েন্ট-সাইড স্ট্রিমিং RPC

এখানে record_route এর জন্য, আমরা পয়েন্টের একটি ভেক্টরকে স্ট্রীমে পরিণত করি। তারপরে আমরা অনুরোধ হিসাবে এই স্ট্রীমটিকে record_route() এ পাস করি এবং সার্ভার দ্বারা স্ট্রীমটি সম্পূর্ণরূপে প্রক্রিয়া করার পরে একটি একক RouteSummary প্রতিক্রিয়া পাই৷

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

দ্বিমুখী স্ট্রিমিং RPC

অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখুন। আমরা পদ্ধতিটি একটি স্ট্রিম অনুরোধ পাস করি যা আমরা লিখি এবং একটি স্ট্রীম ফিরে পাই যেখান থেকে আমরা বার্তা পড়তে পারি। এই সময় আমরা আমাদের পদ্ধতির স্ট্রীমের মাধ্যমে মান ফিরিয়ে দিই যখন সার্ভার এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে।

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷

কল সাহায্যকারী পদ্ধতি

পরিষেবা পদ্ধতি কল করার জন্য, আমাদের প্রথমে সার্ভারের সাথে যোগাযোগের জন্য একটি চ্যানেল তৈরি করতে হবে। আমরা প্রথমে একটি এন্ডপয়েন্ট তৈরি করে, সেই এন্ডপয়েন্টের সাথে সংযোগ স্থাপন করে এবং RouteGuideClient::new() এর সাথে সংযুক্ত থাকাকালীন চ্যানেলটি পাস করে এটি তৈরি করি:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

main() তে, আমরা এইমাত্র তৈরি করা পদ্ধতিগুলি চালান।

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. এটা চেষ্টা করে দেখুন

প্রথমে, আমাদের ক্লায়েন্ট এবং সার্ভার চালানোর জন্য, আমাদের ক্রেটে বাইনারি টার্গেট হিসাবে তাদের যোগ করা যাক। সেই অনুযায়ী আমাদের Cargo.toml সম্পাদনা করতে হবে:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

যেকোনো প্রকল্পের মতো, আমাদের কোডের কাজ করার জন্য প্রয়োজনীয় নির্ভরতাগুলিও আমাদের ভাবতে হবে। মরিচা প্রকল্পগুলির জন্য, নির্ভরতাগুলি হবে Cargo.toml এ। আমরা ইতিমধ্যেই Cargo.toml ফাইলে প্রয়োজনীয় নির্ভরতা তালিকাভুক্ত করেছি।

তারপরে, আমাদের ওয়ার্কিং ডিরেক্টরি থেকে নিম্নলিখিত কমান্ডগুলি চালান:

  1. একটি টার্মিনালে সার্ভার চালান:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. অন্য টার্মিনাল থেকে ক্লায়েন্ট চালান:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

আপনি এই মত আউটপুট দেখতে পাবেন:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. পরবর্তী কি