Pierwsze kroki z gRPC-Java – przesyłanie strumieniowe

1. Wprowadzenie

W tym module praktycznym użyjesz gRPC-Java do utworzenia klienta i serwera, które będą stanowić podstawę aplikacji do mapowania tras napisanej w języku Java.

Po ukończeniu tego samouczka będziesz mieć klienta, który łączy się z serwerem zdalnym za pomocą gRPC, aby uzyskiwać informacje o funkcjach na trasie klienta, tworzyć podsumowanie trasy klienta i wymieniać informacje o trasie, takie jak aktualizacje ruchu, z serwerem i innymi klientami.

Usługa jest zdefiniowana w pliku Protocol Buffers, który będzie używany do generowania kodu szablonowego dla klienta i serwera, aby mogły się ze sobą komunikować. Dzięki temu zaoszczędzisz czas i wysiłek potrzebny na wdrożenie tej funkcji.

Wygenerowany kod obsługuje nie tylko złożoność komunikacji między serwerem a klientem, ale także serializację i deserializację danych.

Czego się nauczysz

  • Jak używać buforów protokołu do definiowania interfejsu API usługi.
  • Jak utworzyć klienta i serwer oparte na gRPC na podstawie definicji Protocol Buffers za pomocą automatycznego generowania kodu.
  • znajomość komunikacji strumieniowej klient-serwer za pomocą gRPC;

Te warsztaty są przeznaczone dla programistów Java, którzy dopiero zaczynają korzystać z gRPC lub chcą sobie przypomnieć podstawy tej technologii, a także dla wszystkich innych osób zainteresowanych tworzeniem systemów rozproszonych. Nie musisz mieć wcześniejszego doświadczenia z gRPC.

2. Zanim zaczniesz

Wymagania wstępne

  • Wersja JDK 24.

Pobierz kod

Aby nie trzeba było zaczynać od zera, w tym ćwiczeniu znajdziesz szkielet kodu źródłowego aplikacji, który możesz uzupełnić. Z tych instrukcji dowiesz się, jak dokończyć aplikację, w tym jak użyć wtyczek kompilatora buforów protokołów do wygenerowania kodu gRPC.

Najpierw utwórz katalog roboczy z ćwiczeniami i przejdź do niego:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Pobierz i rozpakuj ćwiczenia z programowania:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

Możesz też pobrać plik ZIP zawierający tylko katalog z instrukcjami i rozpakować go ręcznie.

Jeśli nie chcesz wpisywać implementacji, gotowy kod źródłowy jest dostępny na GitHubie.

3. Określanie komunikatów i usług

Pierwszym krokiem jest zdefiniowanie usługi gRPC aplikacji, jej metody RPC oraz typów wiadomości żądania i odpowiedzi za pomocą buforów protokołu. Twoja usługa będzie zapewniać:

  • Metody RPC wywoływane przez serwer i klienta: ListFeatures, RecordRouteRouteChat.
  • Typy wiadomości Point, Feature, Rectangle, RouteNoteRouteSummary, które są strukturami danych wymienianymi między klientem a serwerem podczas wywoływania powyższych metod.

Protokoły buforów są powszechnie znane jako protobufy. Więcej informacji o terminologii gRPC znajdziesz w artykule Podstawowe koncepcje, architektura i cykl życia.

Ta metoda RPC i jej typy wiadomości będą zdefiniowane w pliku proto/routeguide/route_guide.proto podanego kodu źródłowego.

Utwórzmy plik route_guide.proto.

W tym przykładzie generujemy kod w Javie, więc w naszym pliku .proto określiliśmy opcję java_package:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

Określanie typów wiadomości

proto/routeguide/route_guide.proto pliku kodu źródłowego najpierw zdefiniuj Point typ wiadomości. Symbol Point reprezentuje parę współrzędnych szerokości i długości geograficznej na mapie. W tym ćwiczeniu używaj liczb całkowitych jako współrzędnych:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Numery 12 to unikalne identyfikatory poszczególnych pól w strukturze message.

Następnie określ Featuretyp wiadomości. Feature używa pola string na nazwę lub adres pocztowy czegoś w lokalizacji określonej przez Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Aby można było przesyłać strumieniowo do klienta wiele punktów w danym obszarze, potrzebujesz wiadomości Rectangle, która reprezentuje prostokąt o określonych współrzędnych geograficznych, przedstawiony jako 2 przeciwległe punkty po przekątnej lohi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Oprócz tego wiadomość RouteNote, która reprezentuje wiadomość wysłaną w danym momencie:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Na koniec potrzebujesz wiadomości RouteSummary. Ta wiadomość jest odbierana w odpowiedzi na wywołanie RPC RecordRoute, które opisujemy w następnej sekcji. Zawiera liczbę otrzymanych punktów, liczbę wykrytych cech i całkowity przebyty dystans jako sumę odległości między poszczególnymi punktami.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Określanie metod usługi

Aby zdefiniować usługę, w pliku .proto podaj jej nazwę. Plik route_guide.proto ma strukturę service o nazwie RouteGuide, która definiuje co najmniej 1 metodę udostępnianą przez usługę aplikacji.

Gdy w definicji usługi określasz RPC metody, musisz podać typy żądań i odpowiedzi. W tej części ćwiczenia zdefiniujemy:

ListFeatures

Pobiera obiekty Feature dostępne w ramach danego Rectangle. Wyniki są przesyłane strumieniowo, a nie zwracane od razu, ponieważ prostokąt może obejmować duży obszar i zawierać ogromną liczbę obiektów.

W tej aplikacji użyjesz strumieniowego wywołania RPC po stronie serwera: klient wysyła żądanie do serwera i otrzymuje strumień, z którego może odczytywać sekwencję wiadomości. Klient odczytuje zwrócony strumień, dopóki nie będzie już żadnych wiadomości. Jak widać w naszym przykładzie, metodę przesyłania strumieniowego po stronie serwera określa się, umieszczając słowo kluczowe stream przed typem odpowiedzi.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Akceptuje strumień punktów na pokonywanej trasie i zwraca RouteSummary po zakończeniu pokonywania trasy.

W tym przypadku odpowiednie jest wywołanie RPC strumieniowania po stronie klienta: klient zapisuje sekwencję wiadomości i wysyła je na serwer, ponownie używając udostępnionego strumienia. Gdy klient skończy pisać wiadomości, czeka, aż serwer je wszystkie odczyta i zwróci odpowiedź. Metodę przesyłania strumieniowego po stronie klienta określa się, umieszczając słowo kluczowe stream przed typem żądania.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Akceptuje strumień RouteNotes wysyłanych podczas pokonywania trasy, a także inne RouteNotes (np. od innych użytkowników).

To jest właśnie przypadek użycia strumieniowania dwukierunkowego. Dwukierunkowe strumieniowe wywołanie RPC, w którym obie strony wysyłają sekwencję wiadomości za pomocą strumienia odczytu i zapisu. Oba strumienie działają niezależnie, więc klienci i serwery mogą odczytywać i zapisywać dane w dowolnej kolejności. Na przykład serwer może czekać na otrzymanie wszystkich wiadomości od klienta przed zapisaniem odpowiedzi lub może odczytać wiadomość, a następnie zapisać wiadomość albo użyć innej kombinacji odczytów i zapisów. Kolejność wiadomości w każdym strumieniu jest zachowana. Ten typ metody określa się, umieszczając słowo kluczowe stream przed żądaniem i odpowiedzią.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Generowanie kodu klienta i serwera

Następnie musimy wygenerować interfejsy klienta i serwera gRPC z naszej definicji usługi .proto. W tym celu używamy kompilatora buforów protokołu protoc ze specjalną wtyczką gRPC Java. Aby wygenerować usługi gRPC, musisz użyć kompilatora proto3 (który obsługuje składnię proto2 i proto3).

Jeśli używasz Gradle lub Maven, wtyczka kompilacji protoc może wygenerować niezbędny kod w ramach kompilacji. Więcej informacji o generowaniu kodu z własnych plików .proto znajdziesz w pliku README pakietu grpc-java.

Podaliśmy konfigurację Gradle.

W katalogu streaming-grpc-java-getting-started wpisz

$ chmod +x gradlew
$ ./gradlew generateProto

Poniższe klasy są generowane na podstawie definicji usługi (w sekcji build/generated/sources/proto/main/java):

  • Po jednym dla każdego typu wiadomości: Feature.java, Rectangle.java, ..., które zawierają cały kod bufora protokołu do wypełniania, serializacji i pobierania typów wiadomości żądań i odpowiedzi.
  • RouteGuideGrpc.java, która zawiera (oprócz innego przydatnego kodu) klasę bazową do implementacji RouteGuide serwerówRouteGuideGrpc.RouteGuideImplBase ze wszystkimi metodami zdefiniowanymi w RouteGuide klasach usług i klasach stubów do użycia przez klientów.

5. Wdrażanie usługi

Najpierw zobaczmy, jak utworzyć RouteGuideserwerRouteGuide. Aby usługa RouteGuide działała prawidłowo, musisz wykonać 2 czynności:

  • Implementacja interfejsu usługi wygenerowanego na podstawie definicji usługi: wykonywanie rzeczywistej „pracy” usługi.
  • Uruchomienie serwera gRPC, który nasłuchuje żądań od klientów i przekazuje je do właściwej implementacji usługi.

Implementacja RouteGuide

Zaimplementujemy klasę RouteGuideService, która będzie rozszerzać wygenerowaną klasę RouteGuideGrpc.RouteGuideImplBase. Tak wyglądałaby implementacja.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Przyjrzyjmy się szczegółowo każdej implementacji RPC.

Strumieniowe wywołanie RPC po stronie serwera

Przyjrzyjmy się teraz jednemu z naszych strumieniowych wywołań RPC. ListFeatures to strumieniowe RPC po stronie serwera, więc musimy odesłać do klienta wiele obiektów Features.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Podobnie jak w przypadku prostego wywołania RPC ta metoda pobiera obiekt żądania (Rectangle, w którym klient chce znaleźć Features) i obserwatora odpowiedzi StreamObserver.

Tym razem pobieramy tyle obiektów Feature, ile potrzebujemy, aby zwrócić je klientowi (w tym przypadku wybieramy je z kolekcji funkcji usługi na podstawie tego, czy znajdują się w naszym żądaniu Rectangle), i zapisujemy je kolejno w obserwatorze odpowiedzi za pomocą jego metody onNext(). Na koniec, podobnie jak w przypadku prostego RPC, używamy metody onCompleted() obserwatora odpowiedzi, aby poinformować gRPC, że zakończyliśmy pisanie odpowiedzi.

RPC przesyłania strumieniowego po stronie klienta

Przyjrzyjmy się teraz nieco bardziej skomplikowanej metodzie przesyłania strumieniowego po stronie klienta RecordRoute(), w której otrzymujemy strumień Points od klienta i zwracamy pojedynczy RouteSummary z informacjami o jego podróży.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Jak widać, podobnie jak w przypadku poprzednich typów metod nasza metoda otrzymuje parametr StreamObserver responseObserver, ale tym razem zwraca StreamObserver, aby klient mógł napisać swój Points.

W treści metody tworzymy instancję anonimowego StreamObserver, aby ją zwrócić. W tym celu:

  • Zastąp metodę onNext(), aby za każdym razem, gdy klient wpisze Point do strumienia wiadomości, otrzymywać funkcje i inne informacje.
  • Zastąp metodę onCompleted() (wywoływaną, gdy klient skończy zapisywać wiadomości), aby wypełnić i utworzyć RouteSummary. Następnie wywołujemy onNext() obserwatora odpowiedzi naszej metody z naszym RouteSummary, a potem wywołujemy jego metodę onCompleted(), aby zakończyć połączenie po stronie serwera.

Dwukierunkowe przesyłanie strumieniowe RPC

Na koniec przyjrzyjmy się dwukierunkowemu strumieniowemu wywołaniu RPC RouteChat().

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

Podobnie jak w przypadku przykładu przesyłania strumieniowego po stronie klienta, otrzymujemy i zwracamy StreamObserver, z tym że tym razem zwracamy wartości za pomocą obserwatora odpowiedzi naszej metody, podczas gdy klient nadal pisze wiadomości do swojego strumienia wiadomości. Składnia odczytu i zapisu jest tu dokładnie taka sama jak w przypadku naszych metod strumieniowania po stronie klienta i po stronie serwera. Chociaż każda ze stron zawsze będzie otrzymywać wiadomości drugiej strony w kolejności, w jakiej zostały napisane, zarówno klient, jak i serwer mogą odczytywać i zapisywać dane w dowolnej kolejności – strumienie działają całkowicie niezależnie.

Uruchom serwer.

Po zaimplementowaniu wszystkich metod musimy też uruchomić serwer gRPC, aby klienci mogli korzystać z naszej usługi. Poniższy fragment kodu pokazuje, jak to robimy w przypadku naszej usługi RouteGuide:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Jak widać, tworzymy i uruchamiamy serwer za pomocą ServerBuilder.

W tym celu możemy:

  1. Określ adres i port, których chcemy używać do nasłuchiwania żądań klientów, za pomocą metody forPort() narzędzia do tworzenia.
  2. Utwórz instancję klasy implementacji usługi RouteGuideService i przekaż ją do metody addService() konstruktora.
  3. Wywołaj build()start() w konstruktorze, aby utworzyć i uruchomić serwer RPC dla naszej usługi.

Ponieważ ServerBuilder zawiera już port, jedynym powodem, dla którego przekazujemy port, jest użycie go do logowania.

6. Tworzenie klienta

W tej sekcji zajmiemy się tworzeniem klienta dla usługi RouteGuide. Pełny przykładowy kod klienta znajdziesz w ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

Tworzenie instancji stuba

Aby wywołać metody usługi, musimy najpierw utworzyć stub, a właściwie 2 stubs:

  • blokujący/synchroniczny element zastępczy: oznacza to, że wywołanie RPC czeka na odpowiedź serwera i zwraca odpowiedź lub zgłasza wyjątek;
  • nieblokujący/asynchroniczny stub, który wykonuje nieblokujące wywołania serwera, a odpowiedź jest zwracana asynchronicznie. Niektóre typy połączeń strumieniowych można wykonywać tylko za pomocą asynchronicznego elementu zastępczego.

Najpierw musimy utworzyć kanał gRPC dla naszego namiastek, określając adres serwera i port, z którymi chcemy się połączyć:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Do utworzenia kanału używamy ManagedChannelBuilder.

Teraz możemy użyć kanału do utworzenia stubów za pomocą metod newStubnewBlockingStub udostępnionych w klasie RouteGuideGrpc wygenerowanej na podstawie pliku .proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

Pamiętaj, że jeśli nie blokuje, jest asynchroniczne.

Metody usługi połączeń

Zobaczmy teraz, jak wywołujemy metody usługi. Pamiętaj, że wszystkie wywołania RPC utworzone na podstawie blokującego stuba będą działać w trybie blokującym/synchronicznym, co oznacza, że wywołanie RPC czeka na odpowiedź serwera i zwraca odpowiedź lub błąd.

Strumieniowe wywołanie RPC po stronie serwera

Przyjrzyjmy się teraz wywołaniu strumieniowania po stronie serwera do ListFeatures, które zwraca strumień danych geograficznych Feature:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Jak widzisz, jest to bardzo podobne do prostego wywołania RPC typu unary, które omawialiśmy w samouczku Getting_Started_With_gRPC_Java. Zamiast zwracać pojedynczy obiekt Feature, metoda zwraca obiekt Iterator, którego klient może użyć do odczytania wszystkich zwróconych obiektów Features.

RPC przesyłania strumieniowego po stronie klienta

Teraz coś nieco bardziej skomplikowanego: metoda przesyłania strumieniowego po stronie klienta RecordRoute, w której wysyłamy strumień Points na serwer i otrzymujemy pojedynczy RouteSummary. W przypadku tej metody musimy użyć asynchronicznego kodu zastępczego. Jeśli przeczytasz artykuł Tworzenie serwera, niektóre z tych informacji mogą Ci się wydać znajome – asynchroniczne RPC przesyłania strumieniowego są implementowane w podobny sposób po obu stronach.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Jak widać, aby wywołać tę metodę, musimy utworzyć obiekt StreamObserver, który implementuje specjalny interfejs, aby serwer mógł go wywołać za pomocą odpowiedzi RouteSummary. W naszym StreamObserver:

  • Zastąp metodę onNext(), aby wydrukować zwrócone informacje, gdy serwer zapisze RouteSummary w strumieniu wiadomości.
  • Zastąp metodę onCompleted() (wywoływaną, gdy serwer zakończy wywołanie po swojej stronie), aby zmniejszyć wartość CountDownLatch, dzięki czemu będziemy mogli sprawdzić, czy serwer zakończył zapisywanie.

Następnie przekazujemy StreamObserver do metody recordRoute() asynchronicznego stuba i otrzymujemy własny obserwator żądań StreamObserver, aby zapisać Points do wysłania na serwer. Po zapisaniu punktów używamy metody onCompleted() obserwatora żądania, aby poinformować gRPC, że po stronie klienta zakończyliśmy zapisywanie. Gdy to zrobimy, sprawdzamy w CountDownLatch, czy serwer zakończył działanie po swojej stronie.

Dwukierunkowe przesyłanie strumieniowe RPC

Na koniec przyjrzyjmy się dwukierunkowemu strumieniowemu wywołaniu RPC RouteChat().

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

Podobnie jak w przypadku przesyłania strumieniowego po stronie klienta, otrzymujemy i zwracamy obserwatora odpowiedzi StreamObserver, z tym że tym razem wysyłamy wartości za pomocą obserwatora odpowiedzi naszej metody, podczas gdy serwer nadal zapisuje wiadomości w swoim strumieniu wiadomości. Składnia odczytu i zapisu jest tu dokładnie taka sama jak w przypadku naszej metody przesyłania strumieniowego po stronie klienta. Chociaż każda ze stron zawsze będzie otrzymywać wiadomości drugiej strony w kolejności, w jakiej zostały napisane, zarówno klient, jak i serwer mogą odczytywać i zapisywać dane w dowolnej kolejności – strumienie działają całkowicie niezależnie.

7. Spróbuj!

  1. W katalogu start_here:
$ ./gradlew installDist

Spowoduje to skompilowanie kodu, spakowanie go w plik JAR i utworzenie skryptów, które uruchamiają przykład. Zostaną one utworzone w katalogu build/install/start_here/bin/. Są to skrypty route-guide-serverroute-guide-client.

Przed uruchomieniem klienta serwer musi być włączony.

  1. Uruchom serwer:
$ ./build/install/start_here/bin/route-guide-server
  1. Uruchom klienta:
$ ./build/install/start_here/bin/route-guide-client

8. Co dalej?