gRPC-Java का इस्तेमाल शुरू करना - स्ट्रीमिंग

1. परिचय

इस कोडलैब में, gRPC-Java का इस्तेमाल करके एक क्लाइंट और सर्वर बनाया जाएगा. ये दोनों, Java में लिखे गए रूट-मैपिंग ऐप्लिकेशन की बुनियादी संरचना तैयार करते हैं.

ट्यूटोरियल के आखिर तक, आपके पास एक ऐसा क्लाइंट होगा जो gRPC का इस्तेमाल करके रिमोट सर्वर से कनेक्ट होता है. इससे क्लाइंट के रूट पर मौजूद सुविधाओं के बारे में जानकारी मिलती है, क्लाइंट के रूट की खास जानकारी बनाई जाती है, और सर्वर और अन्य क्लाइंट के साथ रूट की जानकारी शेयर की जाती है. जैसे, ट्रैफ़िक अपडेट.

सेवा को प्रोटोकॉल बफ़र फ़ाइल में तय किया जाता है. इसका इस्तेमाल क्लाइंट और सर्वर के लिए बॉयलरप्लेट कोड जनरेट करने के लिए किया जाएगा, ताकि वे एक-दूसरे से कम्यूनिकेट कर सकें. इससे आपको इस सुविधा को लागू करने में समय और मेहनत नहीं करनी पड़ेगी.

जनरेट किया गया यह कोड, सर्वर और क्लाइंट के बीच कम्यूनिकेशन की जटिलताओं के साथ-साथ डेटा के क्रमबद्ध और क्रम से हटाने की प्रोसेस को भी मैनेज करता है.

आपको क्या सीखने को मिलेगा

  • किसी सेवा के एपीआई को तय करने के लिए, प्रोटोकॉल बफ़र का इस्तेमाल कैसे करें.
  • ऑटोमेटेड कोड जनरेशन का इस्तेमाल करके, Protocol Buffers की परिभाषा से gRPC पर आधारित क्लाइंट और सर्वर बनाने का तरीका.
  • gRPC के साथ क्लाइंट-सर्वर स्ट्रीमिंग कम्यूनिकेशन के बारे में जानकारी.

यह कोडलैब, Java डेवलपर के लिए है. इसमें gRPC का इस्तेमाल करने का तरीका बताया गया है. साथ ही, इसमें gRPC के बारे में जानकारी दी गई है. इसके अलावा, यह कोडलैब उन लोगों के लिए भी है जो डिस्ट्रिब्यूटेड सिस्टम बनाने में दिलचस्पी रखते हैं. इसके लिए, gRPC का अनुभव होना ज़रूरी नहीं है.

2. शुरू करने से पहले

ज़रूरी शर्तें

  • JDK का वर्शन 24.

कोड प्राप्त करें

इसलिए, आपको शुरू से काम न करना पड़े, इस कोडलैब में ऐप्लिकेशन के सोर्स कोड का एक स्केफ़ोल्ड दिया गया है. आपको इसे पूरा करना होगा. यहां दिए गए तरीके से, ऐप्लिकेशन को पूरा करने का तरीका जानें. इसमें, बॉयलरप्लेट gRPC कोड जनरेट करने के लिए, प्रोटोकॉल बफ़र कंपाइलर प्लगिन का इस्तेमाल करने का तरीका भी शामिल है.

सबसे पहले, कोडलैब की वर्किंग डायरेक्ट्री बनाएं और उसमें cd करें:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

कोडलैब को डाउनलोड और एक्सट्रैक्ट करें:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

इसके अलावा, सिर्फ़ कोडलैब डायरेक्ट्री वाली .zip फ़ाइल डाउनलोड करके, उसे मैन्युअल तरीके से अनज़िप किया जा सकता है.

अगर आपको लागू करने के लिए टाइप नहीं करना है, तो पूरा सोर्स कोड GitHub पर उपलब्ध है.

3. संदेशों और सेवाओं के बारे में जानकारी

सबसे पहले, आपको प्रोटोकॉल बफ़र का इस्तेमाल करके, ऐप्लिकेशन की gRPC सेवा, उसके आरपीसी तरीके, और उसके अनुरोध और जवाब के मैसेज टाइप तय करने होंगे. आपकी सेवा से ये सुविधाएं मिलेंगी:

  • सर्वर लागू करता है और क्लाइंट कॉल करता है, जिन्हें RPC तरीके ListFeatures, RecordRoute, और RouteChat कहा जाता है.
  • मैसेज टाइप Point, Feature, Rectangle, RouteNote, और RouteSummary, जो ऊपर दिए गए तरीकों को कॉल करते समय क्लाइंट और सर्वर के बीच डेटा स्ट्रक्चर के तौर पर शेयर किए जाते हैं.

प्रोटोकॉल बफ़र को आम तौर पर, protobufs के नाम से जाना जाता है. gRPC की शब्दावली के बारे में ज़्यादा जानने के लिए, gRPC के मुख्य कॉन्सेप्ट, आर्किटेक्चर, और लाइफ़साइकल देखें.

इस आरपीसी तरीके और इसके मैसेज टाइप को, दिए गए सोर्स कोड की proto/routeguide/route_guide.proto फ़ाइल में तय किया जाएगा.

चलिए, route_guide.proto फ़ाइल बनाते हैं.

इस उदाहरण में, हम Java कोड जनरेट कर रहे हैं. इसलिए, हमने अपने .proto में java_package फ़ाइल का विकल्प चुना है:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

मैसेज टाइप तय करना

सोर्स कोड की proto/routeguide/route_guide.proto फ़ाइल में, सबसे पहले Point मैसेज टाइप तय करें. Point, मैप पर अक्षांश-देशांतर के निर्देशांकों के जोड़े को दिखाता है. इस कोडलैब के लिए, पूर्णांकों का इस्तेमाल करें:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

1 और 2 नंबर, message स्ट्रक्चर में मौजूद हर फ़ील्ड के लिए यूनीक आईडी नंबर होते हैं.

इसके बाद, Feature मैसेज टाइप तय करें. Feature, Point में बताई गई जगह पर मौजूद किसी चीज़ के नाम या डाक पते के लिए string फ़ील्ड का इस्तेमाल करता है:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

किसी इलाके के कई पॉइंट को क्लाइंट को स्ट्रीम करने के लिए, आपको Rectangle मैसेज की ज़रूरत होगी. यह मैसेज, अक्षांश-देशांतर वाले आयत को दिखाता है. इसे दो विकर्ण विपरीत पॉइंट lo और hi के तौर पर दिखाया जाता है:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

इसके अलावा, RouteNote मैसेज, जो किसी समय पर भेजे गए मैसेज को दिखाता है:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

आखिर में, आपको RouteSummary मैसेज की ज़रूरत होगी. यह मैसेज, RecordRoute आरपीसी के जवाब में मिलता है. इसके बारे में अगले सेक्शन में बताया गया है. इसमें मिले अलग-अलग पॉइंट की संख्या, पहचानी गई सुविधाओं की संख्या, और तय की गई कुल दूरी शामिल होती है. यह दूरी, हर पॉइंट के बीच की दूरी के कुल योग के तौर पर होती है.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

सेवा देने के तरीके तय करना

किसी सेवा को तय करने के लिए, अपनी .proto फ़ाइल में सेवा का नाम डालें. route_guide.proto फ़ाइल में service स्ट्रक्चर होता है, जिसका नाम RouteGuide होता है. यह ऐप्लिकेशन की सेवा के ज़रिए उपलब्ध कराए गए एक या उससे ज़्यादा तरीकों के बारे में बताता है.

सेवा की परिभाषा में RPC तरीके तय करते समय, उनके अनुरोध और जवाब के टाइप तय किए जाते हैं. इस कोडलैब के इस सेक्शन में, हम इन चीज़ों के बारे में जानेंगे:

ListFeatures

यह फ़ंक्शन, दिए गए Rectangle में मौजूद Feature ऑब्जेक्ट को हासिल करता है. नतीजे एक साथ नहीं दिखाए जाते, बल्कि स्ट्रीम किए जाते हैं. ऐसा इसलिए, क्योंकि रेक्टैंगल में बहुत बड़ा इलाका शामिल हो सकता है और इसमें कई सुविधाएं हो सकती हैं.

इस ऐप्लिकेशन के लिए, सर्वर-साइड स्ट्रीमिंग आरपीसी का इस्तेमाल किया जाएगा: क्लाइंट, सर्वर को अनुरोध भेजता है और उसे मैसेज का क्रम वापस पढ़ने के लिए एक स्ट्रीम मिलती है. क्लाइंट, जवाब के तौर पर मिली स्ट्रीम से तब तक डेटा पढ़ता है, जब तक कोई और मैसेज नहीं मिलता. हमारे उदाहरण में दिखाया गया है कि रिस्पॉन्स टाइप से पहले स्ट्रीम कीवर्ड रखकर, सर्वर-साइड स्ट्रीमिंग के तरीके के बारे में बताया जाता है.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

यह फ़ंक्शन, तय किए गए रास्ते पर मौजूद पॉइंट की स्ट्रीम को स्वीकार करता है. साथ ही, रास्ता पूरा होने पर RouteSummary दिखाता है.

इस मामले में, क्लाइंट-साइड स्ट्रीमिंग आरपीसी सही है: क्लाइंट, मैसेज का क्रम लिखता है और उन्हें सर्वर को भेजता है. इसके लिए, वह उपलब्ध स्ट्रीम का फिर से इस्तेमाल करता है. क्लाइंट के मैसेज लिखने के बाद, वह सर्वर के उन सभी मैसेज को पढ़ने और जवाब देने का इंतज़ार करता है. क्लाइंट-साइड स्ट्रीमिंग के तरीके के बारे में बताने के लिए, स्ट्रीम कीवर्ड को अनुरोध के टाइप से पहले रखा जाता है.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

यह कुकी, रास्ते पर चलते समय भेजे गए RouteNotes की स्ट्रीम को स्वीकार करती है. साथ ही, अन्य RouteNotes (जैसे, अन्य उपयोगकर्ताओं से मिले RouteNotes) को भी स्वीकार करती है.

दोनों दिशाओं में स्ट्रीमिंग का इस्तेमाल इसी तरह के मामलों में किया जाता है. यह दोनों दिशाओं में स्ट्रीम करने वाला आरपीसी है. इसमें दोनों पक्ष, पढ़ने और लिखने की स्ट्रीम का इस्तेमाल करके मैसेज का क्रम भेजते हैं. ये दोनों स्ट्रीम अलग-अलग काम करती हैं. इसलिए, क्लाइंट और सर्वर अपनी पसंद के हिसाब से किसी भी क्रम में पढ़ और लिख सकते हैं. उदाहरण के लिए, सर्वर अपने जवाब लिखने से पहले, क्लाइंट के सभी मैसेज पाने का इंतज़ार कर सकता है. इसके अलावा, वह बारी-बारी से एक मैसेज पढ़ सकता है और फिर एक मैसेज लिख सकता है. इसके अलावा, वह पढ़ने और लिखने के किसी अन्य कॉम्बिनेशन का इस्तेमाल कर सकता है. हर स्ट्रीम में मैसेज का क्रम बना रहता है. इस तरह के तरीके को तय करने के लिए, अनुरोध और जवाब, दोनों से पहले स्ट्रीम कीवर्ड रखा जाता है.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. क्लाइंट और सर्वर कोड जनरेट करना

इसके बाद, हमें अपनी .proto सेवा की परिभाषा से gRPC क्लाइंट और सर्वर इंटरफ़ेस जनरेट करने होंगे. हम ऐसा, खास gRPC Java प्लगिन के साथ प्रोटोकॉल बफ़र कंपाइलर protoc का इस्तेमाल करके करते हैं. gRPC सेवाएं जनरेट करने के लिए, आपको proto3 कंपाइलर का इस्तेमाल करना होगा. यह कंपाइलर, proto2 और proto3, दोनों सिंटैक्स के साथ काम करता है.

Gradle या Maven का इस्तेमाल करते समय, protoc बिल्ड प्लगिन, बिल्ड के हिस्से के तौर पर ज़रूरी कोड जनरेट कर सकता है. अपनी .proto फ़ाइलों से कोड जनरेट करने का तरीका जानने के लिए, grpc-java README पढ़ें.

हमने Gradle कॉन्फ़िगरेशन दिया है.

streaming-grpc-java-getting-started डायरेक्ट्री में जाकर

$ chmod +x gradlew
$ ./gradlew generateProto

नीचे दी गई क्लास, हमारी सेवा की परिभाषा (build/generated/sources/proto/main/java में दी गई) से जनरेट की जाती हैं:

  • हर मैसेज टाइप के लिए एक: Feature.java, Rectangle.java, .... इनमें प्रोटोकॉल बफ़र का पूरा कोड होता है. इसका इस्तेमाल, अनुरोध और जवाब के मैसेज टाइप को भरने, क्रम से लगाने, और वापस पाने के लिए किया जाता है.
  • RouteGuideGrpc.java जिसमें (कुछ अन्य काम के कोड के साथ) RouteGuide सर्वर के लिए एक बेसिक क्लास होती है, ताकि वे RouteGuideGrpc.RouteGuideImplBase को लागू कर सकें. साथ ही, इसमें RouteGuide सेवा में तय किए गए सभी तरीके और क्लाइंट के इस्तेमाल के लिए स्टब क्लास भी होती हैं

5. सेवा लागू करना

सबसे पहले, देखते हैं कि RouteGuide सर्वर कैसे बनाया जाता है. RouteGuide सेवा को काम करने के लिए, दो चीज़ें ज़रूरी होती हैं:

  • हमारी सेवा की परिभाषा से जनरेट किए गए सेवा इंटरफ़ेस को लागू करना: हमारी सेवा का असली "काम" करना.
  • क्लाइंट से मिले अनुरोधों को सुनने और उन्हें सही सेवा लागू करने के लिए, gRPC सर्वर को चालू करना.

RouteGuide को लागू करना

हम एक RouteGuideService क्लास लागू करेंगे, जो जनरेट की गई RouteGuideGrpc.RouteGuideImplBase क्लास को एक्सटेंड करेगी. लागू होने पर यह ऐसा दिखेगा.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

आइए, हर आरपीसी लागू करने के तरीके के बारे में विस्तार से जानते हैं

सर्वर-साइड स्ट्रीमिंग आरपीसी

अब आइए, हमारी एक स्ट्रीमिंग आरपीसी पर नज़र डालते हैं. ListFeatures एक सर्वर-साइड स्ट्रीमिंग आरपीसी है. इसलिए, हमें अपने क्लाइंट को कई Features वापस भेजने होंगे.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

सामान्य आरपीसी की तरह, इस तरीके को एक अनुरोध ऑब्जेक्ट (वह Rectangle जिसमें हमारे क्लाइंट को Features ढूंढना है) और एक StreamObserver रिस्पॉन्स ऑब्ज़र्वर मिलता है.

इस बार, हमें क्लाइंट को लौटाने के लिए जितने Feature ऑब्जेक्ट की ज़रूरत होती है उतने मिल जाते हैं. इस मामले में, हम उन्हें सेवा के फ़ीचर कलेक्शन से चुनते हैं. यह इस बात पर निर्भर करता है कि वे हमारे अनुरोध Rectangle में शामिल हैं या नहीं. इसके बाद, हम उन्हें एक-एक करके रिस्पॉन्स ऑब्ज़र्वर में लिखते हैं. इसके लिए, हम उसके onNext() तरीके का इस्तेमाल करते हैं. आखिर में, सामान्य आरपीसी की तरह ही, हम रिस्पॉन्स ऑब्ज़र्वर के onCompleted() तरीके का इस्तेमाल करके gRPC को यह बताते हैं कि हमने रिस्पॉन्स लिखना पूरा कर लिया है.

क्लाइंट-साइड स्ट्रीमिंग आरपीसी

अब हम एक ऐसे तरीके के बारे में बात करते हैं जो थोड़ा मुश्किल है: क्लाइंट-साइड स्ट्रीमिंग का तरीका RecordRoute(). इसमें हमें क्लाइंट से Points की स्ट्रीम मिलती है और हम उनकी यात्रा के बारे में जानकारी के साथ एक RouteSummary वापस भेजते हैं.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

जैसा कि आप देख सकते हैं, पिछली तरह की मेथड की तरह हमारी मेथड को भी StreamObserver responseObserver पैरामीटर मिलता है. हालांकि, इस बार यह क्लाइंट के लिए StreamObserver दिखाता है, ताकि वह अपना Points लिख सके.

इस तरीके के मुख्य हिस्से में, हम एक अनाम StreamObserver को इंस्टैंशिएट करते हैं, ताकि उसे वापस भेजा जा सके. इसमें हम यह काम करते हैं:

  • क्लाइंट के मैसेज स्ट्रीम में Point लिखने पर, हर बार सुविधाएं और अन्य जानकारी पाने के लिए, onNext() तरीके को बदलें.
  • onCompleted() तरीके को बदलें. इसे तब कॉल किया जाता है, जब क्लाइंट मैसेज लिखना पूरा कर लेता है. इससे हम RouteSummary को भर पाते हैं और बना पाते हैं. इसके बाद, हम अपने तरीके के रिस्पॉन्स ऑब्ज़र्वर के onNext() को अपने RouteSummary के साथ कॉल करते हैं. इसके बाद, सर्वर साइड से कॉल खत्म करने के लिए, इसके onCompleted() तरीके को कॉल करते हैं.

दोनों दिशाओं में डेटा ट्रांसफ़र करने वाली स्ट्रीमिंग आरपीसी

आखिर में, आइए हम दोनों दिशाओं में स्ट्रीम करने वाले आरपीसी RouteChat() पर नज़र डालें.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

क्लाइंट-साइड स्ट्रीमिंग के उदाहरण की तरह, हमें StreamObserver मिलता है और हम इसे वापस भेजते हैं. हालांकि, इस बार हम अपने तरीके के रिस्पॉन्स ऑब्ज़र्वर के ज़रिए वैल्यू वापस भेजते हैं. वहीं, क्लाइंट अब भी अपनी मैसेज स्ट्रीम में मैसेज लिख रहा है. यहां पढ़ने और लिखने का सिंटैक्स, क्लाइंट-स्ट्रीमिंग और सर्वर-स्ट्रीमिंग के तरीकों के लिए इस्तेमाल किए जाने वाले सिंटैक्स जैसा ही है. दोनों पक्षों को एक-दूसरे के मैसेज, उसी क्रम में मिलते हैं जिस क्रम में उन्हें लिखा गया था. हालांकि, क्लाइंट और सर्वर, मैसेज को किसी भी क्रम में पढ़ और लिख सकते हैं. स्ट्रीम पूरी तरह से स्वतंत्र रूप से काम करती हैं.

सर्वर शुरू करें

सभी तरीकों को लागू करने के बाद, हमें एक gRPC सर्वर भी शुरू करना होगा, ताकि क्लाइंट हमारी सेवा का इस्तेमाल कर सकें. नीचे दिए गए स्निपेट में बताया गया है कि हम अपनी RouteGuide सेवा के लिए ऐसा कैसे करते हैं:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

जैसा कि आप देख सकते हैं, हम ServerBuilder का इस्तेमाल करके अपना सर्वर बनाते हैं और उसे शुरू करते हैं.

इसके लिए:

  1. बिल्डर के forPort() तरीके का इस्तेमाल करके, क्लाइंट के अनुरोधों को सुनने के लिए, वह पता और पोर्ट तय करें जिसका हमें इस्तेमाल करना है.
  2. हमारी सेवा लागू करने वाली क्लास RouteGuideService का एक इंस्टेंस बनाएं और इसे बिल्डर के addService() तरीके से पास करें.
  3. हमारी सेवा के लिए RPC सर्वर बनाने और शुरू करने के लिए, बिल्डर पर build() और start() को कॉल करें.

ServerBuilder में पोर्ट पहले से शामिल होता है. इसलिए, हम पोर्ट को सिर्फ़ लॉगिंग के लिए पास करते हैं.

6. क्लाइंट बनाना

इस सेक्शन में, हम RouteGuide सेवा के लिए क्लाइंट बनाने का तरीका जानेंगे. ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java में, क्लाइंट कोड का पूरा उदाहरण देखा जा सकता है.

स्टब इंस्टैंशिएट करना

सेवा के तरीकों को कॉल करने के लिए, हमें सबसे पहले एक स्टब बनाना होगा. हालांकि, हमें दो स्टब बनाने होंगे:

  • ब्लॉकिंग/सिंक्रोनस स्टब: इसका मतलब है कि आरपीसी कॉल, सर्वर के जवाब का इंतज़ार करता है. इसके बाद, यह या तो जवाब देगा या अपवाद बढ़ाएगा.
  • नॉन-ब्लॉकिंग/एसिंक्रोनस स्टब, जो सर्वर को नॉन-ब्लॉकिंग कॉल करता है. इसमें जवाब एसिंक्रोनस तरीके से मिलता है. कुछ तरह के स्ट्रीमिंग कॉल सिर्फ़ एसिंक्रोनस स्टब का इस्तेमाल करके किए जा सकते हैं.

सबसे पहले, हमें अपने स्टब के लिए एक gRPC चैनल बनाना होगा. इसके लिए, हमें सर्वर का पता और पोर्ट बताना होगा जिससे हमें कनेक्ट करना है:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

हम चैनल बनाने के लिए ManagedChannelBuilder का इस्तेमाल करते हैं.

अब हम चैनल का इस्तेमाल करके, अपने स्टब बना सकते हैं. इसके लिए, हमें .proto से जनरेट की गई RouteGuideGrpc क्लास में दिए गए newStub और newBlockingStub तरीकों का इस्तेमाल करना होगा.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

ध्यान रखें कि अगर यह सुविधा ब्लॉक नहीं कर रही है, तो यह एसिंक्रोनस है

कॉल करने की सेवा के तरीके

अब देखते हैं कि हम अपनी सेवा के तरीकों को कैसे कॉल करते हैं. ध्यान दें कि ब्लॉकिंग स्टब से बनाए गए सभी आरपीसी, ब्लॉकिंग/सिंक्रोनस मोड में काम करेंगे. इसका मतलब है कि आरपीसी कॉल, सर्वर के जवाब देने का इंतज़ार करता है. इसके बाद, यह जवाब या गड़बड़ी दिखाता है.

सर्वर-साइड स्ट्रीमिंग आरपीसी

अब, आइए ListFeatures को किए गए सर्वर-साइड स्ट्रीमिंग कॉल पर एक नज़र डालते हैं. यह कॉल, भौगोलिक Feature की स्ट्रीम दिखाता है:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

जैसा कि आप देख सकते हैं, यह Getting_Started_With_gRPC_Java कोडलैब में बताए गए सामान्य यूनरी आरपीसी के जैसा ही है. हालांकि, इसमें एक Feature के बजाय, यह तरीका एक Iterator दिखाता है. इसका इस्तेमाल करके क्लाइंट, दिखाए गए सभी Features को पढ़ सकता है.

क्लाइंट-साइड स्ट्रीमिंग आरपीसी

अब कुछ और मुश्किल चीज़ों के बारे में जानते हैं: क्लाइंट-साइड स्ट्रीमिंग का तरीका RecordRoute. इसमें हम सर्वर को Points की एक स्ट्रीम भेजते हैं और हमें एक RouteSummary वापस मिलता है. इस तरीके के लिए, हमें एसिंक्रोनस स्टब का इस्तेमाल करना होगा. अगर आपने सर्वर बनाना लेख पढ़ लिया है, तो आपको इसमें दी गई कुछ जानकारी जानी-पहचानी लग सकती है. ऐसा इसलिए, क्योंकि दोनों तरफ़ एसिंक्रोनस स्ट्रीमिंग आरपीसी को एक ही तरीके से लागू किया जाता है.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

जैसा कि आप देख सकते हैं, इस तरीके को कॉल करने के लिए हमें एक StreamObserver बनाना होगा. यह सर्वर के लिए एक खास इंटरफ़ेस लागू करता है, ताकि वह RouteSummary जवाब के साथ कॉल कर सके. हम StreamObserver में:

  • जब सर्वर, मैसेज स्ट्रीम में RouteSummary लिखता है, तब जवाब के तौर पर मिली जानकारी को प्रिंट करने के लिए, onNext() तरीके को बदलें.
  • onCompleted() तरीके को बदलें. इसे तब कॉल किया जाता है, जब सर्वर कॉल पूरा कर लेता है. इससे CountDownLatch को कम किया जा सकता है, ताकि हम यह देख सकें कि सर्वर ने लिखना पूरा कर लिया है या नहीं.

इसके बाद, हम StreamObserver को असिंक्रोनस स्टब के recordRoute() तरीके पर भेजते हैं. साथ ही, सर्वर को भेजने के लिए अपना Points लिखने के लिए, हमें अपना StreamObserver अनुरोध ऑब्ज़र्वर वापस मिल जाता है. पॉइंट लिखने के बाद, हम अनुरोध करने वाले व्यक्ति के onCompleted() तरीके का इस्तेमाल करके gRPC को बताते हैं कि हमने क्लाइंट साइड पर लिखना पूरा कर लिया है. इसके बाद, हम अपने CountDownLatch को देखते हैं कि सर्वर ने अपनी ओर से प्रोसेस पूरी कर ली है या नहीं.

दोनों दिशाओं में डेटा ट्रांसफ़र करने वाली स्ट्रीमिंग आरपीसी

आखिर में, आइए हम दोनों दिशाओं में स्ट्रीम करने वाले आरपीसी RouteChat() पर नज़र डालें.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

क्लाइंट-साइड स्ट्रीमिंग के उदाहरण की तरह, हमें StreamObserver रिस्पॉन्स ऑब्ज़र्वर मिलता है और हम उसे वापस भेजते हैं. हालांकि, इस बार हम अपनी विधि के रिस्पॉन्स ऑब्ज़र्वर के ज़रिए वैल्यू भेजते हैं. वहीं, सर्वर अब भी अपनी मैसेज स्ट्रीम में मैसेज लिख रहा है. यहां पढ़ने और लिखने का सिंटैक्स, क्लाइंट-स्ट्रीमिंग के तरीके के लिए इस्तेमाल किए जाने वाले सिंटैक्स जैसा ही है. दोनों पक्षों को एक-दूसरे के मैसेज, उसी क्रम में मिलते हैं जिस क्रम में उन्हें लिखा गया था. हालांकि, क्लाइंट और सर्वर, मैसेज को किसी भी क्रम में पढ़ और लिख सकते हैं. स्ट्रीम पूरी तरह से स्वतंत्र रूप से काम करती हैं.

7. इसे आज़माएं!

  1. start_here डायरेक्ट्री से:
$ ./gradlew installDist

इससे आपका कोड कंपाइल हो जाएगा. साथ ही, इसे जार में पैकेज कर दिया जाएगा. इसके अलावा, उदाहरण को चलाने वाली स्क्रिप्ट भी बन जाएंगी. ये build/install/start_here/bin/ डायरेक्ट्री में बनाए जाएंगे. स्क्रिप्ट route-guide-server और route-guide-client हैं.

क्लाइंट शुरू करने से पहले, सर्वर चालू होना चाहिए.

  1. सर्वर चलाएं:
$ ./build/install/start_here/bin/route-guide-server
  1. क्लाइंट चलाएं:
$ ./build/install/start_here/bin/route-guide-client

8. आगे क्या करना है