gRPC-Java - স্ট্রিমিং দিয়ে শুরু করা

1. ভূমিকা

এই কোডল্যাবে, আপনি একটি ক্লায়েন্ট এবং সার্ভার তৈরি করতে gRPC-Java ব্যবহার করবেন যা জাভাতে লেখা একটি রুট-ম্যাপিং অ্যাপ্লিকেশনের ভিত্তি তৈরি করে।

টিউটোরিয়ালের শেষে, আপনার কাছে একটি ক্লায়েন্ট থাকবে যেটি একটি দূরবর্তী সার্ভারের সাথে সংযোগ করে একটি ক্লায়েন্টের রুটের বৈশিষ্ট্য সম্পর্কে তথ্য পেতে, একটি ক্লায়েন্টের রুটের একটি সারাংশ তৈরি করতে এবং সার্ভার এবং অন্যান্য ক্লায়েন্টদের সাথে ট্রাফিক আপডেটের মতো রুটের তথ্য বিনিময় করতে।

পরিষেবাটি একটি প্রোটোকল বাফার ফাইলে সংজ্ঞায়িত করা হয়েছে, যা ক্লায়েন্ট এবং সার্ভারের জন্য বয়লারপ্লেট কোড তৈরি করতে ব্যবহার করা হবে যাতে তারা একে অপরের সাথে যোগাযোগ করতে পারে, সেই কার্যকারিতা বাস্তবায়নে আপনার সময় এবং প্রচেষ্টা বাঁচাতে পারে।

এই তৈরি করা কোডটি সার্ভার এবং ক্লায়েন্টের মধ্যে যোগাযোগের জটিলতাই নয়, ডেটা সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশনেরও যত্ন নেয়।

আপনি কি শিখবেন

  • একটি পরিষেবা API সংজ্ঞায়িত করতে প্রোটোকল বাফারগুলি কীভাবে ব্যবহার করবেন।
  • স্বয়ংক্রিয় কোড জেনারেশন ব্যবহার করে প্রোটোকল বাফার সংজ্ঞা থেকে কীভাবে একটি জিআরপিসি-ভিত্তিক ক্লায়েন্ট এবং সার্ভার তৈরি করবেন।
  • gRPC-এর সাথে ক্লায়েন্ট-সার্ভার স্ট্রিমিং যোগাযোগের একটি বোঝাপড়া।

এই কোডল্যাবটি জিআরপিসি-তে নতুন জাভা ডেভেলপারদের জন্য বা জিআরপিসি-র রিফ্রেশার খোঁজার জন্য, বা বিতরণ করা সিস্টেম তৈরি করতে আগ্রহী অন্য যেকেউ লক্ষ্য করে। কোন পূর্ব gRPC অভিজ্ঞতা প্রয়োজন নেই.

2. আপনি শুরু করার আগে

পূর্বশর্ত

  • JDK সংস্করণ 24।

কোড পান

যাতে আপনাকে সম্পূর্ণরূপে স্ক্র্যাচ থেকে শুরু করতে না হয়, এই কোডল্যাবটি আপনাকে সম্পূর্ণ করার জন্য অ্যাপ্লিকেশনের সোর্স কোডের একটি স্ক্যাফোল্ড প্রদান করে। বয়লারপ্লেট জিআরপিসি কোড জেনারেট করতে প্রোটোকল বাফার কম্পাইলার প্লাগইনগুলি ব্যবহার করা সহ, নিম্নলিখিত পদক্ষেপগুলি আপনাকে কীভাবে অ্যাপ্লিকেশনটি শেষ করতে হবে তা দেখাবে।

প্রথমে, কোডল্যাব ওয়ার্কিং ডিরেক্টরি তৈরি করুন এবং এটিতে সিডি করুন:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

কোডল্যাব ডাউনলোড এবং এক্সট্রাক্ট করুন:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

বিকল্পভাবে, আপনি শুধুমাত্র কোডল্যাব ডিরেক্টরি ধারণকারী .zip ফাইলটি ডাউনলোড করতে পারেন এবং ম্যানুয়ালি আনজিপ করতে পারেন।

আপনি যদি বাস্তবায়নে টাইপ করা এড়িয়ে যেতে চান তবে সম্পূর্ণ সোর্স কোডটি GitHub-এ উপলব্ধ

3. বার্তা এবং পরিষেবা সংজ্ঞায়িত করুন

আপনার প্রথম ধাপ হল প্রোটোকল বাফার ব্যবহার করে অ্যাপ্লিকেশনটির gRPC পরিষেবা, এর RPC পদ্ধতি এবং এর অনুরোধ এবং প্রতিক্রিয়া বার্তার ধরনগুলিকে সংজ্ঞায়িত করা। আপনার পরিষেবা প্রদান করবে:

  • RPC পদ্ধতিগুলিকে ListFeatures , RecordRoute , এবং RouteChat বলা হয় যা সার্ভার প্রয়োগ করে এবং ক্লায়েন্ট কল করে।
  • বার্তার ধরন হল Point , Feature , Rectangle , RouteNote এবং RouteSummary , যা উপরের পদ্ধতিগুলি কল করার সময় ক্লায়েন্ট এবং সার্ভারের মধ্যে আদান-প্রদান করা হয়।

প্রোটোকল বাফারগুলি সাধারণত প্রোটোবাফ হিসাবে পরিচিত। gRPC পরিভাষা সম্পর্কে আরও তথ্যের জন্য, gRPC-এর মূল ধারণা, স্থাপত্য, এবং জীবনচক্র দেখুন।

এই RPC পদ্ধতি এবং এর বার্তা প্রকারগুলি সমস্ত প্রদত্ত সোর্স কোডের proto/routeguide/route_guide.proto ফাইলে সংজ্ঞায়িত করা হবে৷

আসুন একটি route_guide.proto ফাইল তৈরি করি।

যেহেতু আমরা এই উদাহরণে জাভা কোড তৈরি করছি, আমরা আমাদের .proto এ একটি java_package ফাইল বিকল্প নির্দিষ্ট করেছি:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

বার্তার ধরন সংজ্ঞায়িত করুন

সোর্স কোডের proto/routeguide/route_guide.proto ফাইলে, প্রথমে Point বার্তার ধরনটি সংজ্ঞায়িত করুন। একটি Point একটি মানচিত্রে একটি অক্ষাংশ-দ্রাঘিমাংশ স্থানাঙ্ক জুটিকে প্রতিনিধিত্ব করে। এই কোডল্যাবের জন্য, স্থানাঙ্কের জন্য পূর্ণসংখ্যা ব্যবহার করুন:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

1 এবং 2 নম্বরগুলি message কাঠামোর প্রতিটি ক্ষেত্রের জন্য অনন্য আইডি নম্বর।

এর পরে, Feature বার্তার ধরন সংজ্ঞায়িত করুন। একটি Feature একটি Point দ্বারা নির্দিষ্ট একটি অবস্থানে কোনো কিছুর নাম বা ডাক ঠিকানার জন্য একটি string ক্ষেত্র ব্যবহার করে:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

যাতে একটি এলাকার মধ্যে একাধিক পয়েন্ট একটি ক্লায়েন্টের কাছে স্ট্রিম করা যায়, আপনার একটি Rectangle বার্তার প্রয়োজন হবে যা একটি অক্ষাংশ-দ্রাঘিমাংশের আয়তক্ষেত্রকে প্রতিনিধিত্ব করে, যা দুটি তির্যক বিপরীত বিন্দু lo এবং hi হিসাবে উপস্থাপিত হয়:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

এছাড়াও, একটি RouteNote বার্তা যা প্রদত্ত বিন্দুতে প্রেরিত একটি বার্তা প্রতিনিধিত্ব করে:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

অবশেষে, আপনার একটি RouteSummary বার্তার প্রয়োজন হবে। এই বার্তাটি একটি RecordRoute RPC-এর প্রতিক্রিয়া হিসাবে গৃহীত হয়েছে, যা পরবর্তী বিভাগে ব্যাখ্যা করা হয়েছে। এতে প্রাপ্ত পৃথক পয়েন্টের সংখ্যা, সনাক্ত করা বৈশিষ্ট্যের সংখ্যা এবং প্রতিটি বিন্দুর মধ্যে দূরত্বের ক্রমবর্ধমান সমষ্টি হিসাবে আচ্ছাদিত মোট দূরত্ব রয়েছে।

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

সেবা পদ্ধতি সংজ্ঞায়িত করুন

একটি পরিষেবা সংজ্ঞায়িত করতে, আপনি আপনার .proto ফাইলে একটি নামযুক্ত পরিষেবা নির্দিষ্ট করুন৷ route_guide.proto ফাইলটিতে RouteGuide নামে একটি service কাঠামো রয়েছে যা অ্যাপ্লিকেশনটির পরিষেবা দ্বারা প্রদত্ত এক বা একাধিক পদ্ধতিকে সংজ্ঞায়িত করে৷

যখন আপনি আপনার পরিষেবার সংজ্ঞার মধ্যে RPC পদ্ধতিগুলি সংজ্ঞায়িত করেন, আপনি তাদের অনুরোধ এবং প্রতিক্রিয়া প্রকারগুলি নির্দিষ্ট করেন। কোডল্যাবের এই বিভাগে, আসুন সংজ্ঞায়িত করা যাক:

তালিকা বৈশিষ্ট্য

প্রদত্ত Rectangle মধ্যে উপলব্ধ Feature বস্তুগুলি প্রাপ্ত করে। ফলাফলগুলি একবারে ফেরত দেওয়ার পরিবর্তে স্ট্রিম করা হয় কারণ আয়তক্ষেত্রটি একটি বড় এলাকা কভার করতে পারে এবং এতে বিপুল সংখ্যক বৈশিষ্ট্য থাকতে পারে।

এই অ্যাপ্লিকেশনটির জন্য, আপনি একটি সার্ভার-সাইড স্ট্রিমিং RPC ব্যবহার করবেন: ক্লায়েন্ট সার্ভারে একটি অনুরোধ পাঠায় এবং বার্তাগুলির একটি ক্রম পড়ার জন্য একটি স্ট্রিম পায়৷ ক্লায়েন্ট প্রত্যাবর্তিত স্ট্রীম থেকে পাঠ করে যতক্ষণ না আর কোনো বার্তা নেই। আপনি যেমন আমাদের উদাহরণে দেখতে পাচ্ছেন, আপনি প্রতিক্রিয়া প্রকারের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে একটি সার্ভার-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করেন।

rpc ListFeatures(Rectangle) returns (stream Feature) {}

রেকর্ডরুট

ট্র্যাভার্সাল করা রুটে পয়েন্টের একটি স্ট্রীম গ্রহণ করে, যখন ট্রাভার্সাল সম্পূর্ণ হয় তখন একটি RouteSummary ফেরত দেয়।

একটি ক্লায়েন্ট-সাইড স্ট্রিমিং RPC এই ক্ষেত্রে উপযুক্ত: ক্লায়েন্ট বার্তাগুলির একটি ক্রম লেখে এবং সার্ভারে পাঠায়, আবার একটি প্রদত্ত স্ট্রিম ব্যবহার করে। একবার ক্লায়েন্ট বার্তাগুলি লেখা শেষ করলে, এটি সার্ভারের জন্য সেগুলি পড়ার জন্য এবং তার প্রতিক্রিয়া ফেরত দেওয়ার জন্য অপেক্ষা করে। আপনি অনুরোধের প্রকারের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করুন।

rpc RecordRoute(stream Point) returns (RouteSummary) {}

রুটচ্যাট

অন্যান্য RouteNotes গ্রহণ করার সময় (যেমন অন্যান্য ব্যবহারকারীদের কাছ থেকে) একটি রুট অতিক্রম করার সময় প্রেরিত RouteNotes এর একটি স্ট্রীম গ্রহণ করে৷

দ্বিমুখী স্ট্রিমিংয়ের জন্য এটি ঠিক একই ধরণের ব্যবহারকেস। একটি দ্বিমুখী স্ট্রিমিং RPC যেখানে উভয় পক্ষই একটি পাঠ-লেখা স্ট্রীম ব্যবহার করে বার্তাগুলির একটি ক্রম প্রেরণ করে। দুটি স্ট্রীম স্বাধীনভাবে কাজ করে, তাই ক্লায়েন্ট এবং সার্ভার তাদের পছন্দ অনুযায়ী পড়তে এবং লিখতে পারে: উদাহরণস্বরূপ, সার্ভার তার প্রতিক্রিয়া লেখার আগে সমস্ত ক্লায়েন্ট বার্তা পাওয়ার জন্য অপেক্ষা করতে পারে, বা এটি বিকল্পভাবে একটি বার্তা পড়তে পারে তারপর একটি বার্তা লিখতে পারে, বা পঠিত এবং লেখার অন্য কিছু সংমিশ্রণ। প্রতিটি প্রবাহে বার্তার ক্রম সংরক্ষিত হয়। আপনি অনুরোধ এবং প্রতিক্রিয়া উভয়ের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে এই ধরনের পদ্ধতি নির্দিষ্ট করুন।

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. ক্লায়েন্ট এবং সার্ভার কোড তৈরি করুন

পরবর্তীতে আমাদের .proto পরিষেবা সংজ্ঞা থেকে gRPC ক্লায়েন্ট এবং সার্ভার ইন্টারফেস তৈরি করতে হবে। আমরা একটি বিশেষ gRPC Java প্লাগইন সহ প্রোটোকল বাফার কম্পাইলার protoc ব্যবহার করে এটি করি। gRPC পরিষেবাগুলি তৈরি করার জন্য আপনাকে proto3 কম্পাইলার (যা proto2 এবং proto3 উভয় সিনট্যাক্স সমর্থন করে) ব্যবহার করতে হবে।

Gradle বা Maven ব্যবহার করার সময়, প্রোটোক বিল্ড প্লাগইন বিল্ডের অংশ হিসাবে প্রয়োজনীয় কোড তৈরি করতে পারে। কিভাবে আপনার নিজের .proto ফাইল থেকে কোড জেনারেট করতে হয় তার জন্য আপনি grpc-java README দেখতে পারেন।

আমরা Gradle কনফিগারেশন প্রদান করেছি।

streaming-grpc-java-getting-started ডিরেক্টরি থেকে প্রবেশ করুন

$ chmod +x gradlew
$ ./gradlew generateProto

নিম্নলিখিত ক্লাসগুলি আমাদের পরিষেবা সংজ্ঞা থেকে তৈরি করা হয়েছে ( build/generated/sources/proto/main/java ):

  • প্রতিটি বার্তা প্রকারের জন্য একটি: Feature.java , Rectangle.java, ... যেটিতে আমাদের অনুরোধ এবং প্রতিক্রিয়া বার্তা প্রকারগুলিকে পপুলেট, সিরিয়ালাইজ এবং পুনরুদ্ধার করার জন্য সমস্ত প্রোটোকল বাফার কোড রয়েছে৷
  • RouteGuideGrpc.java যার মধ্যে রয়েছে (অন্যান্য কিছু দরকারী কোড সহ) RouteGuide সার্ভারগুলি বাস্তবায়নের জন্য একটি বেস ক্লাস, RouteGuideGrpc.RouteGuideImplBase , RouteGuide পরিষেবাতে সংজ্ঞায়িত সমস্ত পদ্ধতি এবং ক্লায়েন্টদের ব্যবহারের জন্য স্টাব ক্লাস সহ

5. পরিষেবাটি বাস্তবায়ন করুন

প্রথমে দেখা যাক কিভাবে আমরা একটি RouteGuide সার্ভার তৈরি করি। আমাদের RouteGuide পরিষেবাটি তার কাজ করার জন্য দুটি অংশ রয়েছে:

  • আমাদের পরিষেবার সংজ্ঞা থেকে উত্পন্ন পরিষেবা ইন্টারফেস বাস্তবায়ন করা: আমাদের পরিষেবার প্রকৃত "কাজ" করা।
  • ক্লায়েন্টদের কাছ থেকে অনুরোধ শোনার জন্য এবং সঠিক পরিষেবা বাস্তবায়নে তাদের পাঠানোর জন্য একটি gRPC সার্ভার চালানো।

RouteGuide বাস্তবায়ন করুন

আমরা একটি RouteGuideService ক্লাস বাস্তবায়ন করব যা জেনারেট করা RouteGuideGrpc.RouteGuideImplBase ক্লাসকে প্রসারিত করবে। এইভাবে বাস্তবায়ন দেখতে হবে.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

আসুন আমরা প্রতিটি RPC বাস্তবায়ন বিস্তারিতভাবে দেখি

সার্ভার-সাইড স্ট্রিমিং RPC

এর পরে আসুন আমাদের স্ট্রিমিং RPC গুলির মধ্যে একটি দেখুন। ListFeatures হল একটি সার্ভার-সাইড স্ট্রিমিং RPC, তাই আমাদের ক্লায়েন্টকে একাধিক Features ফেরত পাঠাতে হবে।

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

সাধারণ RPC-এর মতো, এই পদ্ধতিটি একটি অনুরোধ বস্তু (যে Rectangle আমাদের ক্লায়েন্ট Features খুঁজে পেতে চায়) এবং একটি StreamObserver প্রতিক্রিয়া পর্যবেক্ষক পায়।

এইবার, আমরা ক্লায়েন্টের কাছে ফিরে আসার জন্য যতগুলি Feature অবজেক্ট প্রয়োজন ততগুলি আমরা পাই (এই ক্ষেত্রে, আমরা সেগুলিকে পরিষেবার বৈশিষ্ট্য সংগ্রহ থেকে নির্বাচন করি যে সেগুলি আমাদের অনুরোধ Rectangle ভিতরে আছে কিনা ) এবং প্রতিটিটিকে তার onNext() পদ্ধতি ব্যবহার করে প্রতিক্রিয়া পর্যবেক্ষকের কাছে লিখুন৷ অবশেষে, আমাদের সাধারণ RPC-তে, আমরা প্রতিক্রিয়া পর্যবেক্ষকের onCompleted() পদ্ধতি ব্যবহার করি gRPC কে জানাতে যে আমরা প্রতিক্রিয়া লেখা শেষ করেছি।

ক্লায়েন্ট-সাইড স্ট্রিমিং RPC

এখন আসুন একটু জটিল কিছু দেখি: ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি RecordRoute() , যেখানে আমরা ক্লায়েন্টের কাছ থেকে Points একটি স্ট্রীম পাই এবং তাদের ট্রিপের তথ্য সহ একটি একক RouteSummary ফেরত দেই।

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

আপনি দেখতে পাচ্ছেন, আগের পদ্ধতির প্রকারের মতো আমাদের পদ্ধতিটি একটি StreamObserver responseObserver প্যারামিটার পায়, কিন্তু এবার এটি ক্লায়েন্টকে তার Points লিখতে একটি StreamObserver প্রদান করে।

মেথড বডিতে আমরা একটি বেনামী StreamObserver ফিরে আসার জন্য ইনস্ট্যান্টিয়েট করি, যেখানে আমরা:

  • প্রতিবার ক্লায়েন্ট যখন বার্তা স্ট্রীমে একটি Point লেখেন তখন বৈশিষ্ট্য এবং অন্যান্য তথ্য পেতে onNext() পদ্ধতিটি ওভাররাইড করুন।
  • আমাদের RouteSummary তৈরি ও তৈরি করতে onCompleted() পদ্ধতি ( ক্লায়েন্ট বার্তা লেখা শেষ হলে বলা হয়) ওভাররাইড করুন। তারপরে আমরা আমাদের RouteSummary সাথে আমাদের পদ্ধতির নিজস্ব প্রতিক্রিয়া পর্যবেক্ষকের onNext() কল করি এবং তারপর সার্ভারের দিক থেকে কলটি শেষ করতে এটির onCompleted() পদ্ধতিতে কল করি।

দ্বিমুখী স্ট্রিমিং RPC

অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখুন।

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতো, আমরা উভয়েই একটি StreamObserver পাই এবং ফেরত দিই, এই সময় ব্যতীত আমরা আমাদের পদ্ধতির প্রতিক্রিয়া পর্যবেক্ষকের মাধ্যমে মান ফেরত দিই যখন ক্লায়েন্ট এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে। এখানে পড়া এবং লেখার জন্য সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং এবং সার্ভার-স্ট্রিমিং পদ্ধতির মতোই। যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷

সার্ভার শুরু করুন

একবার আমরা আমাদের সমস্ত পদ্ধতি প্রয়োগ করে ফেললে, আমাদের একটি gRPC সার্ভারও চালু করতে হবে যাতে ক্লায়েন্টরা আসলে আমাদের পরিষেবা ব্যবহার করতে পারে। নিম্নলিখিত স্নিপেটটি দেখায় যে আমরা আমাদের RouteGuide পরিষেবার জন্য কীভাবে এটি করি:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

আপনি দেখতে পাচ্ছেন, আমরা একটি ServerBuilder ব্যবহার করে আমাদের সার্ভার তৈরি এবং শুরু করি।

এটি করার জন্য, আমরা:

  1. বিল্ডারের forPort() পদ্ধতি ব্যবহার করে ক্লায়েন্টের অনুরোধ শুনতে আমরা যে ঠিকানা এবং পোর্ট ব্যবহার করতে চাই তা নির্দিষ্ট করুন।
  2. আমাদের পরিষেবা বাস্তবায়ন ক্লাস RouteGuideService এর একটি উদাহরণ তৈরি করুন এবং এটি নির্মাতার addService() পদ্ধতিতে পাস করুন।
  3. আমাদের পরিষেবার জন্য একটি RPC সার্ভার তৈরি করতে এবং শুরু করতে বিল্ডারকে build() এবং start() কল করুন।

যেহেতু সার্ভার বিল্ডার ইতিমধ্যেই পোর্টটি অন্তর্ভুক্ত করেছে, তাই আমরা একটি পোর্ট পাস করার একমাত্র কারণ হল লগিং এর জন্য এটি ব্যবহার করা।

6. ক্লায়েন্ট তৈরি করুন

এই বিভাগে, আমরা আমাদের RouteGuide পরিষেবার জন্য একটি ক্লায়েন্ট তৈরি করার বিষয়ে দেখব। আপনি ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java এ আমাদের সম্পূর্ণ উদাহরণ ক্লায়েন্ট কোড দেখতে পারেন।

একটি স্টাব তাত্ক্ষণিক

পরিষেবা পদ্ধতিতে কল করার জন্য, আমাদের প্রথমে একটি স্টাব তৈরি করতে হবে, বা বরং দুটি স্টাব তৈরি করতে হবে:

  • একটি ব্লকিং/সিঙ্ক্রোনাস স্টাব: এর মানে হল যে RPC কল সার্ভারের সাড়া দেওয়ার জন্য অপেক্ষা করে, এবং হয় একটি প্রতিক্রিয়া প্রদান করবে বা একটি ব্যতিক্রম উত্থাপন করবে।
  • একটি নন-ব্লকিং/অসিঙ্ক্রোনাস স্টাব যা সার্ভারে নন-ব্লকিং কল করে, যেখানে প্রতিক্রিয়া অ্যাসিঙ্ক্রোনাসভাবে ফেরত দেওয়া হয়। আপনি শুধুমাত্র একটি অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করে নির্দিষ্ট ধরনের স্ট্রিমিং কল করতে পারেন।

প্রথমে আমাদের স্টাবের জন্য একটি জিআরপিসি চ্যানেল তৈরি করতে হবে, যে সার্ভারের ঠিকানা এবং পোর্টে আমরা সংযোগ করতে চাই তা উল্লেখ করে:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

চ্যানেল তৈরি করতে আমরা একটি ManagedChannelBuilder ব্যবহার করি।

এখন আমরা আমাদের .proto থেকে তৈরি করা RouteGuideGrpc ক্লাসে প্রদত্ত newStub এবং newBlockingStub পদ্ধতি ব্যবহার করে আমাদের স্টাব তৈরি করতে চ্যানেলটি ব্যবহার করতে পারি।

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

মনে রাখবেন, এটি ব্লক না হলে, এটি অ্যাসিঙ্ক

কল পরিষেবা পদ্ধতি

এখন আসুন আমরা আমাদের পরিষেবা পদ্ধতিগুলিকে কীভাবে বলি তা দেখি। নোট করুন যে ব্লকিং স্টাব থেকে তৈরি যেকোন RPC একটি ব্লকিং/সিঙ্ক্রোনাস মোডে কাজ করবে, যার মানে হল যে RPC কলটি সার্ভারের সাড়া দেওয়ার জন্য অপেক্ষা করে এবং হয় একটি প্রতিক্রিয়া বা ত্রুটি ফিরিয়ে দেবে।

সার্ভার-সাইড স্ট্রিমিং RPC

এর পরে, আসুন ListFeatures এ একটি সার্ভার-সাইড স্ট্রিমিং কল দেখি, যা ভৌগলিক Feature একটি স্ট্রিম প্রদান করে:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

আপনি দেখতে পাচ্ছেন, এটি Getting_Started_With_gRPC_Java কোডল্যাবে আমরা যে সাধারণ ইউনারি আরপিসি দেখেছি তার সাথে খুব মিল, একটি একক Feature ফেরত দেওয়ার পরিবর্তে, পদ্ধতিটি একটি Iterator প্রদান করে যা ক্লায়েন্ট সমস্ত প্রত্যাবর্তিত Features পড়তে ব্যবহার করতে পারে।

ক্লায়েন্ট-সাইড স্ট্রিমিং RPC

এখন একটু জটিল কিছুর জন্য: ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি RecordRoute , যেখানে আমরা সার্ভারে Points একটি স্ট্রীম পাঠাই এবং একটি একক RouteSummary ফিরে পাই। এই পদ্ধতির জন্য আমাদের অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করতে হবে। আপনি যদি ইতিমধ্যে সার্ভার তৈরি করা পড়ে থাকেন তবে এর মধ্যে কয়েকটি খুব পরিচিত মনে হতে পারে - অ্যাসিঙ্ক্রোনাস স্ট্রিমিং RPCগুলি উভয় দিকে একইভাবে প্রয়োগ করা হয়।

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

আপনি দেখতে পাচ্ছেন, এই পদ্ধতিটিকে কল করার জন্য আমাদের একটি StreamObserver তৈরি করতে হবে, যা সার্ভারের জন্য একটি বিশেষ ইন্টারফেস প্রয়োগ করে যার RouteSummary প্রতিক্রিয়ার সাথে কল করা যায়। আমাদের StreamObserver এ আমরা:

  • সার্ভার যখন বার্তা স্ট্রীমে একটি RouteSummary লেখে তখন ফেরত তথ্য মুদ্রণ করতে onNext() পদ্ধতিটি ওভাররাইড করুন।
  • একটি CountDownLatch কমাতে onCompleted() পদ্ধতি (যাকে সার্ভার তার পাশে কলটি সম্পন্ন করলে বলা হয়) ওভাররাইড করুন যাতে সার্ভার লেখা শেষ হয়েছে কিনা তা পরীক্ষা করতে পারি।

তারপরে আমরা StreamObserver অ্যাসিঙ্ক্রোনাস স্টাবের recordRoute() পদ্ধতিতে পাস করি এবং আমাদের নিজস্ব StreamObserver অনুরোধ পর্যবেক্ষককে সার্ভারে পাঠানোর জন্য আমাদের Points লিখতে ফিরে পাই। একবার আমরা পয়েন্ট লেখা শেষ করার পরে, আমরা জিআরপিসিকে জানাতে অনুরোধ পর্যবেক্ষকের onCompleted() পদ্ধতি ব্যবহার করি যে আমরা ক্লায়েন্ট সাইডে লেখা শেষ করেছি। একবার আমাদের কাজ শেষ হলে, সার্ভারটি তার পাশে সম্পন্ন হয়েছে কিনা তা দেখতে আমরা আমাদের CountDownLatch পরীক্ষা করি।

দ্বিমুখী স্ট্রিমিং RPC

অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখুন।

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতো, আমরা উভয়েই একটি StreamObserver প্রতিক্রিয়া পর্যবেক্ষক পাই এবং ফেরত দিই, এই সময়টি ছাড়া আমরা আমাদের পদ্ধতির প্রতিক্রিয়া পর্যবেক্ষকের মাধ্যমে মান পাঠাই যখন সার্ভার এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে। এখানে পড়া এবং লেখার জন্য সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং পদ্ধতির মতোই। যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷

7. এটা চেষ্টা করে দেখুন!

  1. start_here ডিরেক্টরি থেকে:
$ ./gradlew installDist

এটি আপনার কোড কম্পাইল করবে, এটি একটি জারে প্যাকেজ করবে এবং স্ক্রিপ্টগুলি তৈরি করবে যা উদাহরণটি চালায়। এগুলি build/install/start_here/bin/ ডিরেক্টরিতে তৈরি করা হবে। স্ক্রিপ্টগুলি হল: route-guide-server এবং route-guide-client

ক্লায়েন্ট শুরু করার আগে সার্ভারটি চলতে হবে।

  1. সার্ভার চালান:
$ ./build/install/start_here/bin/route-guide-server
  1. ক্লায়েন্ট চালান:
$ ./build/install/start_here/bin/route-guide-client

8. পরবর্তী কি