Getting Started with gRPC-Java - Streaming

‫1. مقدمة

في هذا الدرس التطبيقي حول الترميز، ستستخدم gRPC-Java لإنشاء برنامج خادم وبرنامج عميل يشكّلان أساس تطبيق لربط المسارات مكتوب بلغة Java.

في نهاية هذا البرنامج التعليمي، سيكون لديك برنامج يتصل بخادم بعيد باستخدام gRPC للحصول على معلومات حول الميزات على مسار العميل، وإنشاء ملخّص لمسار العميل، وتبادل معلومات المسار، مثل آخر الأخبار عن حركة المرور، مع الخادم والبرامج الأخرى.

يتم تحديد الخدمة في ملف Protocol Buffers، وسيتم استخدام هذا الملف لإنشاء رمز نموذجي للبرنامج العميل والخادم حتى يتمكّنا من التواصل مع بعضهما البعض، ما يوفّر عليك الوقت والجهد في تنفيذ هذه الوظيفة.

لا يهتم هذا الرمز الذي تم إنشاؤه بتعقيدات الاتصال بين الخادم والعميل فحسب، بل أيضًا بتسلسل البيانات وإلغاء تسلسلها.

أهداف الدورة التعليمية

  • كيفية استخدام مخزن البروتوكولات المؤقت لتحديد واجهة برمجة تطبيقات الخدمة
  • كيفية إنشاء برنامج عميل وخادم يستندان إلى gRPC من تعريف Protocol Buffers باستخدام إنشاء الرموز البرمجية المبرمَج
  • فهم عملية نقل البيانات المتدفقة بين العميل والخادم باستخدام gRPC

هذا الدرس التطبيقي حول الترميز مخصّص لمطوّري Java الجدد في gRPC أو الذين يريدون مراجعة gRPC، أو أي شخص آخر مهتم بإنشاء أنظمة موزّعة. لا يُشترط توفّر خبرة سابقة في gRPC.

2. قبل البدء

المتطلبات الأساسية

  • الإصدار 24 من JDK

الحصول على الشفرة‏

ولكي لا تضطر إلى البدء من الصفر تمامًا، يوفّر لك هذا الدرس التطبيقي حول الترميز بنية أساسية لرمز المصدر الخاص بالتطبيق لتتمكّن من إكماله. ستوضّح لك الخطوات التالية كيفية إكمال التطبيق، بما في ذلك استخدام مكوّنات برنامج تجميع بروتوكول المخزن المؤقت لإنشاء رمز gRPC النموذجي.

أولاً، أنشئ دليل عمل الدرس التطبيقي وادخله:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

نزِّل الدرس التطبيقي حول الترميز واستخرِجه:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

بدلاً من ذلك، يمكنك تنزيل ملف ‎ .zip الذي يحتوي على دليل الدرس العملي فقط وفك ضغطه يدويًا.

يتوفّر رمز المصدر المكتمل على GitHub إذا أردت تخطّي كتابة عملية التنفيذ.

3- تحديد الرسائل والخدمات

تتمثّل خطوتك الأولى في تحديد خدمة gRPC للتطبيق وطريقة RPC وأنواع رسائل الطلبات والاستجابات باستخدام بروتوكول المخازن المؤقتة. ستوفّر خدمتك ما يلي:

  • طُرق استدعاء الإجراء عن بُعد (RPC) التي تحمل الأسماء ListFeatures وRecordRoute وRouteChat والتي ينفّذها الخادم ويستدعيها العميل
  • أنواع الرسائل Point وFeature وRectangle وRouteNote وRouteSummary، وهي عبارة عن بنى بيانات يتم تبادلها بين العميل والخادم عند استدعاء الطرق المذكورة أعلاه.

يُشار إلى Protocol Buffers عادةً باسم protobufs. لمزيد من المعلومات عن مصطلحات gRPC، يُرجى الاطّلاع على المفاهيم الأساسية والبنية ودورة الحياة في gRPC.

سيتم تحديد طريقة RPC هذه وأنواع الرسائل الخاصة بها في ملف proto/routeguide/route_guide.proto الخاص برمز المصدر المقدَّم.

لننشئ ملف route_guide.proto.

بما أنّنا ننشئ رمز Java في هذا المثال، حدّدنا خيار الملف java_package في .proto:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

تحديد أنواع الرسائل

في ملف proto/routeguide/route_guide.proto الخاص بالرمز المصدر، حدِّد أولاً نوع الرسالة Point. يمثّل Point زوج إحداثيات خط العرض وخط الطول على الخريطة. في هذا الدرس العملي، استخدِم أعدادًا صحيحة للإحداثيات:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

الرقمان 1 و2 هما رقما تعريف فريدان لكل حقل من الحقول في بنية message.

بعد ذلك، حدِّد Feature نوع الرسالة. يستخدم Feature الحقل string للاسم أو العنوان البريدي الخاص بمكان محدّد من خلال Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

لكي يتم بث نقاط متعددة ضمن منطقة إلى أحد العملاء، ستحتاج إلى رسالة Rectangle تمثّل مستطيلاً لخطوط الطول والعرض، ويتم تمثيله كنقطتَين متقابلتَين قطريًا lo وhi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

بالإضافة إلى ذلك، رسالة RouteNote تمثّل رسالة تم إرسالها في وقت محدّد:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

أخيرًا، ستحتاج إلى رسالة RouteSummary. يتم تلقّي هذه الرسالة ردًا على طلب إجراء RecordRoute RPC، ويتم توضيح ذلك في القسم التالي. يحتوي هذا الملف على عدد النقاط الفردية التي تم تلقّيها، وعدد الميزات التي تم رصدها، وإجمالي المسافة المقطوعة كمجموع تراكمي للمسافة بين كل نقطة.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

تحديد طرق الخدمة

لتحديد خدمة، عليك تحديد خدمة مسماة في ملف .proto. يحتوي ملف route_guide.proto على بنية service باسم RouteGuide تحدّد طريقة واحدة أو أكثر توفّرها خدمة التطبيق.

عند تحديد طرق RPC داخل تعريف الخدمة، عليك تحديد أنواع الطلبات والاستجابات. في هذا القسم من الدرس العملي، لنحدّد ما يلي:

ListFeatures

يحصل على عناصر Feature المتاحة ضمن Rectangle المحدّد. يتم بث النتائج بدلاً من عرضها دفعة واحدة لأنّ المستطيل قد يغطي مساحة كبيرة ويحتوي على عدد كبير من العناصر.

بالنسبة إلى هذا التطبيق، ستستخدم استدعاء إجراء عن بُعد (RPC) للبث من جهة الخادم: يرسل العميل طلبًا إلى الخادم ويتلقّى بثًا لقراءة سلسلة من الرسائل. يقرأ العميل من البث الذي تم إرجاعه إلى أن لا تتبقى أي رسائل. كما ترى في مثالنا، يمكنك تحديد طريقة البث من جهة الخادم عن طريق وضع الكلمة الرئيسية stream قبل نوع الاستجابة.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

تقبل هذه السمة مجموعة من النقاط على مسار يتم اجتيازه، وتعرض RouteSummary عند اكتمال عملية الاجتياز.

في هذه الحالة، يكون طلب إجراء مكالمة RPC ببث من جهة العميل مناسبًا: يكتب العميل سلسلة من الرسائل ويرسلها إلى الخادم، وذلك باستخدام بث يتم توفيره مرة أخرى. بعد أن ينتهي العميل من كتابة الرسائل، ينتظر أن يقرأها الخادم كلها ويرسل رده. يمكنك تحديد طريقة البث من جهة العميل عن طريق وضع الكلمة الرئيسية stream قبل نوع الطلب.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

يقبل هذا النوع من الطلبات بثًا من RouteNotes يتم إرساله أثناء التنقّل في مسار، مع تلقّي RouteNotes أخرى (مثل تلك الواردة من مستخدمين آخرين).

هذا هو بالضبط نوع حالة الاستخدام للبث الثنائي الاتجاه. إجراء استدعاء إجراء عن بُعد (RPC) لبث البيانات في اتجاهين، حيث يرسل كلا الجانبين سلسلة من الرسائل باستخدام بث للقراءة والكتابة يعمل كل من هذين التدفقين بشكل مستقل، لذا يمكن للعملاء والخوادم القراءة والكتابة بأي ترتيب يريدونه: على سبيل المثال، يمكن للخادم الانتظار لتلقّي جميع رسائل العميل قبل كتابة ردوده، أو يمكنه بدلاً من ذلك قراءة رسالة ثم كتابة رسالة، أو أي مجموعة أخرى من عمليات القراءة والكتابة. يتم الحفاظ على ترتيب الرسائل في كل بث. يمكنك تحديد هذا النوع من الطرق من خلال وضع الكلمة الرئيسية stream قبل كلّ من الطلب والاستجابة.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

‫4. إنشاء رمز العميل والخادم

بعد ذلك، نحتاج إلى إنشاء واجهات خادم وعميل gRPC من تعريف خدمة .proto. وننفّذ ذلك باستخدام برنامج تجميع بروتوكول buffer protoc مع مكوّن إضافي خاص بلغة Java في gRPC. عليك استخدام برنامج التجميع proto3 (الذي يتوافق مع كل من بنية proto2 وproto3) من أجل إنشاء خدمات gRPC.

عند استخدام Gradle أو Maven، يمكن لمكوّن protoc الإضافي للإنشاء إنشاء الرمز البرمجي اللازم كجزء من عملية الإنشاء. يمكنك الرجوع إلى ملف README الخاص بـ grpc-java لمعرفة كيفية إنشاء الرمز من ملفات .proto الخاصة بك.

لقد قدّمنا إعدادات Gradle.

من دليل streaming-grpc-java-getting-started، أدخِل

$ chmod +x gradlew
$ ./gradlew generateProto

يتم إنشاء الفئات التالية من تعريف الخدمة (ضمن build/generated/sources/proto/main/java):

  • ملف واحد لكل نوع رسالة: Feature.java وRectangle.java, ...، يحتويان على كل رمز بروتوكول المخزن المؤقت لتعبئة أنواع رسائل الطلبات والردود وتسلسلها واستردادها.
  • RouteGuideGrpc.java الذي يحتوي (إلى جانب بعض الرموز المفيدة الأخرى) على فئة أساسية للخوادم RouteGuide لتنفيذها، RouteGuideGrpc.RouteGuideImplBase، مع جميع الطرق المحددة في فئات الخدمة RouteGuide وفئات الرمز الأساسي التي يمكن للعملاء استخدامها

5- تنفيذ الخدمة

لنلقِ نظرة أولاً على كيفية إنشاء خادم RouteGuide. تتضمّن عملية إنجاز خدمة RouteGuide لمهامها جزأين:

  • تنفيذ واجهة الخدمة التي تم إنشاؤها من تعريف الخدمة: تنفيذ "العمل" الفعلي لخدمتنا
  • تشغيل خادم gRPC للاستماع إلى الطلبات من العملاء وإرسالها إلى تنفيذ الخدمة المناسب

تنفيذ RouteGuide

سننفّذ فئة RouteGuideService ستوسّع فئة RouteGuideGrpc.RouteGuideImplBase التي تم إنشاؤها. إليك كيف سيبدو التنفيذ.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

دعونا نلقي نظرة على كل عملية تنفيذ لطلب إجراء عن بُعد بالتفصيل.

Server-side streaming RPC

لنلقِ نظرة الآن على أحد إجراءات RPC الخاصة بالبث. ‫ListFeatures هو إجراء عن بُعد (RPC) لبث البيانات من جهة الخادم، لذا علينا إرسال عدة Features إلى العميل.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

كما هو الحال مع RPC البسيط، تحصل هذه الطريقة على عنصر طلب (Rectangle الذي يريد العميل العثور على Features فيه) ومراقب استجابة StreamObserver.

في هذه المرة، نحصل على أكبر عدد ممكن من عناصر Feature التي نحتاج إليها لإرجاعها إلى العميل (في هذه الحالة، نختارها من مجموعة ميزات الخدمة استنادًا إلى ما إذا كانت داخل طلبنا Rectangle)، ونكتبها بالتناوب في مراقب الاستجابة باستخدام طريقة onNext(). أخيرًا، كما هو الحال في RPC البسيط، نستخدم طريقة onCompleted() الخاصة بمراقب الردود لإعلام gRPC بأنّنا انتهينا من كتابة الردود.

RPC للبث من جهة العميل

لنلقِ نظرة الآن على طريقة أكثر تعقيدًا: طريقة البث من جهة العميل RecordRoute()، حيث نحصل على بث من Points من العميل ونعرض RouteSummary واحدًا يتضمّن معلومات عن رحلته.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

كما ترى، مثل أنواع الطرق السابقة، تتلقّى طريقتنا المَعلمة StreamObserver responseObserver، ولكنها تعرض هذه المرة StreamObserver ليتمكّن العميل من كتابة Points.

في نص الدالة، ننشئ StreamObserver مجهول الاسم لعرضه، حيث:

  • يمكنك إلغاء طريقة onNext() للحصول على الميزات والمعلومات الأخرى في كل مرة يكتب فيها العميل Point في بث الرسائل.
  • تجاوز طريقة onCompleted() (يتم استدعاؤها عندما ينتهي العميل من كتابة الرسائل) لتعبئة وإنشاء RouteSummary. بعد ذلك، نستدعي onNext() مراقب الردود الخاص بطريقتنا مع RouteSummary، ثم نستدعي الطريقة onCompleted() لإنهاء المكالمة من جهة الخادم.

إجراءات RPC للبث الثنائي الاتجاه

أخيرًا، لنلقِ نظرة على RouteChat() RPC ثنائي الاتجاه.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

كما هو الحال مع مثال البث من جهة العميل، نحصل على StreamObserver ونعرضه، ولكن هذه المرة نعرض القيم من خلال مراقب الردّ الخاص بالطريقة بينما لا يزال العميل يكتب الرسائل إلى بث الرسائل الخاص به. إنّ بنية القراءة والكتابة هنا هي نفسها تمامًا كما هو الحال في طريقتَي البث من الخادم والبث من العميل. على الرغم من أنّ كل طرف سيتلقّى دائمًا رسائل الطرف الآخر بالترتيب الذي تمت كتابتها به، يمكن لكل من العميل والخادم القراءة والكتابة بأي ترتيب، لأنّ عمليات البث تعمل بشكل مستقل تمامًا.

ابدأ الخادم

بعد تنفيذ جميع طرقنا، نحتاج أيضًا إلى بدء تشغيل خادم gRPC حتى يتمكّن العملاء من استخدام خدمتنا. يوضّح المقتطف التالي كيفية إجراء ذلك لخدمة RouteGuide:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

كما ترى، ننشئ خادمنا ونبدأ تشغيله باستخدام ServerBuilder.

لإجراء ذلك، ننفّذ ما يلي:

  1. حدِّد العنوان والمنفذ اللذين نريد استخدامهما لتلقّي طلبات العميل باستخدام طريقة forPort() الخاصة بأداة الإنشاء.
  2. أنشئ مثيلاً لفئة تنفيذ الخدمة RouteGuideService وأدخِله في طريقة addService() الخاصة بأداة الإنشاء.
  3. استدعِ الدالتَين build() وstart() في أداة الإنشاء لإنشاء خادم استدعاء إجراء عن بُعد (RPC) وبدء تشغيله لخدمتنا.

بما أنّ ServerBuilder يتضمّن المنفذ، السبب الوحيد الذي يجعلنا نمرّر منفذًا هو استخدامه في التسجيل.

6. إنشاء العميل

في هذا القسم، سنتعرّف على كيفية إنشاء برنامج لخدمة RouteGuide. يمكنك الاطّلاع على نموذج رمز العميل الكامل في ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

إنشاء نسخة من عنصر نائب

لاستدعاء طرق الخدمة، علينا أولاً إنشاء رمز بديل، أو بالأحرى رمزان بديلان:

  • برنامج حظر/تزامن: يعني ذلك أنّ طلب RPC ينتظر رد الخادم، وسيعرض إما ردًا أو سيحدث خطأ.
  • دالة غير حظر/غير متزامنة تنفّذ طلبات غير حظر إلى الخادم، ويتم عرض الردّ بشكل غير متزامن. يمكنك إجراء أنواع معيّنة من مكالمات البث فقط باستخدام رمز بديل غير متزامن.

علينا أولاً إنشاء قناة gRPC للرمز الأساسي، وتحديد عنوان الخادم والمنفذ اللذين نريد الاتصال بهما:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

نستخدم ManagedChannelBuilder لإنشاء القناة.

يمكننا الآن استخدام القناة لإنشاء رموز تجريبية باستخدام الطريقتَين newStub وnewBlockingStub المتوفّرتَين في الفئة RouteGuideGrpc التي أنشأناها من .proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

تذكَّر أنّه إذا لم يكن الحظر متزامنًا، سيكون غير متزامن

طُرق استدعاء الخدمات

لنلقِ نظرة الآن على كيفية استدعاء طرق الخدمة. يُرجى العِلم أنّ أي استدعاءات إجراء بعيد (RPC) يتم إنشاؤها من جذع الحظر ستعمل في وضع الحظر/التزامن، ما يعني أنّ استدعاء الإجراء البعيد ينتظر استجابة الخادم، وسيعرض إما استجابة أو خطأ.

Server-side streaming RPC

بعد ذلك، لنلقِ نظرة على طلب بث من جهة الخادم إلى ListFeatures، والذي يعرض بثًا Feature جغرافيًا:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

كما ترى، يشبه هذا النوع إلى حد كبير عملية RPC الأحادية البسيطة التي تناولناها في برنامج Getting_Started_With_gRPC_Java التعليمي، ولكن بدلاً من عرض Feature واحدة، تعرض الطريقة Iterator يمكن للعميل استخدامها لقراءة كل Features المعروضة.

RPC للبث من جهة العميل

والآن، لننتقل إلى طريقة أكثر تعقيدًا: طريقة البث من جهة العميل RecordRoute، حيث نرسل مجموعة من Points إلى الخادم ونتلقّى RouteSummary واحدًا. بالنسبة إلى هذه الطريقة، يجب استخدام عنصر نائب غير متزامن. إذا سبق لك قراءة إنشاء الخادم، قد يبدو لك بعض هذا المحتوى مألوفًا جدًا، إذ يتم تنفيذ طلبات RPC المتدفقة غير المتزامنة بطريقة مشابهة على كلا الجانبين.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

كما ترى، لاستدعاء هذه الطريقة، علينا إنشاء StreamObserver، والذي ينفّذ واجهة خاصة للخادم لاستدعائها باستخدام استجابته RouteSummary. في StreamObserver، نعمل على:

  • تجاهل طريقة onNext() لطباعة المعلومات التي تم إرجاعها عندما يكتب الخادم RouteSummary إلى دفق الرسائل.
  • تجاوز طريقة onCompleted() (يتم استدعاؤها عندما يكمل الخادم المكالمة من جهته) لتقليل CountDownLatch حتى نتمكّن من التحقّق مما إذا كان الخادم قد انتهى من الكتابة.

بعد ذلك، نمرّر StreamObserver إلى طريقة recordRoute() في العنصر النائب غير المتزامن ونستردّ مراقب طلب StreamObserver الخاص بنا لكتابة Points لإرساله إلى الخادم. بعد الانتهاء من كتابة النقاط، نستخدم طريقة onCompleted() الخاصة بمراقب الطلب لإعلام gRPC بأنّنا انتهينا من الكتابة على جهة العميل. بعد الانتهاء، نتحقّق من CountDownLatch لمعرفة ما إذا كان الخادم قد أكمل العملية من جهته.

إجراءات RPC للبث الثنائي الاتجاه

أخيرًا، لنلقِ نظرة على RouteChat() RPC ثنائي الاتجاه.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

كما هو الحال مع مثال البث من جهة العميل، نحصل على مراقب ردود ونعرضه، ولكن هذه المرة نرسل القيم من خلال مراقب الردود الخاص بالطريقة بينما لا يزال الخادم يكتب الرسائل إلى بث الرسائل الخاص به.StreamObserver إنّ بناء الجملة الخاص بالقراءة والكتابة هنا هو نفسه تمامًا كما هو الحال في طريقة البث من جهة العميل. على الرغم من أنّ كل طرف سيتلقّى دائمًا رسائل الطرف الآخر بالترتيب الذي تمت كتابتها به، يمكن لكل من العميل والخادم القراءة والكتابة بأي ترتيب، لأنّ عمليات البث تعمل بشكل مستقل تمامًا.

7. ننصحكم بتجربتها.

  1. من دليل start_here:
$ ./gradlew installDist

سيؤدي ذلك إلى تجميع الرمز البرمجي وتعبئته في ملف jar وإنشاء البرامج النصية التي تشغّل المثال. سيتم إنشاؤها في دليل build/install/start_here/bin/. البرنامجان النصيان هما: route-guide-server وroute-guide-client.

يجب أن يكون الخادم قيد التشغيل قبل بدء تشغيل العميل.

  1. شغِّل الخادم:
$ ./build/install/start_here/bin/route-guide-server
  1. شغِّل العميل:
$ ./build/install/start_here/bin/route-guide-client

8. الخطوات التالية