1. ভূমিকা
এই কোডল্যাবে, আপনি একটি ক্লায়েন্ট এবং সার্ভার তৈরি করতে gRPC-Java ব্যবহার করবেন যা জাভাতে লেখা একটি রুট-ম্যাপিং অ্যাপ্লিকেশনের ভিত্তি তৈরি করে।
টিউটোরিয়ালের শেষে, আপনার কাছে একটি ক্লায়েন্ট থাকবে যেটি একটি দূরবর্তী সার্ভারের সাথে সংযোগ করে একটি ক্লায়েন্টের রুটের বৈশিষ্ট্য সম্পর্কে তথ্য পেতে, একটি ক্লায়েন্টের রুটের একটি সারাংশ তৈরি করতে এবং সার্ভার এবং অন্যান্য ক্লায়েন্টদের সাথে ট্রাফিক আপডেটের মতো রুটের তথ্য বিনিময় করতে।
পরিষেবাটি একটি প্রোটোকল বাফার ফাইলে সংজ্ঞায়িত করা হয়েছে, যা ক্লায়েন্ট এবং সার্ভারের জন্য বয়লারপ্লেট কোড তৈরি করতে ব্যবহার করা হবে যাতে তারা একে অপরের সাথে যোগাযোগ করতে পারে, সেই কার্যকারিতা বাস্তবায়নে আপনার সময় এবং প্রচেষ্টা বাঁচাতে পারে।
এই তৈরি করা কোডটি সার্ভার এবং ক্লায়েন্টের মধ্যে যোগাযোগের জটিলতাই নয়, ডেটা সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশনেরও যত্ন নেয়।
আপনি কি শিখবেন
- একটি পরিষেবা API সংজ্ঞায়িত করতে প্রোটোকল বাফারগুলি কীভাবে ব্যবহার করবেন।
- স্বয়ংক্রিয় কোড জেনারেশন ব্যবহার করে প্রোটোকল বাফার সংজ্ঞা থেকে কীভাবে একটি জিআরপিসি-ভিত্তিক ক্লায়েন্ট এবং সার্ভার তৈরি করবেন।
- gRPC-এর সাথে ক্লায়েন্ট-সার্ভার স্ট্রিমিং যোগাযোগের একটি বোঝাপড়া।
এই কোডল্যাবটি জিআরপিসি-তে নতুন জাভা ডেভেলপারদের জন্য বা জিআরপিসি-র রিফ্রেশার খোঁজার জন্য, বা বিতরণ করা সিস্টেম তৈরি করতে আগ্রহী অন্য যেকেউ লক্ষ্য করে। কোন পূর্ব gRPC অভিজ্ঞতা প্রয়োজন নেই.
2. আপনি শুরু করার আগে
পূর্বশর্ত
- JDK সংস্করণ 24।
কোড পান
যাতে আপনাকে সম্পূর্ণরূপে স্ক্র্যাচ থেকে শুরু করতে না হয়, এই কোডল্যাবটি আপনাকে সম্পূর্ণ করার জন্য অ্যাপ্লিকেশনের সোর্স কোডের একটি স্ক্যাফোল্ড প্রদান করে। বয়লারপ্লেট জিআরপিসি কোড জেনারেট করতে প্রোটোকল বাফার কম্পাইলার প্লাগইনগুলি ব্যবহার করা সহ, নিম্নলিখিত পদক্ষেপগুলি আপনাকে কীভাবে অ্যাপ্লিকেশনটি শেষ করতে হবে তা দেখাবে।
প্রথমে, কোডল্যাব ওয়ার্কিং ডিরেক্টরি তৈরি করুন এবং এটিতে সিডি করুন:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
কোডল্যাব ডাউনলোড এবং এক্সট্রাক্ট করুন:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
বিকল্পভাবে, আপনি শুধুমাত্র কোডল্যাব ডিরেক্টরি ধারণকারী .zip ফাইলটি ডাউনলোড করতে পারেন এবং ম্যানুয়ালি আনজিপ করতে পারেন।
আপনি যদি বাস্তবায়নে টাইপ করা এড়িয়ে যেতে চান তবে সম্পূর্ণ সোর্স কোডটি GitHub-এ উপলব্ধ ।
3. বার্তা এবং পরিষেবা সংজ্ঞায়িত করুন
আপনার প্রথম ধাপ হল প্রোটোকল বাফার ব্যবহার করে অ্যাপ্লিকেশনটির gRPC পরিষেবা, এর RPC পদ্ধতি এবং এর অনুরোধ এবং প্রতিক্রিয়া বার্তার ধরনগুলিকে সংজ্ঞায়িত করা। আপনার পরিষেবা প্রদান করবে:
- RPC পদ্ধতিগুলিকে
ListFeatures
,RecordRoute
, এবংRouteChat
বলা হয় যা সার্ভার প্রয়োগ করে এবং ক্লায়েন্ট কল করে। - বার্তার ধরন হল
Point
,Feature
,Rectangle
,RouteNote
এবংRouteSummary
, যা উপরের পদ্ধতিগুলি কল করার সময় ক্লায়েন্ট এবং সার্ভারের মধ্যে আদান-প্রদান করা হয়।
প্রোটোকল বাফারগুলি সাধারণত প্রোটোবাফ হিসাবে পরিচিত। gRPC পরিভাষা সম্পর্কে আরও তথ্যের জন্য, gRPC-এর মূল ধারণা, স্থাপত্য, এবং জীবনচক্র দেখুন।
এই RPC পদ্ধতি এবং এর বার্তা প্রকারগুলি সমস্ত প্রদত্ত সোর্স কোডের proto/routeguide/route_guide.proto
ফাইলে সংজ্ঞায়িত করা হবে৷
আসুন একটি route_guide.proto
ফাইল তৈরি করি।
যেহেতু আমরা এই উদাহরণে জাভা কোড তৈরি করছি, আমরা আমাদের .proto
এ একটি java_package
ফাইল বিকল্প নির্দিষ্ট করেছি:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
বার্তার ধরন সংজ্ঞায়িত করুন
সোর্স কোডের proto/routeguide/route_guide.proto
ফাইলে, প্রথমে Point
বার্তার ধরনটি সংজ্ঞায়িত করুন। একটি Point
একটি মানচিত্রে একটি অক্ষাংশ-দ্রাঘিমাংশ স্থানাঙ্ক জুটিকে প্রতিনিধিত্ব করে। এই কোডল্যাবের জন্য, স্থানাঙ্কের জন্য পূর্ণসংখ্যা ব্যবহার করুন:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
1
এবং 2
নম্বরগুলি message
কাঠামোর প্রতিটি ক্ষেত্রের জন্য অনন্য আইডি নম্বর।
এর পরে, Feature
বার্তার ধরন সংজ্ঞায়িত করুন। একটি Feature
একটি Point
দ্বারা নির্দিষ্ট একটি অবস্থানে কোনো কিছুর নাম বা ডাক ঠিকানার জন্য একটি string
ক্ষেত্র ব্যবহার করে:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
যাতে একটি এলাকার মধ্যে একাধিক পয়েন্ট একটি ক্লায়েন্টের কাছে স্ট্রিম করা যায়, আপনার একটি Rectangle
বার্তার প্রয়োজন হবে যা একটি অক্ষাংশ-দ্রাঘিমাংশের আয়তক্ষেত্রকে প্রতিনিধিত্ব করে, যা দুটি তির্যক বিপরীত বিন্দু lo
এবং hi
হিসাবে উপস্থাপিত হয়:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
এছাড়াও, একটি RouteNote
বার্তা যা প্রদত্ত বিন্দুতে প্রেরিত একটি বার্তা প্রতিনিধিত্ব করে:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
অবশেষে, আপনার একটি RouteSummary
বার্তার প্রয়োজন হবে। এই বার্তাটি একটি RecordRoute
RPC-এর প্রতিক্রিয়া হিসাবে গৃহীত হয়েছে, যা পরবর্তী বিভাগে ব্যাখ্যা করা হয়েছে। এতে প্রাপ্ত পৃথক পয়েন্টের সংখ্যা, সনাক্ত করা বৈশিষ্ট্যের সংখ্যা এবং প্রতিটি বিন্দুর মধ্যে দূরত্বের ক্রমবর্ধমান সমষ্টি হিসাবে আচ্ছাদিত মোট দূরত্ব রয়েছে।
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
সেবা পদ্ধতি সংজ্ঞায়িত করুন
একটি পরিষেবা সংজ্ঞায়িত করতে, আপনি আপনার .proto
ফাইলে একটি নামযুক্ত পরিষেবা নির্দিষ্ট করুন৷ route_guide.proto
ফাইলটিতে RouteGuide
নামে একটি service
কাঠামো রয়েছে যা অ্যাপ্লিকেশনটির পরিষেবা দ্বারা প্রদত্ত এক বা একাধিক পদ্ধতিকে সংজ্ঞায়িত করে৷
যখন আপনি আপনার পরিষেবার সংজ্ঞার মধ্যে RPC
পদ্ধতিগুলি সংজ্ঞায়িত করেন, আপনি তাদের অনুরোধ এবং প্রতিক্রিয়া প্রকারগুলি নির্দিষ্ট করেন। কোডল্যাবের এই বিভাগে, আসুন সংজ্ঞায়িত করা যাক:
তালিকা বৈশিষ্ট্য
প্রদত্ত Rectangle
মধ্যে উপলব্ধ Feature
বস্তুগুলি প্রাপ্ত করে। ফলাফলগুলি একবারে ফেরত দেওয়ার পরিবর্তে স্ট্রিম করা হয় কারণ আয়তক্ষেত্রটি একটি বড় এলাকা কভার করতে পারে এবং এতে বিপুল সংখ্যক বৈশিষ্ট্য থাকতে পারে।
এই অ্যাপ্লিকেশনটির জন্য, আপনি একটি সার্ভার-সাইড স্ট্রিমিং RPC ব্যবহার করবেন: ক্লায়েন্ট সার্ভারে একটি অনুরোধ পাঠায় এবং বার্তাগুলির একটি ক্রম পড়ার জন্য একটি স্ট্রিম পায়৷ ক্লায়েন্ট প্রত্যাবর্তিত স্ট্রীম থেকে পাঠ করে যতক্ষণ না আর কোনো বার্তা নেই। আপনি যেমন আমাদের উদাহরণে দেখতে পাচ্ছেন, আপনি প্রতিক্রিয়া প্রকারের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে একটি সার্ভার-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করেন।
rpc ListFeatures(Rectangle) returns (stream Feature) {}
রেকর্ডরুট
ট্র্যাভার্সাল করা রুটে পয়েন্টের একটি স্ট্রীম গ্রহণ করে, যখন ট্রাভার্সাল সম্পূর্ণ হয় তখন একটি RouteSummary
ফেরত দেয়।
একটি ক্লায়েন্ট-সাইড স্ট্রিমিং RPC এই ক্ষেত্রে উপযুক্ত: ক্লায়েন্ট বার্তাগুলির একটি ক্রম লেখে এবং সার্ভারে পাঠায়, আবার একটি প্রদত্ত স্ট্রিম ব্যবহার করে। একবার ক্লায়েন্ট বার্তাগুলি লেখা শেষ করলে, এটি সার্ভারের জন্য সেগুলি পড়ার জন্য এবং তার প্রতিক্রিয়া ফেরত দেওয়ার জন্য অপেক্ষা করে। আপনি অনুরোধের প্রকারের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করুন।
rpc RecordRoute(stream Point) returns (RouteSummary) {}
রুটচ্যাট
অন্যান্য RouteNotes
গ্রহণ করার সময় (যেমন অন্যান্য ব্যবহারকারীদের কাছ থেকে) একটি রুট অতিক্রম করার সময় প্রেরিত RouteNotes
এর একটি স্ট্রীম গ্রহণ করে৷
দ্বিমুখী স্ট্রিমিংয়ের জন্য এটি ঠিক একই ধরণের ব্যবহারকেস। একটি দ্বিমুখী স্ট্রিমিং RPC যেখানে উভয় পক্ষই একটি পাঠ-লেখা স্ট্রীম ব্যবহার করে বার্তাগুলির একটি ক্রম প্রেরণ করে। দুটি স্ট্রীম স্বাধীনভাবে কাজ করে, তাই ক্লায়েন্ট এবং সার্ভার তাদের পছন্দ অনুযায়ী পড়তে এবং লিখতে পারে: উদাহরণস্বরূপ, সার্ভার তার প্রতিক্রিয়া লেখার আগে সমস্ত ক্লায়েন্ট বার্তা পাওয়ার জন্য অপেক্ষা করতে পারে, বা এটি বিকল্পভাবে একটি বার্তা পড়তে পারে তারপর একটি বার্তা লিখতে পারে, বা পঠিত এবং লেখার অন্য কিছু সংমিশ্রণ। প্রতিটি প্রবাহে বার্তার ক্রম সংরক্ষিত হয়। আপনি অনুরোধ এবং প্রতিক্রিয়া উভয়ের আগে স্ট্রিম কীওয়ার্ড স্থাপন করে এই ধরনের পদ্ধতি নির্দিষ্ট করুন।
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. ক্লায়েন্ট এবং সার্ভার কোড তৈরি করুন
পরবর্তীতে আমাদের .proto
পরিষেবা সংজ্ঞা থেকে gRPC ক্লায়েন্ট এবং সার্ভার ইন্টারফেস তৈরি করতে হবে। আমরা একটি বিশেষ gRPC Java প্লাগইন সহ প্রোটোকল বাফার কম্পাইলার protoc
ব্যবহার করে এটি করি। gRPC পরিষেবাগুলি তৈরি করার জন্য আপনাকে proto3 কম্পাইলার (যা proto2 এবং proto3 উভয় সিনট্যাক্স সমর্থন করে) ব্যবহার করতে হবে।
Gradle বা Maven ব্যবহার করার সময়, প্রোটোক বিল্ড প্লাগইন বিল্ডের অংশ হিসাবে প্রয়োজনীয় কোড তৈরি করতে পারে। কিভাবে আপনার নিজের .proto
ফাইল থেকে কোড জেনারেট করতে হয় তার জন্য আপনি grpc-java README দেখতে পারেন।
আমরা Gradle কনফিগারেশন প্রদান করেছি।
streaming-grpc-java-getting-started
ডিরেক্টরি থেকে প্রবেশ করুন
$ chmod +x gradlew $ ./gradlew generateProto
নিম্নলিখিত ক্লাসগুলি আমাদের পরিষেবা সংজ্ঞা থেকে তৈরি করা হয়েছে ( build/generated/sources/proto/main/java
):
- প্রতিটি বার্তা প্রকারের জন্য একটি:
Feature.java
,Rectangle.java, ...
যেটিতে আমাদের অনুরোধ এবং প্রতিক্রিয়া বার্তা প্রকারগুলিকে পপুলেট, সিরিয়ালাইজ এবং পুনরুদ্ধার করার জন্য সমস্ত প্রোটোকল বাফার কোড রয়েছে৷ -
RouteGuideGrpc.java
যার মধ্যে রয়েছে (অন্যান্য কিছু দরকারী কোড সহ)RouteGuide
সার্ভারগুলি বাস্তবায়নের জন্য একটি বেস ক্লাস,RouteGuideGrpc.RouteGuideImplBase
,RouteGuide
পরিষেবাতে সংজ্ঞায়িত সমস্ত পদ্ধতি এবং ক্লায়েন্টদের ব্যবহারের জন্য স্টাব ক্লাস সহ
5. পরিষেবাটি বাস্তবায়ন করুন
প্রথমে দেখা যাক কিভাবে আমরা একটি RouteGuide
সার্ভার তৈরি করি। আমাদের RouteGuide
পরিষেবাটি তার কাজ করার জন্য দুটি অংশ রয়েছে:
- আমাদের পরিষেবার সংজ্ঞা থেকে উত্পন্ন পরিষেবা ইন্টারফেস বাস্তবায়ন করা: আমাদের পরিষেবার প্রকৃত "কাজ" করা।
- ক্লায়েন্টদের কাছ থেকে অনুরোধ শোনার জন্য এবং সঠিক পরিষেবা বাস্তবায়নে তাদের পাঠানোর জন্য একটি gRPC সার্ভার চালানো।
RouteGuide বাস্তবায়ন করুন
আমরা একটি RouteGuideService
ক্লাস বাস্তবায়ন করব যা জেনারেট করা RouteGuideGrpc.RouteGuideImplBase ক্লাসকে প্রসারিত করবে। এইভাবে বাস্তবায়ন দেখতে হবে.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
আসুন আমরা প্রতিটি RPC বাস্তবায়ন বিস্তারিতভাবে দেখি
সার্ভার-সাইড স্ট্রিমিং RPC
এর পরে আসুন আমাদের স্ট্রিমিং RPC গুলির মধ্যে একটি দেখুন। ListFeatures
হল একটি সার্ভার-সাইড স্ট্রিমিং RPC, তাই আমাদের ক্লায়েন্টকে একাধিক Features
ফেরত পাঠাতে হবে।
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
সাধারণ RPC-এর মতো, এই পদ্ধতিটি একটি অনুরোধ বস্তু (যে Rectangle
আমাদের ক্লায়েন্ট Features
খুঁজে পেতে চায়) এবং একটি StreamObserver
প্রতিক্রিয়া পর্যবেক্ষক পায়।
এইবার, আমরা ক্লায়েন্টের কাছে ফিরে আসার জন্য যতগুলি Feature
অবজেক্ট প্রয়োজন ততগুলি আমরা পাই (এই ক্ষেত্রে, আমরা সেগুলিকে পরিষেবার বৈশিষ্ট্য সংগ্রহ থেকে নির্বাচন করি যে সেগুলি আমাদের অনুরোধ Rectangle
ভিতরে আছে কিনা ) এবং প্রতিটিটিকে তার onNext()
পদ্ধতি ব্যবহার করে প্রতিক্রিয়া পর্যবেক্ষকের কাছে লিখুন৷ অবশেষে, আমাদের সাধারণ RPC-তে, আমরা প্রতিক্রিয়া পর্যবেক্ষকের onCompleted()
পদ্ধতি ব্যবহার করি gRPC কে জানাতে যে আমরা প্রতিক্রিয়া লেখা শেষ করেছি।
ক্লায়েন্ট-সাইড স্ট্রিমিং RPC
এখন আসুন একটু জটিল কিছু দেখি: ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি RecordRoute()
, যেখানে আমরা ক্লায়েন্টের কাছ থেকে Points
একটি স্ট্রীম পাই এবং তাদের ট্রিপের তথ্য সহ একটি একক RouteSummary
ফেরত দেই।
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
আপনি দেখতে পাচ্ছেন, আগের পদ্ধতির প্রকারের মতো আমাদের পদ্ধতিটি একটি StreamObserver
responseObserver
প্যারামিটার পায়, কিন্তু এবার এটি ক্লায়েন্টকে তার Points
লিখতে একটি StreamObserver
প্রদান করে।
মেথড বডিতে আমরা একটি বেনামী StreamObserver
ফিরে আসার জন্য ইনস্ট্যান্টিয়েট করি, যেখানে আমরা:
- প্রতিবার ক্লায়েন্ট যখন বার্তা স্ট্রীমে একটি
Point
লেখেন তখন বৈশিষ্ট্য এবং অন্যান্য তথ্য পেতেonNext()
পদ্ধতিটি ওভাররাইড করুন। - আমাদের
RouteSummary
তৈরি ও তৈরি করতেonCompleted()
পদ্ধতি ( ক্লায়েন্ট বার্তা লেখা শেষ হলে বলা হয়) ওভাররাইড করুন। তারপরে আমরা আমাদেরRouteSummary
সাথে আমাদের পদ্ধতির নিজস্ব প্রতিক্রিয়া পর্যবেক্ষকেরonNext()
কল করি এবং তারপর সার্ভারের দিক থেকে কলটি শেষ করতে এটিরonCompleted()
পদ্ধতিতে কল করি।
দ্বিমুখী স্ট্রিমিং RPC
অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat()
দেখুন।
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতো, আমরা উভয়েই একটি StreamObserver
পাই এবং ফেরত দিই, এই সময় ব্যতীত আমরা আমাদের পদ্ধতির প্রতিক্রিয়া পর্যবেক্ষকের মাধ্যমে মান ফেরত দিই যখন ক্লায়েন্ট এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে। এখানে পড়া এবং লেখার জন্য সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং এবং সার্ভার-স্ট্রিমিং পদ্ধতির মতোই। যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷
সার্ভার শুরু করুন
একবার আমরা আমাদের সমস্ত পদ্ধতি প্রয়োগ করে ফেললে, আমাদের একটি gRPC সার্ভারও চালু করতে হবে যাতে ক্লায়েন্টরা আসলে আমাদের পরিষেবা ব্যবহার করতে পারে। নিম্নলিখিত স্নিপেটটি দেখায় যে আমরা আমাদের RouteGuide
পরিষেবার জন্য কীভাবে এটি করি:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
আপনি দেখতে পাচ্ছেন, আমরা একটি ServerBuilder
ব্যবহার করে আমাদের সার্ভার তৈরি এবং শুরু করি।
এটি করার জন্য, আমরা:
- বিল্ডারের
forPort()
পদ্ধতি ব্যবহার করে ক্লায়েন্টের অনুরোধ শুনতে আমরা যে ঠিকানা এবং পোর্ট ব্যবহার করতে চাই তা নির্দিষ্ট করুন। - আমাদের পরিষেবা বাস্তবায়ন ক্লাস
RouteGuideService
এর একটি উদাহরণ তৈরি করুন এবং এটি নির্মাতারaddService()
পদ্ধতিতে পাস করুন। - আমাদের পরিষেবার জন্য একটি RPC সার্ভার তৈরি করতে এবং শুরু করতে বিল্ডারকে
build()
এবংstart()
কল করুন।
যেহেতু সার্ভার বিল্ডার ইতিমধ্যেই পোর্টটি অন্তর্ভুক্ত করেছে, তাই আমরা একটি পোর্ট পাস করার একমাত্র কারণ হল লগিং এর জন্য এটি ব্যবহার করা।
6. ক্লায়েন্ট তৈরি করুন
এই বিভাগে, আমরা আমাদের RouteGuide
পরিষেবার জন্য একটি ক্লায়েন্ট তৈরি করার বিষয়ে দেখব। আপনি ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
এ আমাদের সম্পূর্ণ উদাহরণ ক্লায়েন্ট কোড দেখতে পারেন।
একটি স্টাব তাত্ক্ষণিক
পরিষেবা পদ্ধতিতে কল করার জন্য, আমাদের প্রথমে একটি স্টাব তৈরি করতে হবে, বা বরং দুটি স্টাব তৈরি করতে হবে:
- একটি ব্লকিং/সিঙ্ক্রোনাস স্টাব: এর মানে হল যে RPC কল সার্ভারের সাড়া দেওয়ার জন্য অপেক্ষা করে, এবং হয় একটি প্রতিক্রিয়া প্রদান করবে বা একটি ব্যতিক্রম উত্থাপন করবে।
- একটি নন-ব্লকিং/অসিঙ্ক্রোনাস স্টাব যা সার্ভারে নন-ব্লকিং কল করে, যেখানে প্রতিক্রিয়া অ্যাসিঙ্ক্রোনাসভাবে ফেরত দেওয়া হয়। আপনি শুধুমাত্র একটি অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করে নির্দিষ্ট ধরনের স্ট্রিমিং কল করতে পারেন।
প্রথমে আমাদের স্টাবের জন্য একটি জিআরপিসি চ্যানেল তৈরি করতে হবে, যে সার্ভারের ঠিকানা এবং পোর্টে আমরা সংযোগ করতে চাই তা উল্লেখ করে:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
চ্যানেল তৈরি করতে আমরা একটি ManagedChannelBuilder
ব্যবহার করি।
এখন আমরা আমাদের .proto
থেকে তৈরি করা RouteGuideGrpc
ক্লাসে প্রদত্ত newStub
এবং newBlockingStub
পদ্ধতি ব্যবহার করে আমাদের স্টাব তৈরি করতে চ্যানেলটি ব্যবহার করতে পারি।
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
মনে রাখবেন, এটি ব্লক না হলে, এটি অ্যাসিঙ্ক
কল পরিষেবা পদ্ধতি
এখন আসুন আমরা আমাদের পরিষেবা পদ্ধতিগুলিকে কীভাবে বলি তা দেখি। নোট করুন যে ব্লকিং স্টাব থেকে তৈরি যেকোন RPC একটি ব্লকিং/সিঙ্ক্রোনাস মোডে কাজ করবে, যার মানে হল যে RPC কলটি সার্ভারের সাড়া দেওয়ার জন্য অপেক্ষা করে এবং হয় একটি প্রতিক্রিয়া বা ত্রুটি ফিরিয়ে দেবে।
সার্ভার-সাইড স্ট্রিমিং RPC
এর পরে, আসুন ListFeatures
এ একটি সার্ভার-সাইড স্ট্রিমিং কল দেখি, যা ভৌগলিক Feature
একটি স্ট্রিম প্রদান করে:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
আপনি দেখতে পাচ্ছেন, এটি Getting_Started_With_gRPC_Java কোডল্যাবে আমরা যে সাধারণ ইউনারি আরপিসি দেখেছি তার সাথে খুব মিল, একটি একক Feature
ফেরত দেওয়ার পরিবর্তে, পদ্ধতিটি একটি Iterator
প্রদান করে যা ক্লায়েন্ট সমস্ত প্রত্যাবর্তিত Features
পড়তে ব্যবহার করতে পারে।
ক্লায়েন্ট-সাইড স্ট্রিমিং RPC
এখন একটু জটিল কিছুর জন্য: ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি RecordRoute
, যেখানে আমরা সার্ভারে Points
একটি স্ট্রীম পাঠাই এবং একটি একক RouteSummary
ফিরে পাই। এই পদ্ধতির জন্য আমাদের অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করতে হবে। আপনি যদি ইতিমধ্যে সার্ভার তৈরি করা পড়ে থাকেন তবে এর মধ্যে কয়েকটি খুব পরিচিত মনে হতে পারে - অ্যাসিঙ্ক্রোনাস স্ট্রিমিং RPCগুলি উভয় দিকে একইভাবে প্রয়োগ করা হয়।
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
আপনি দেখতে পাচ্ছেন, এই পদ্ধতিটিকে কল করার জন্য আমাদের একটি StreamObserver
তৈরি করতে হবে, যা সার্ভারের জন্য একটি বিশেষ ইন্টারফেস প্রয়োগ করে যার RouteSummary
প্রতিক্রিয়ার সাথে কল করা যায়। আমাদের StreamObserver
এ আমরা:
- সার্ভার যখন বার্তা স্ট্রীমে একটি
RouteSummary
লেখে তখন ফেরত তথ্য মুদ্রণ করতেonNext()
পদ্ধতিটি ওভাররাইড করুন। - একটি
CountDownLatch
কমাতেonCompleted()
পদ্ধতি (যাকে সার্ভার তার পাশে কলটি সম্পন্ন করলে বলা হয়) ওভাররাইড করুন যাতে সার্ভার লেখা শেষ হয়েছে কিনা তা পরীক্ষা করতে পারি।
তারপরে আমরা StreamObserver
অ্যাসিঙ্ক্রোনাস স্টাবের recordRoute()
পদ্ধতিতে পাস করি এবং আমাদের নিজস্ব StreamObserver
অনুরোধ পর্যবেক্ষককে সার্ভারে পাঠানোর জন্য আমাদের Points
লিখতে ফিরে পাই। একবার আমরা পয়েন্ট লেখা শেষ করার পরে, আমরা জিআরপিসিকে জানাতে অনুরোধ পর্যবেক্ষকের onCompleted()
পদ্ধতি ব্যবহার করি যে আমরা ক্লায়েন্ট সাইডে লেখা শেষ করেছি। একবার আমাদের কাজ শেষ হলে, সার্ভারটি তার পাশে সম্পন্ন হয়েছে কিনা তা দেখতে আমরা আমাদের CountDownLatch
পরীক্ষা করি।
দ্বিমুখী স্ট্রিমিং RPC
অবশেষে, আসুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat()
দেখুন।
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতো, আমরা উভয়েই একটি StreamObserver
প্রতিক্রিয়া পর্যবেক্ষক পাই এবং ফেরত দিই, এই সময়টি ছাড়া আমরা আমাদের পদ্ধতির প্রতিক্রিয়া পর্যবেক্ষকের মাধ্যমে মান পাঠাই যখন সার্ভার এখনও তাদের বার্তা স্ট্রীমে বার্তা লিখছে। এখানে পড়া এবং লেখার জন্য সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং পদ্ধতির মতোই। যদিও প্রতিটি পক্ষ সর্বদা অন্যের বার্তাগুলি সেগুলি যে ক্রমে লেখা হয়েছিল সেই ক্রমে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যে কোনও ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলি সম্পূর্ণ স্বাধীনভাবে কাজ করে৷
7. এটা চেষ্টা করে দেখুন!
-
start_here
ডিরেক্টরি থেকে:
$ ./gradlew installDist
এটি আপনার কোড কম্পাইল করবে, এটি একটি জারে প্যাকেজ করবে এবং স্ক্রিপ্টগুলি তৈরি করবে যা উদাহরণটি চালায়। এগুলি build/install/start_here/bin/
ডিরেক্টরিতে তৈরি করা হবে। স্ক্রিপ্টগুলি হল: route-guide-server
এবং route-guide-client
।
ক্লায়েন্ট শুরু করার আগে সার্ভারটি চলতে হবে।
- সার্ভার চালান:
$ ./build/install/start_here/bin/route-guide-server
- ক্লায়েন্ট চালান:
$ ./build/install/start_here/bin/route-guide-client
8. পরবর্তী কি
- জিআরপিসি কীভাবে জিআরপিসি এবং মূল ধারণার ভূমিকায় কাজ করে তা জানুন
- বেসিক টিউটোরিয়ালের মাধ্যমে কাজ করুন
- API রেফারেন্স অন্বেষণ করুন।