gRPC-Java ile çalışmaya başlama - Akış

1. Giriş

Bu codelab'de, Java ile yazılmış bir rota eşleme uygulamasının temelini oluşturan bir istemci ve sunucu oluşturmak için gRPC-Java'yı kullanacaksınız.

Eğitimin sonunda, bir müşterinin rotasındaki özellikler hakkında bilgi almak, müşterinin rotasının özetini oluşturmak ve trafik güncellemeleri gibi rota bilgilerini sunucu ve diğer müşterilerle paylaşmak için gRPC kullanarak uzak bir sunucuya bağlanan bir istemciniz olacak.

Hizmet, istemci ve sunucu için standart kod oluşturmak üzere kullanılacak bir Protocol Buffers dosyasında tanımlanır. Böylece, bu işlevselliği uygularken zamandan ve emekten tasarruf edersiniz.

Oluşturulan bu kod, yalnızca sunucu ile istemci arasındaki iletişimin karmaşıklıklarını değil, aynı zamanda veri serileştirme ve seri durumdan çıkarma işlemlerini de ele alır.

Neler öğreneceksiniz?

  • Hizmet API'sini tanımlamak için Protocol Buffers'ı kullanma.
  • Otomatik kod oluşturma kullanarak bir Protokol Arabellek tanımından gRPC tabanlı istemci ve sunucu oluşturma.
  • gRPC ile istemci-sunucu akışı iletişimi hakkında bilgi sahibi olmanız gerekir.

Bu codelab, gRPC'ye yeni başlayan veya gRPC ile ilgili bilgilerini tazelemek isteyen Java geliştiricilerin yanı sıra dağıtılmış sistemler oluşturmakla ilgilenen herkes için hazırlanmıştır. Daha önce gRPC deneyimi gerekmez.

2. Başlamadan önce

Ön koşullar

  • JDK sürümü 24.

Kodu alın

Bu codelab, tamamen sıfırdan başlamanız gerekmemesi için uygulamanın kaynak kodunun tamamlayabileceğiniz bir iskeletini sağlar. Aşağıdaki adımlarda, protokol arabellek derleyici eklentilerini kullanarak standart gRPC kodu oluşturma da dahil olmak üzere uygulamanın nasıl tamamlanacağı gösterilmektedir.

Öncelikle codelab çalışma dizinini oluşturun ve bu dizine gidin:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Codelab'i indirip ayıklayın:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

Alternatif olarak, yalnızca codelab dizinini içeren .zip dosyasını indirebilir ve manuel olarak sıkıştırmasını açabilirsiniz.

Bir uygulamayı yazmayı atlamak isterseniz tamamlanmış kaynak kodunu GitHub'da bulabilirsiniz.

3. İletileri ve hizmetleri tanımlama

İlk adımınız, Protocol Buffers'ı kullanarak uygulamanın gRPC hizmetini, RPC yöntemini, istek ve yanıt mesajı türlerini tanımlamaktır. Hizmetinizde sunulacaklar:

  • Sunucunun uyguladığı ve istemcinin çağırdığı ListFeatures, RecordRoute ve RouteChat adlı RPC yöntemleri.
  • Yukarıdaki yöntemler çağrıldığında istemci ile sunucu arasında değiştirilen veri yapıları olan Point, Feature, Rectangle, RouteNote ve RouteSummary mesaj türleri.

Protocol Buffers genellikle protobuf olarak bilinir. gRPC terminolojisi hakkında daha fazla bilgi için gRPC'nin Temel kavramlar, mimari ve yaşam döngüsü başlıklı makalesine bakın.

Bu RPC yöntemi ve mesaj türleri, sağlanan kaynak kodun proto/routeguide/route_guide.proto dosyasında tanımlanır.

route_guide.proto dosyası oluşturalım.

Bu örnekte Java kodu oluşturduğumuz için .proto içinde bir java_package dosya seçeneği belirledik:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

İleti türlerini tanımlama

Kaynak kodun proto/routeguide/route_guide.proto dosyasında önce Point mesaj türünü tanımlayın. Point, haritadaki bir enlem-boylam koordinat çiftini temsil eder. Bu codelab'de koordinatlar için tam sayıları kullanın:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

1 ve 2 sayıları, message yapısındaki her alan için benzersiz kimlik numaralarıdır.

Ardından, Feature mesaj türünü tanımlayın. Bir Feature, Point ile belirtilen bir konumdaki bir şeyin adı veya posta adresi için string alanını kullanır:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Bir alan içindeki birden fazla noktanın istemciye aktarılabilmesi için, enlem-boylam dikdörtgenini temsil eden bir Rectangle mesajına ihtiyacınız vardır. Bu mesaj, çapraz olarak zıt iki nokta lo ve hi olarak gösterilir:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Ayrıca, belirli bir noktada gönderilen mesajı temsil eden bir RouteNote mesajı:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Son olarak, RouteSummary mesajı göndermeniz gerekir. Bu mesaj, sonraki bölümde açıklanan bir RecordRoute RPC'ye yanıt olarak alınır. Alınan bireysel puanların sayısı, algılanan özelliklerin sayısı ve her nokta arasındaki mesafenin kümülatif toplamı olarak kapsanan toplam mesafeyi içerir.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Hizmet yöntemlerini tanımlama

Bir hizmeti tanımlamak için .proto dosyanızda adlandırılmış bir hizmet belirtirsiniz. route_guide.proto dosyasında, uygulamanın hizmeti tarafından sağlanan bir veya daha fazla yöntemi tanımlayan RouteGuide adlı bir service yapısı bulunur.

Hizmet tanımınızda RPC yöntemleri tanımladığınızda, bunların istek ve yanıt türlerini belirtirsiniz. Bu codelab bölümünde şunları tanımlayalım:

ListFeatures

Belirtilen Rectangle içinde bulunan Feature nesnelerini alır. Dikdörtgen geniş bir alanı kaplayıp çok sayıda özellik içerebileceğinden sonuçlar tek seferde döndürülmek yerine yayınlanır.

Bu uygulamada sunucu tarafı akış RPC'si kullanacaksınız: İstemci, sunucuya bir istek gönderir ve mesaj dizisini okumak için bir akış alır. İstemci, döndürülen akıştan mesaj kalmayana kadar okur. Örneğimizde de görebileceğiniz gibi, yanıt türünden önce akış anahtar kelimesini yerleştirerek sunucu tarafı akış yöntemini belirtirsiniz.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Geçilen bir rotadaki nokta akışını kabul eder ve geçiş tamamlandığında RouteSummary değerini döndürür.

Bu durumda istemci tarafı akış RPC uygundur: İstemci, bir mesaj dizisi yazar ve bunları yine sağlanan bir akışı kullanarak sunucuya gönderir. İstemci, mesajları yazmayı bitirdikten sonra sunucunun hepsini okumasını ve yanıtını döndürmesini bekler. İstemci tarafı yayın yöntemini, yayın anahtar kelimesini istek türünün önüne yerleştirerek belirtirsiniz.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Bir rota geçilirken gönderilen RouteNotes akışını kabul ederken diğer RouteNotes'ları (ör. diğer kullanıcılardan gelen) alır.

Bu, iki yönlü akışın tam olarak kullanım alanıdır. Her iki tarafın da okuma/yazma akışı kullanarak bir ileti dizisi gönderdiği çift yönlü akış RPC'si. İki akış bağımsız olarak çalışır. Bu nedenle, istemciler ve sunucular istedikleri sırada okuma ve yazma işlemi yapabilir. Örneğin, sunucu yanıtlarını yazmadan önce tüm istemci mesajlarını almayı bekleyebilir veya alternatif olarak bir mesajı okuyup ardından bir mesaj yazabilir ya da okuma ve yazma işlemlerini başka bir şekilde birleştirebilir. Her akıştaki iletilerin sırası korunur. Bu tür bir yöntemi, hem istekten hem de yanıttan önce akış anahtar kelimesini yerleştirerek belirtirsiniz.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. İstemci ve sunucu kodu oluşturma

Ardından, .proto hizmet tanımımızdan gRPC istemci ve sunucu arayüzlerini oluşturmamız gerekir. Bu işlemi, özel bir gRPC Java eklentisiyle birlikte protokol arabelleği derleyicisini protoc kullanarak yaparız. gRPC hizmetleri oluşturmak için proto3 derleyicisini (hem proto2 hem de proto3 söz dizimini destekler) kullanmanız gerekir.

Gradle veya Maven kullanılırken protoc derleme eklentisi, derlemenin bir parçası olarak gerekli kodu oluşturabilir. Kendi .proto dosyalarınızdan nasıl kod oluşturacağınız hakkında bilgi edinmek için grpc-java README dosyasına bakabilirsiniz.

Gradle yapılandırması sağladık.

streaming-grpc-java-getting-started dizininden şu komutu girin:

$ chmod +x gradlew
$ ./gradlew generateProto

Aşağıdaki sınıflar, hizmet tanımımızdan (build/generated/sources/proto/main/java altında) oluşturulur:

  • Her mesaj türü için bir tane: Feature.java, Rectangle.java, .... Bunlar, istek ve yanıt mesajı türlerimizi doldurmak, serileştirmek ve almak için gereken tüm protokol arabellek kodunu içerir.
  • RouteGuideGrpc.java (diğer bazı yararlı kodlarla birlikte) RouteGuide sunucularının uygulaması için bir temel sınıf, RouteGuideGrpc.RouteGuideImplBase ve istemcilerin kullanması için RouteGuide hizmetinde ve saplama sınıflarında tanımlanan tüm yöntemleri içerir.

5. Hizmeti uygulama

Öncelikle RouteGuide sunucuyu nasıl oluşturduğumuza bakalım. RouteGuide hizmetimizin işini yapması için iki bölüm vardır:

  • Hizmet tanımımızdan oluşturulan hizmet arayüzünü uygulama: Hizmetimizin asıl "işini" yapma.
  • İstemcilerden gelen istekleri dinlemek ve bunları doğru hizmet uygulamasına göndermek için bir gRPC sunucusu çalıştırma.

RouteGuide'ı uygulama

Oluşturulan RouteGuideGrpc.RouteGuideImplBase sınıfını genişletecek bir RouteGuideService sınıfı uygulayacağız. Uygulama bu şekilde görünür.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Her RPC uygulamasını ayrıntılı olarak inceleyelim.

Sunucu tarafında yayın RPC'si

Şimdi de akış RPC'lerimizden birine bakalım. ListFeatures, sunucu tarafında yayınlanan bir RPC olduğundan istemcimize birden fazla Features geri göndermemiz gerekir.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Basit RPC'ye benzer şekilde, bu yöntem bir istek nesnesi (istemcimizin Features bulmak istediği Rectangle) ve bir StreamObserver yanıt gözlemcisi alır.

Bu sefer, istemciye döndürmemiz gereken sayıda Feature nesne alırız (bu durumda, bunları isteğimizin Rectangle içinde olup olmadıklarına göre hizmetin özellik koleksiyonundan seçeriz) ve her birini sırayla onNext() yöntemini kullanarak yanıt gözlemcisine yazarız. Son olarak, basit RPC'mizde olduğu gibi, yanıtları yazmayı bitirdiğimizi gRPC'ye bildirmek için yanıt gözlemcisinin onCompleted() yöntemini kullanırız.

İstemci tarafı yayın RPC'si

Şimdi biraz daha karmaşık bir yönteme, istemci tarafı akış yöntemine RecordRoute() bakalım. Bu yöntemde istemciden bir Points akışı alıp yolculuklarıyla ilgili bilgileri içeren tek bir RouteSummary döndürüyoruz.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Gördüğünüz gibi, yöntemimiz önceki yöntem türleri gibi bir StreamObserver responseObserver parametresi alıyor ancak bu kez istemcinin Points yazması için bir StreamObserver döndürüyor.

Yöntem gövdesinde, döndürülecek anonim bir StreamObserver oluştururuz. Bu işlemde:

  • İstemci, ileti akışına her Point yazdığında özellikleri ve diğer bilgileri almak için onNext() yöntemini geçersiz kılın.
  • onCompleted() yöntemini (istemci mesaj yazmayı bitirdiğinde çağrılır) geçersiz kılarak RouteSummary öğemizi doldurup oluşturun. Ardından, yöntemimizin kendi yanıt gözlemcisinin onNext() yöntemini RouteSummary ile çağırırız ve sunucu tarafındaki aramayı sonlandırmak için onCompleted() yöntemini çağırırız.

Çift yönlü akış RPC'si

Son olarak, çift yönlü akış RPC'mize RouteChat() göz atalım.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

İstemci tarafı akış örneğimizde olduğu gibi, bu kez de StreamObserver alıp döndürüyoruz. Ancak bu kez, istemci kendi mesaj akışına mesaj yazmaya devam ederken değerleri yöntemimizin yanıt gözlemcisi aracılığıyla döndürüyoruz. Burada okuma ve yazma söz dizimi, istemci akışı ve sunucu akışı yöntemlerimizle tamamen aynıdır. Her iki taraf da diğer tarafın iletilerini her zaman yazıldıkları sırayla alsa da hem istemci hem de sunucu, iletileri herhangi bir sırada okuyup yazabilir. Akışlar tamamen bağımsız olarak çalışır.

Sunucuyu başlatın.

Tüm yöntemlerimizi uyguladıktan sonra, istemcilerin hizmetimizi kullanabilmesi için bir gRPC sunucusu da başlatmamız gerekir. Aşağıdaki snippet'te, RouteGuide hizmetimiz için bunu nasıl yaptığımız gösterilmektedir:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Gördüğünüz gibi, sunucumuzu ServerBuilder kullanarak oluşturup başlatıyoruz.

Bunu yapmak için:

  1. Oluşturucunun forPort() yöntemini kullanarak istemci isteklerini dinlemek için kullanmak istediğimiz adresi ve bağlantı noktasını belirtin.
  2. Hizmet uygulama sınıfımızın RouteGuideService bir örneğini oluşturun ve bunu oluşturucunun addService() yöntemine iletin.
  3. Hizmetimiz için bir UPÇ sunucusu oluşturup başlatmak üzere oluşturucuda build() ve start() işlevlerini çağırın.

ServerBuilder bağlantı noktasını zaten içerdiğinden, bağlantı noktasını yalnızca günlük kaydı için iletiyoruz.

6. İstemciyi oluşturma

Bu bölümde, RouteGuide hizmetimiz için istemci oluşturma konusunu ele alacağız. Tam örnek istemci kodumuzu ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java adresinde görebilirsiniz.

Bir saplama oluşturma

Hizmet yöntemlerini çağırmak için önce bir taslak oluşturmamız gerekir. Daha doğrusu iki taslak oluşturmamız gerekir:

  • Engelleme/eşzamanlı saplama: Bu, RPC çağrısının sunucunun yanıt vermesini beklediği ve yanıt döndüreceği veya istisna oluşturacağı anlamına gelir.
  • Yanıtın eşzamansız olarak döndürüldüğü, sunucuya engellemeyen çağrılar yapan engellemeyen/eşzamansız bir saplama. Belirli türlerdeki akış aramalarını yalnızca eşzamansız bir saplama kullanarak yapabilirsiniz.

Öncelikle, bağlanmak istediğimiz sunucu adresini ve bağlantı noktasını belirterek taslağımız için bir gRPC kanalı oluşturmamız gerekir:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Kanalı oluşturmak için ManagedChannelBuilder kullanırız.

Artık .proto'ümüzden oluşturduğumuz RouteGuideGrpc sınıfında sağlanan newStub ve newBlockingStub yöntemlerini kullanarak kanalımızı oluşturabiliriz.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

Engellemiyorsa asenkron olduğunu unutmayın.

Hizmet yöntemlerini çağırma

Şimdi de hizmet yöntemlerimizi nasıl çağırdığımıza bakalım. Engelleme sapından oluşturulan tüm RPC'lerin engelleme/eşzamanlı modda çalışacağını unutmayın. Bu, RPC çağrısının sunucunun yanıt vermesini bekleyeceği ve yanıt ya da hata döndüreceği anlamına gelir.

Sunucu tarafında yayın RPC'si

Şimdi de ListFeatures için sunucu tarafı bir akış çağrısına bakalım. Bu çağrı, coğrafi Feature akışı döndürür:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Gördüğünüz gibi, Getting_Started_With_gRPC_Java codelab'inde incelediğimiz basit tekli RPC'ye çok benziyor. Tek farkı, tek bir Feature döndürmek yerine, istemcinin döndürülen tüm Features'leri okumak için kullanabileceği bir Iterator döndürmesi.

İstemci tarafı yayın RPC'si

Şimdi biraz daha karmaşık bir konuya geçelim: istemci tarafı yayın yöntemi RecordRoute. Bu yöntemde, sunucuya bir Points akışı göndeririz ve tek bir RouteSummary alırız. Bu yöntem için eşzamansız saplamayı kullanmamız gerekir. Sunucuyu oluşturma başlıklı makaleyi daha önce okuduysanız bu bölümdeki bazı bilgiler size tanıdık gelebilir. Asenkron akış RPC'leri her iki tarafta da benzer şekilde uygulanır.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Gördüğünüz gibi, bu yöntemi çağırmak için sunucunun RouteSummary yanıtıyla çağırması için özel bir arayüz uygulayan bir StreamObserver oluşturmamız gerekiyor. StreamObserver bölgemizde:

  • Sunucu, ileti akışına RouteSummary yazdığında döndürülen bilgileri yazdırmak için onNext() yöntemini geçersiz kılın.
  • Sunucu kendi tarafında aramayı tamamladığında çağrılan onCompleted() yöntemini geçersiz kılarak CountDownLatch değerini azaltın. Böylece sunucunun yazma işlemini bitirip bitirmediğini kontrol edebiliriz.

Ardından, StreamObserver öğesini eşzamansız saplamanın recordRoute() yöntemine iletiriz ve sunucuya göndermek üzere Points yazmak için kendi StreamObserver istek gözlemcimizi geri alırız. Puan yazmayı bitirdikten sonra, istemci tarafında yazmayı bitirdiğimizi gRPC'ye bildirmek için istek gözlemcisinin onCompleted() yöntemini kullanırız. İşlem tamamlandığında, sunucunun kendi tarafında tamamlanıp tamamlanmadığını görmek için CountDownLatch kontrol ederiz.

Çift yönlü akış RPC'si

Son olarak, çift yönlü akış RPC'mize RouteChat() göz atalım.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

İstemci tarafı akış örneğimizde olduğu gibi, bu kez de StreamObserver yanıt gözlemcisi alıp döndürüyoruz. Ancak bu kez, sunucu kendi ileti akışına ileti yazmaya devam ederken değerleri yöntemimizin yanıt gözlemcisi aracılığıyla gönderiyoruz. Burada okuma ve yazma söz dizimi, istemci akışı yöntemimizle tamamen aynıdır. Her iki taraf da diğer tarafın iletilerini her zaman yazıldıkları sırayla alsa da hem istemci hem de sunucu, iletileri herhangi bir sırada okuyup yazabilir. Akışlar tamamen bağımsız olarak çalışır.

7. Yenilikleri inceleyin.

  1. start_here dizininden:
$ ./gradlew installDist

Bu işlem, kodunuzu derler, jar dosyası olarak paketler ve örneği çalıştıran komut dosyalarını oluşturur. Bu dosyalar build/install/start_here/bin/ dizininde oluşturulur. Komut dosyaları: route-guide-server ve route-guide-client.

İstemci başlatılmadan önce sunucunun çalışıyor olması gerekir.

  1. Sunucuyu çalıştırın:
$ ./build/install/start_here/bin/route-guide-server
  1. İstemciyi çalıştırın:
$ ./build/install/start_here/bin/route-guide-client

8. Sırada ne var?