شروع به کار با gRPC-Java - Streaming

1. مقدمه

در این کد لبه، شما از gRPC-Java برای ایجاد یک سرویس گیرنده و سرور استفاده خواهید کرد که پایه و اساس برنامه نقشه برداری مسیر نوشته شده در جاوا را تشکیل می دهد.

در پایان آموزش، شما یک کلاینت خواهید داشت که با استفاده از gRPC به یک سرور راه دور متصل می شود تا اطلاعاتی در مورد ویژگی های مسیر کلاینت به دست آورد، خلاصه ای از مسیر مشتری ایجاد کند و اطلاعات مسیر مانند به روز رسانی ترافیک را با سرور و سایر کلاینت ها مبادله کند.

این سرویس در یک فایل Protocol Buffers تعریف شده است که از آن برای تولید کد boilerplate برای کلاینت و سرور استفاده می شود تا بتوانند با یکدیگر ارتباط برقرار کنند و در زمان و تلاش شما در اجرای آن عملکرد صرفه جویی شود.

این کد تولید شده نه تنها از پیچیدگی های ارتباط بین سرور و کلاینت، بلکه سریال سازی و سریال سازی داده ها نیز مراقبت می کند.

چیزی که یاد خواهید گرفت

  • نحوه استفاده از بافرهای پروتکل برای تعریف API سرویس.
  • نحوه ساخت یک کلاینت و سرور مبتنی بر gRPC از تعریف بافرهای پروتکل با استفاده از تولید کد خودکار.
  • درک ارتباط جریان مشتری-سرور با gRPC.

این کد لبه برای توسعه دهندگان جاوا که تازه به gRPC می پردازند یا به دنبال تازه سازی gRPC هستند یا هرکس دیگری که علاقه مند به ساختن سیستم های توزیع شده است، طراحی شده است. هیچ تجربه قبلی gRPC مورد نیاز نیست.

2. قبل از شروع

پیش نیازها

  • JDK نسخه 24.

کد را دریافت کنید

برای اینکه مجبور نباشید به طور کامل از ابتدا شروع کنید، این کد لبه داربستی از کد منبع برنامه را برای شما فراهم می کند تا آن را تکمیل کنید. مراحل زیر به شما نشان می دهد که چگونه برنامه را تکمیل کنید، از جمله استفاده از افزونه های کامپایلر بافر پروتکل برای تولید کد gRPC دیگ بخار.

ابتدا پوشه کاری codelab و cd را در آن ایجاد کنید:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

کد لبه را دانلود و استخراج کنید:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

یا می توانید فایل .zip را که فقط شامل دایرکتوری Codelab است دانلود کرده و به صورت دستی آن را از حالت فشرده خارج کنید.

اگر می خواهید از تایپ کردن در یک پیاده سازی صرفنظر کنید، کد منبع تکمیل شده در GitHub موجود است.

3. پیام ها و خدمات را تعریف کنید

اولین قدم شما این است که سرویس gRPC برنامه، روش RPC آن، و انواع پیام درخواست و پاسخ آن را با استفاده از Protocol Buffers تعریف کنید. خدمات شما ارائه خواهد کرد:

  • متدهای RPC به نام ListFeatures ، RecordRoute و RouteChat که سرور پیاده سازی می کند و کلاینت فراخوانی می کند.
  • انواع پیام Point ، Feature ، Rectangle ، RouteNote و RouteSummary که ساختارهای داده ای هستند که هنگام فراخوانی روش های بالا بین مشتری و سرور رد و بدل می شوند.

بافرهای پروتکل معمولاً به عنوان پروتوباف شناخته می شوند. برای اطلاعات بیشتر در مورد اصطلاحات gRPC، به مفاهیم اصلی، معماری و چرخه حیات gRPC مراجعه کنید.

این روش RPC و انواع پیام های آن در فایل proto/routeguide/route_guide.proto کد منبع ارائه شده تعریف می شوند.

بیایید یک فایل route_guide.proto ایجاد کنیم.

همانطور که در این مثال در حال تولید کد جاوا هستیم، یک گزینه فایل java_package در .proto خود مشخص کرده ایم:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

تعریف انواع پیام

در فایل proto/routeguide/route_guide.proto کد منبع ابتدا نوع پیام Point را تعریف کنید. یک Point نشان دهنده یک جفت مختصات طول و عرض جغرافیایی بر روی نقشه است. برای این کد، از اعداد صحیح برای مختصات استفاده کنید:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

اعداد 1 و 2 شماره های شناسه منحصر به فرد برای هر یک از فیلدهای ساختار message هستند.

سپس نوع پیام Feature را تعریف کنید. یک Feature از یک فیلد string برای نام یا آدرس پستی چیزی در مکانی مشخص شده توسط یک Point استفاده می کند:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

برای اینکه چندین نقطه در یک منطقه را بتوان برای یک مشتری پخش کرد، به یک پیام Rectangle نیاز دارید که نشان دهنده یک مستطیل طول و عرض جغرافیایی است که به صورت دو نقطه مورب مخالف lo و hi نمایش داده می شود:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

همچنین، یک پیام RouteNote که نشان دهنده پیامی است که در یک نقطه مشخص ارسال شده است:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

در نهایت، به یک پیام RouteSummary نیاز دارید. این پیام در پاسخ به یک RecordRoute RPC دریافت می شود که در قسمت بعدی توضیح داده شده است. این شامل تعداد نقاط دریافتی جداگانه، تعداد ویژگی های شناسایی شده، و کل مسافت طی شده به عنوان مجموع تجمعی فاصله بین هر نقطه است.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

روش های خدمات را تعریف کنید

برای تعریف یک سرویس، یک سرویس با نام را در فایل .proto خود مشخص می کنید. فایل route_guide.proto دارای یک ساختار service به نام RouteGuide است که یک یا چند روش ارائه شده توسط سرویس برنامه را تعریف می کند.

هنگامی که روش های RPC را در تعریف سرویس خود تعریف می کنید، نوع درخواست و پاسخ آنها را مشخص می کنید. در این بخش از Codelab، اجازه دهید تعریف کنیم:

لیست ویژگی ها

اشیاء Feature موجود در Rectangle داده شده را به دست می آورد. نتایج به‌جای بازگردانی یک‌باره پخش می‌شوند، زیرا مستطیل ممکن است منطقه بزرگی را پوشش دهد و دارای تعداد زیادی ویژگی باشد.

برای این برنامه، شما از یک RPC استریم سمت سرور استفاده خواهید کرد: کلاینت درخواستی را به سرور ارسال می کند و جریانی برای خواندن دنباله ای از پیام ها دریافت می کند. مشتری از جریان برگشتی می خواند تا زمانی که پیام دیگری وجود نداشته باشد. همانطور که در مثال ما می بینید، با قرار دادن کلمه کلیدی جریان قبل از نوع پاسخ، یک روش پخش سمت سرور را مشخص می کنید.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

جریانی از نقاط را در مسیری که در حال پیمایش است می پذیرد، پس از تکمیل پیمایش، یک RouteSummary برمی گرداند.

یک RPC استریم سمت کلاینت در این مورد مناسب است: مشتری دنباله ای از پیام ها را می نویسد و آنها را دوباره با استفاده از جریان ارائه شده به سرور ارسال می کند. هنگامی که مشتری نوشتن پیام ها را به پایان رساند، منتظر می ماند تا سرور همه آنها را بخواند و پاسخ خود را برگرداند. شما با قرار دادن کلمه کلیدی جریان قبل از نوع درخواست، یک روش پخش سمت مشتری را مشخص می کنید.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

روت چت

جریانی از RouteNotes را می‌پذیرد که هنگام عبور از یک مسیر، در حالی که RouteNotes دیگر (مثلاً از سایر کاربران) دریافت می‌کند.

این دقیقاً همان موردی است که برای پخش جریانی دو طرفه استفاده می شود. یک جریان RPC دو طرفه که در آن هر دو طرف دنباله ای از پیام ها را با استفاده از جریان خواندن و نوشتن ارسال می کنند. این دو جریان به طور مستقل عمل می‌کنند، بنابراین کلاینت‌ها و سرورها می‌توانند به هر ترتیبی که دوست دارند بخوانند و بنویسند: به عنوان مثال، سرور می‌تواند قبل از نوشتن پاسخ‌هایش منتظر دریافت همه پیام‌های مشتری باشد، یا می‌تواند متناوب یک پیام را بخواند و سپس یک پیام بنویسد، یا ترکیب دیگری از خواندن و نوشتن. ترتیب پیام ها در هر جریان حفظ می شود. شما این نوع روش را با قرار دادن کلمه کلیدی جریان قبل از درخواست و پاسخ مشخص می کنید.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. کد مشتری و سرور تولید کنید

در مرحله بعد باید رابط های سرویس گیرنده و سرور gRPC را از تعریف سرویس .proto خود تولید کنیم. ما این کار را با استفاده از پروتکل کامپایلر بافر protoc با یک پلاگین مخصوص gRPC جاوا انجام می دهیم. برای تولید سرویس های gRPC باید از کامپایلر proto3 (که از دستورات proto2 و proto3 پشتیبانی می کند) استفاده کنید.

هنگام استفاده از Gradle یا Maven، افزونه پروتوک بیلد می تواند کدهای لازم را به عنوان بخشی از بیلد ایجاد کند. برای نحوه تولید کد از فایل های .proto خود می توانید به grpc-java README مراجعه کنید.

ما پیکربندی Gradle را ارائه کرده ایم.

از دایرکتوری streaming-grpc-java-getting-started وارد شوید

$ chmod +x gradlew
$ ./gradlew generateProto

کلاس های زیر از تعریف سرویس ما (تحت build/generated/sources/proto/main/java ) تولید می شوند:

  • یکی برای هر نوع پیام: Feature.java ، Rectangle.java, ... که شامل تمام کدهای بافر پروتکل برای پر کردن، سریال سازی و بازیابی انواع پیام درخواست و پاسخ ما هستند.
  • RouteGuideGrpc.java که شامل (همراه با برخی کدهای مفید دیگر) یک کلاس پایه برای سرورهای RouteGuide برای پیاده سازی است، RouteGuideGrpc.RouteGuideImplBase ، با تمام روش های تعریف شده در سرویس RouteGuide و کلاس های خرد برای استفاده از کلاینت ها

5. سرویس را پیاده سازی کنید

ابتدا بیایید نحوه ایجاد سرور RouteGuide را بررسی کنیم. دو بخش برای اینکه سرویس RouteGuide ما کار خود را انجام دهد وجود دارد:

  • پیاده سازی رابط سرویس ایجاد شده از تعریف سرویس ما: انجام "کار" واقعی سرویس ما.
  • اجرای یک سرور gRPC برای گوش دادن به درخواست های مشتریان و ارسال آنها به اجرای سرویس مناسب.

RouteGuide را پیاده سازی کنید

ما یک کلاس RouteGuideService را پیاده سازی خواهیم کرد که کلاس RouteGuideGrpc.RouteGuideImplBase ایجاد شده را گسترش می دهد. اجرا اینگونه به نظر می رسد.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

اجازه دهید هر پیاده سازی RPC را با جزئیات بررسی کنیم

RPC استریم سمت سرور

در ادامه به یکی از RPC های جریانی خود نگاه می کنیم. ListFeatures یک RPC استریم سمت سرور است، بنابراین باید چندین Features به مشتری خود بازگردانیم.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

مانند RPC ساده، این روش یک شی درخواست ( Rectangle که مشتری ما می‌خواهد در آن Features پیدا کند) و یک مشاهده‌گر پاسخ StreamObserver دریافت می‌کند.

این بار، به تعداد Feature نیاز برای بازگشت به سرویس گیرنده، ما آنها را از مجموعه ویژگی های سرویس بر اساس اینکه آیا در داخل Rectangle درخواست ما هستند انتخاب می کنیم، و هر کدام را به نوبه خود برای مشاهده گر پاسخ با استفاده از متد onNext() می نویسیم. در نهایت، مانند RPC ساده ما، از متد onCompleted() مشاهده‌گر پاسخ استفاده می‌کنیم تا به gRPC بگوییم که نوشتن پاسخ‌ها به پایان رسیده است.

جریان RPC سمت مشتری

حالا بیایید به چیز کمی پیچیده‌تر نگاه کنیم: روش پخش سمت مشتری RecordRoute() ، که در آن جریانی از Points را از مشتری دریافت می‌کنیم و یک RouteSummary با اطلاعاتی در مورد سفر آن‌ها برمی‌گردانیم.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

همانطور که می بینید، مانند انواع روش های قبلی، روش ما یک پارامتر StreamObserver responseObserver دریافت می کند، اما این بار یک StreamObserver برای مشتری برمی گرداند تا Points خود را بنویسد.

در بدنه متد، ما یک StreamObserver ناشناس را برای بازگشت نمونه می کنیم، که در آن:

  • روش onNext() را نادیده بگیرید تا هر بار که کلاینت Point در جریان پیام می نویسد ویژگی ها و سایر اطلاعات را دریافت کنید.
  • برای پر کردن و ساخت RouteSummary متد onCompleted() (که زمانی که کلاینت نوشتن پیام‌ها را به پایان رساند فراخوانی می‌شود) را لغو کنید. سپس با RouteSummary ، ناظر پاسخ خود متد خود، onNext() و سپس متد onCompleted() آن را فراخوانی می کنیم تا تماس از سمت سرور به پایان برسد.

جریان دو طرفه RPC

در نهایت، اجازه دهید به جریان دو طرفه RPC RouteChat() نگاه کنیم.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

مانند مثال پخش جریانی سمت کلاینت، ما هم یک StreamObserver را دریافت می‌کنیم و هم آن را برمی‌گردانیم، با این تفاوت که این بار مقادیر را از طریق مشاهده‌گر پاسخ متدمان برمی‌گردانیم در حالی که مشتری همچنان در حال نوشتن پیام‌ها به جریان پیام خود است. نحو برای خواندن و نوشتن در اینجا دقیقاً مشابه روش‌های پخش جریانی مشتری و استریم سرور ما است. اگرچه هر یک از طرفین همیشه پیام های طرف مقابل را به ترتیبی که نوشته شده دریافت می کند، هم مشتری و هم سرور می توانند به هر ترتیبی بخوانند و بنویسند - جریان ها کاملاً مستقل عمل می کنند.

سرور را راه اندازی کنید

هنگامی که همه روش‌های خود را پیاده‌سازی کردیم، همچنین باید یک سرور gRPC راه‌اندازی کنیم تا مشتریان بتوانند واقعاً از خدمات ما استفاده کنند. قطعه زیر نشان می دهد که چگونه این کار را برای سرویس RouteGuide خود انجام می دهیم:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

همانطور که می بینید، ما سرور خود را با استفاده از یک ServerBuilder می سازیم و راه اندازی می کنیم.

برای انجام این کار، ما:

  1. آدرس و پورتی را که می‌خواهیم برای گوش دادن به درخواست‌های مشتری با استفاده از متد forPort() سازنده استفاده کنیم، مشخص کنید.
  2. نمونه ای از کلاس پیاده سازی سرویس ما RouteGuideService ایجاد کنید و آن را به متد addService() سازنده ارسال کنید.
  3. برای ایجاد و راه اندازی یک سرور RPC برای سرویس ما، build() و start() در سازنده فراخوانی کنید.

از آنجایی که ServerBuilder در حال حاضر پورت را در خود جای داده است، تنها دلیلی که یک پورت را پاس می کنیم استفاده از آن برای ورود به سیستم است.

6. مشتری ایجاد کنید

در این بخش، ایجاد یک کلاینت برای سرویس RouteGuide خود را بررسی خواهیم کرد. می توانید نمونه کامل کد کلاینت ما را در ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java ببینید.

نمونه خرد

برای فراخوانی متدهای سرویس، ابتدا باید یک خرد یا بهتر بگوییم دو خرد ایجاد کنیم:

  • یک خرد مسدود/همگام : این بدان معناست که فراخوانی RPC منتظر پاسخ سرور می‌ماند و یا پاسخی را برمی‌گرداند یا استثنایی ایجاد می‌کند.
  • یک خرد غیر مسدود/ناهمزمان که تماس‌های غیرانسدادی را با سرور برقرار می‌کند، جایی که پاسخ به صورت ناهمزمان برگردانده می‌شود. می‌توانید انواع خاصی از تماس‌های جریانی را فقط با استفاده از یک خرد ناهمزمان برقرار کنید.

ابتدا باید یک کانال gRPC برای خرد خود ایجاد کنیم و آدرس سرور و پورتی را که می‌خواهیم به آن وصل شویم را مشخص کنیم:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

برای ایجاد کانال از ManagedChannelBuilder استفاده می کنیم.

اکنون می‌توانیم از کانال برای ایجاد خرده‌های خود با استفاده از متدهای newStub و newBlockingStub ارائه‌شده در کلاس RouteGuideGrpc که از .proto خود تولید کردیم، استفاده کنیم.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

به یاد داشته باشید، اگر مسدود نیست، ناهمگام است

روش های خدمات تماس

حال بیایید ببینیم که چگونه روش‌های خدمات خود را فراخوانی می‌کنیم. توجه داشته باشید که هر RPC ایجاد شده از خرد مسدود کننده در حالت مسدود کردن/همگام عمل می کند، به این معنی که تماس RPC منتظر پاسخ سرور می ماند و یا یک پاسخ یا یک خطا را برمی گرداند.

RPC استریم سمت سرور

در مرحله بعد، بیایید به یک تماس جریان سمت سرور با ListFeatures نگاه کنیم، که جریانی از Feature جغرافیایی را برمی گرداند:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

همانطور که می بینید، این روش بسیار شبیه به RPC یکنواخت ساده است که ما در Codelab Getting_Started_With_gRPC_Java به آن نگاه کردیم، با این تفاوت که به جای بازگرداندن یک Feature ، متد یک Iterator را برمی گرداند که کلاینت می تواند از آن برای خواندن همه Features بازگشتی استفاده کند.

جریان RPC سمت مشتری

اکنون برای چیز کمی پیچیده‌تر: روش پخش سمت سرویس گیرنده RecordRoute ، که در آن جریانی از Points را به سرور ارسال می‌کنیم و یک RouteSummary برمی‌گردانیم. برای این روش باید از خرد ناهمزمان استفاده کنیم. اگر قبلاً «ایجاد سرور» را خوانده‌اید، ممکن است برخی از این موارد بسیار آشنا به نظر برسند - RPCهای جریان ناهمزمان به روشی مشابه در هر دو طرف پیاده‌سازی می‌شوند.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

همانطور که می بینید، برای فراخوانی این روش باید یک StreamObserver ایجاد کنیم که یک رابط ویژه برای فراخوانی سرور با پاسخ RouteSummary خود پیاده سازی می کند. در StreamObserver ما:

  • هنگامی که سرور یک RouteSummary در جریان پیام می نویسد، روش onNext() را لغو کنید تا اطلاعات برگشتی را چاپ کنید.
  • برای کاهش یک CountDownLatch روش onCompleted() را لغو کنید (زمانی که سرور فراخوانی را در سمت خود انجام داد فراخوانی می‌شود) تا بتوانیم بررسی کنیم که آیا سرور نوشتن را به پایان رسانده است یا خیر.

سپس StreamObserver به متد recordRoute() stub ناهمزمان می‌دهیم و ناظر درخواست StreamObserver خود را برمی‌گردانیم تا Points ما را برای ارسال به سرور بنویسد. پس از پایان نوشتن نقاط، از متد onCompleted() مشاهده‌گر درخواست استفاده می‌کنیم تا به gRPC بگوییم که نوشتن در سمت کلاینت را به پایان رسانده‌ایم. هنگامی که کارمان تمام شد، CountDownLatch خود را بررسی می کنیم تا ببینیم آیا سرور در سمت خود تکمیل شده است یا خیر.

جریان دو طرفه RPC

در نهایت، اجازه دهید به جریان دو طرفه RPC RouteChat() نگاه کنیم.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

مانند مثال استریم سمت کلاینت، ما هم یک ناظر پاسخ StreamObserver را دریافت می‌کنیم و هم برمی‌گردانیم، با این تفاوت که این بار مقادیر را از طریق مشاهده‌گر پاسخ متد خود ارسال می‌کنیم در حالی که سرور همچنان در حال نوشتن پیام‌ها به جریان پیام خود است. نحو برای خواندن و نوشتن در اینجا دقیقاً مشابه روش پخش جریانی مشتری ما است. اگرچه هر یک از طرفین همیشه پیام های طرف مقابل را به ترتیبی که نوشته شده دریافت می کند، هم مشتری و هم سرور می توانند به هر ترتیبی بخوانند و بنویسند - جریان ها کاملاً مستقل عمل می کنند.

7. آن را امتحان کنید!

  1. از دایرکتوری start_here :
$ ./gradlew installDist

این کد شما را کامپایل می کند، آن را در یک شیشه بسته بندی می کند و اسکریپت هایی را ایجاد می کند که نمونه را اجرا می کنند. آنها در دایرکتوری build/install/start_here/bin/ ایجاد خواهند شد. اسکریپت ها عبارتند از: route-guide-server و route-guide-client .

سرور باید قبل از راه اندازی کلاینت در حال اجرا باشد.

  1. سرور را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-server
  1. کلاینت را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-client

8. بعدی چه خواهد شد