Erste Schritte mit gRPC-Java – Streaming

1. Einführung

In diesem Codelab verwenden Sie gRPC-Java, um einen Client und einen Server zu erstellen, die die Grundlage einer in Java geschriebenen Routenplanungsanwendung bilden.

Am Ende des Tutorials haben Sie einen Client, der über gRPC eine Verbindung zu einem Remote-Server herstellt, um Informationen zu Funktionen auf der Route eines Clients abzurufen, eine Zusammenfassung der Route eines Clients zu erstellen und Routeninformationen wie Verkehrsaktualisierungen mit dem Server und anderen Clients auszutauschen.

Der Dienst wird in einer Protocol Buffers-Datei definiert, die zum Generieren von Boilerplate-Code für den Client und den Server verwendet wird, damit sie miteinander kommunizieren können. So sparen Sie Zeit und Aufwand bei der Implementierung dieser Funktion.

Dieser generierte Code kümmert sich nicht nur um die Komplexität der Kommunikation zwischen Server und Client, sondern auch um die Serialisierung und Deserialisierung von Daten.

Lerninhalte

  • Wie Sie Protocol Buffers zum Definieren einer Dienst-API verwenden.
  • Hier erfahren Sie, wie Sie einen gRPC-basierten Client und Server aus einer Protocol Buffers-Definition mithilfe der automatischen Codegenerierung erstellen.
  • Sie haben ein grundlegendes Verständnis der Client-Server-Streaming-Kommunikation mit gRPC.

Dieses Codelab richtet sich an Java-Entwickler, die neu in gRPC sind oder ihr Wissen zu gRPC auffrischen möchten, sowie an alle anderen, die sich für die Entwicklung verteilter Systeme interessieren. Es sind keine Vorkenntnisse in gRPC erforderlich.

2. Hinweis

Vorbereitung

  • JDK-Version 24.

Code abrufen

Damit Sie nicht ganz von vorn anfangen müssen, enthält dieses Codelab ein Gerüst des Quellcodes der Anwendung, das Sie vervollständigen können. In den folgenden Schritten erfahren Sie, wie Sie die Anwendung fertigstellen, einschließlich der Verwendung der Protocol Buffer-Compiler-Plug-ins zum Generieren des Boilerplate-gRPC-Codes.

Erstellen Sie zuerst das Arbeitsverzeichnis für das Codelab und wechseln Sie in dieses Verzeichnis:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Laden Sie das Codelab herunter und extrahieren Sie es:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

Alternativ können Sie die ZIP-Datei herunterladen, die nur das Codelab-Verzeichnis enthält, und sie manuell entpacken.

Der vollständige Quellcode ist auf GitHub verfügbar, wenn Sie die Eingabe einer Implementierung überspringen möchten.

3. Nachrichten und Dienste definieren

Als Erstes müssen Sie den gRPC-Dienst der Anwendung, die RPC-Methode sowie die Anfrage- und Antwortnachrichtentypen mit Protokollpuffern definieren. Ihr Dienst bietet Folgendes:

  • RPC-Methoden mit den Namen ListFeatures, RecordRoute und RouteChat, die der Server implementiert und der Client aufruft.
  • Die Nachrichtentypen Point, Feature, Rectangle, RouteNote und RouteSummary, die Datenstrukturen sind, die beim Aufrufen der oben genannten Methoden zwischen Client und Server ausgetauscht werden.

Protocol Buffers werden allgemein als Protobufs bezeichnet. Weitere Informationen zur gRPC-Terminologie finden Sie unter Core concepts, architecture, and lifecycle.

Diese RPC-Methode und ihre Nachrichtentypen werden alle in der Datei proto/routeguide/route_guide.proto des bereitgestellten Quellcodes definiert.

Erstellen wir eine route_guide.proto-Datei.

Da wir in diesem Beispiel Java-Code generieren, haben wir in unserer .proto die Dateioption java_package angegeben:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

Nachrichtentypen definieren

Definieren Sie zuerst den Nachrichtentyp Point in der Datei proto/routeguide/route_guide.proto des Quellcodes. Ein Point stellt ein Paar aus Breiten- und Längengrad auf einer Karte dar. Verwenden Sie für dieses Codelab Ganzzahlen für die Koordinaten:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Die Zahlen 1 und 2 sind eindeutige ID-Nummern für die einzelnen Felder in der message-Struktur.

Als Nächstes definieren Sie den Nachrichtentyp Feature. Bei einem Feature wird ein string-Feld für den Namen oder die Postanschrift von etwas an einem Standort verwendet, der durch ein Point angegeben wird:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Damit mehrere Punkte in einem Gebiet an einen Client gestreamt werden können, benötigen Sie eine Rectangle-Nachricht, die ein Rechteck aus Breiten- und Längengrad darstellt, das durch zwei diagonal gegenüberliegende Punkte lo und hi dargestellt wird:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Außerdem eine RouteNote-Nachricht, die eine Nachricht darstellt, die an einem bestimmten Punkt gesendet wurde:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Schließlich benötigen Sie eine RouteSummary-Meldung. Diese Nachricht wird als Antwort auf einen RecordRoute-RPC empfangen, der im nächsten Abschnitt erläutert wird. Sie enthält die Anzahl der einzelnen empfangenen Punkte, die Anzahl der erkannten Features und die zurückgelegte Gesamtstrecke als kumulative Summe der Entfernung zwischen den einzelnen Punkten.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Dienstmethoden definieren

Wenn Sie einen Dienst definieren möchten, geben Sie einen benannten Dienst in der Datei .proto an. Die Datei route_guide.proto hat eine service-Struktur mit dem Namen RouteGuide, die eine oder mehrere Methoden definiert, die vom Dienst der Anwendung bereitgestellt werden.

Wenn Sie RPC-Methoden in Ihrer Dienstdefinition definieren, geben Sie die zugehörigen Anfrage- und Antworttypen an. In diesem Abschnitt des Codelabs definieren wir Folgendes:

ListFeatures

Ruft die im angegebenen Rectangle verfügbaren Feature-Objekte ab. Die Ergebnisse werden gestreamt und nicht auf einmal zurückgegeben, da das Rechteck ein großes Gebiet abdecken und eine große Anzahl von Features enthalten kann.

Für diese Anwendung verwenden Sie einen serverseitigen Streaming-RPC: Der Client sendet eine Anfrage an den Server und erhält einen Stream, um eine Reihe von Nachrichten zurückzulesen. Der Client liest den zurückgegebenen Stream, bis keine Nachrichten mehr vorhanden sind. Wie Sie in unserem Beispiel sehen, geben Sie eine serverseitige Streaming-Methode an, indem Sie das Stream-Keyword vor den Antworttyp setzen.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Akzeptiert einen Stream von Punkten auf einer zurückgelegten Route und gibt ein RouteSummary zurück, wenn die Route zurückgelegt wurde.

In diesem Fall ist ein clientseitiger Streaming-RPC angebracht: Der Client schreibt eine Reihe von Nachrichten und sendet sie an den Server, wiederum über einen bereitgestellten Stream. Nachdem der Client mit dem Schreiben der Nachrichten fertig ist, wartet er darauf, dass der Server sie alle liest und seine Antwort zurückgibt. Sie geben eine clientseitige Streamingmethode an, indem Sie das Stream-Keyword vor den Anfragetyp setzen.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Akzeptiert einen Stream von RouteNotes, der während der Fahrt auf einer Route gesendet wird, und empfängt gleichzeitig andere RouteNotes (z.B. von anderen Nutzern).

Genau das ist ein Anwendungsfall für bidirektionales Streaming. Eine bidirektionale Streaming-RPC, bei der beide Seiten eine Reihe von Nachrichten über einen Lese-/Schreib-Stream senden. Die beiden Streams funktionieren unabhängig voneinander. Clients und Server können also in beliebiger Reihenfolge lesen und schreiben. Der Server kann beispielsweise warten, bis er alle Clientnachrichten empfangen hat, bevor er seine Antworten schreibt. Er kann aber auch abwechselnd eine Nachricht lesen und dann eine Nachricht schreiben oder eine andere Kombination aus Lese- und Schreibvorgängen ausführen. Die Reihenfolge der Nachrichten in jedem Stream wird beibehalten. Sie geben diese Art von Methode an, indem Sie das Stream-Keyword sowohl vor die Anfrage als auch vor die Antwort setzen.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Client- und Servercode generieren

Als Nächstes müssen wir die gRPC-Client- und ‑Serverschnittstellen aus unserer .proto-Dienstdefinition generieren. Dazu verwenden wir den Protokollpuffer-Compiler protoc mit einem speziellen gRPC-Java-Plug-in. Sie müssen den proto3-Compiler (der sowohl die proto2- als auch die proto3-Syntax unterstützt) verwenden, um gRPC-Dienste zu generieren.

Wenn Sie Gradle oder Maven verwenden, kann das protoc-Build-Plug-in den erforderlichen Code im Rahmen des Builds generieren. In der README-Datei für grpc-java finden Sie Informationen dazu, wie Sie Code aus Ihren eigenen .proto-Dateien generieren.

Wir haben eine Gradle-Konfiguration bereitgestellt.

Geben Sie im Verzeichnis streaming-grpc-java-getting-started Folgendes ein:

$ chmod +x gradlew
$ ./gradlew generateProto

Die folgenden Klassen werden aus unserer Dienstdefinition (unter build/generated/sources/proto/main/java) generiert:

  • Einer für jeden Nachrichtentyp: Feature.java, Rectangle.java, .... Diese enthalten den gesamten Protokollzwischenspeicher-Code zum Einfügen, Serialisieren und Abrufen unserer Anfrage- und Antwortnachrichtentypen.
  • RouteGuideGrpc.java, die (zusammen mit anderem nützlichen Code) eine Basisklasse für die Implementierung von RouteGuide-Servern, RouteGuideGrpc.RouteGuideImplBase, mit allen im RouteGuide-Dienst definierten Methoden und Stub-Klassen für die Verwendung durch Clients enthält

5. Dienst implementieren

Sehen wir uns zuerst an, wie wir einen RouteGuide-Server erstellen. Damit unser RouteGuide-Dienst seine Aufgabe erfüllen kann, sind zwei Dinge erforderlich:

  • Implementieren der Dienstschnittstelle, die aus unserer Dienstdefinition generiert wurde: Hier wird die eigentliche „Arbeit“ unseres Dienstes erledigt.
  • Ausführen eines gRPC-Servers, der auf Anfragen von Clients wartet und sie an die richtige Dienstimplementierung weiterleitet.

RouteGuide implementieren

Wir implementieren eine RouteGuideService-Klasse, die die generierte RouteGuideGrpc.RouteGuideImplBase-Klasse erweitert. So würde die Implementierung aussehen.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Sehen wir uns die einzelnen RPC-Implementierungen genauer an.

Serverseitiger Streaming-RPC

Sehen wir uns als Nächstes einen unserer Streaming-RPCs an. ListFeatures ist ein serverseitiger Streaming-RPC, daher müssen wir mehrere Features an unseren Client zurücksenden.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Wie beim einfachen RPC erhält diese Methode ein Anfrageobjekt (das Rectangle, in dem unser Kunde Features finden möchte) und einen StreamObserver-Antwort-Observer.

Dieses Mal rufen wir so viele Feature-Objekte ab, wie wir für die Rückgabe an den Client benötigen. In diesem Fall wählen wir sie aus der Feature-Sammlung des Dienstes aus, je nachdem, ob sie sich innerhalb unserer Anfrage Rectangle befinden. Anschließend schreiben wir sie nacheinander in den Antwort-Observer mit seiner onNext()-Methode. Schließlich verwenden wir wie bei unserem einfachen RPC die onCompleted()-Methode des Antwort-Observers, um gRPC mitzuteilen, dass wir mit dem Schreiben von Antworten fertig sind.

Clientseitiger Streaming-RPC

Sehen wir uns nun etwas Komplexeres an: die clientseitige Streamingmethode RecordRoute(), bei der wir einen Stream von Points vom Client erhalten und ein einzelnes RouteSummary mit Informationen zur Fahrt zurückgeben.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Wie Sie sehen, erhält unsere Methode wie die vorherigen Methodentypen einen StreamObserver-Parameter responseObserver, gibt aber diesmal einen StreamObserver zurück, damit der Client seinen Points schreiben kann.

Im Methodenkörper instanziieren wir ein anonymes StreamObserver, das zurückgegeben werden soll. Dabei gehen wir so vor:

  • Überschreiben Sie die Methode onNext(), um jedes Mal, wenn der Client Point in den Nachrichtenstream schreibt, Funktionen und andere Informationen zu erhalten.
  • Überschreiben Sie die Methode onCompleted() (wird aufgerufen, wenn der Client mit dem Schreiben von Nachrichten fertig ist), um unsere RouteSummary zu erstellen. Wir rufen dann die onNext()-Methode des eigenen Antwort-Observers unserer Methode mit unserem RouteSummary auf und rufen dann die onCompleted()-Methode auf, um den Aufruf serverseitig zu beenden.

Bidirektionale Streaming-RPC

Sehen wir uns zum Schluss noch unseren bidirektionalen Streaming-RPC RouteChat() an.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

Wie bei unserem clientseitigen Streamingbeispiel erhalten und geben wir beide ein StreamObserver zurück. Diesmal geben wir jedoch Werte über den Antwort-Observer unserer Methode zurück, während der Client weiterhin Nachrichten in seinen Nachrichtenstream schreibt. Die Syntax zum Lesen und Schreiben ist hier genau dieselbe wie bei unseren Client-Streaming- und Server-Streaming-Methoden. Obwohl jede Seite die Nachrichten der anderen Seite immer in der Reihenfolge erhält, in der sie geschrieben wurden, können sowohl der Client als auch der Server in beliebiger Reihenfolge lesen und schreiben. Die Streams funktionieren völlig unabhängig voneinander.

Server starten

Nachdem wir alle unsere Methoden implementiert haben, müssen wir auch einen gRPC-Server starten, damit Clients unseren Dienst tatsächlich nutzen können. Das folgende Snippet zeigt, wie wir das für unseren RouteGuide-Dienst tun:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Wie Sie sehen, erstellen und starten wir unseren Server mit einem ServerBuilder.

Um das zu erreichen,

  1. Geben Sie mit der forPort()-Methode des Builders die Adresse und den Port an, die wir verwenden möchten, um auf Clientanfragen zu warten.
  2. Erstellen Sie eine Instanz unserer Dienstimplementierungsklasse RouteGuideService und übergeben Sie sie an die addService()-Methode des Builders.
  3. Rufen Sie build() und start() für den Builder auf, um einen RPC-Server für unseren Dienst zu erstellen und zu starten.

Da der Port bereits im ServerBuilder enthalten ist, übergeben wir ihn nur für die Protokollierung.

6. Client erstellen

In diesem Abschnitt sehen wir uns an, wie wir einen Client für unseren RouteGuide-Dienst erstellen. Unser vollständiger Beispielclientcode ist unter ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java verfügbar.

Stub instanziieren

Um Dienstmethoden aufzurufen, müssen wir zuerst einen Stub erstellen, genauer gesagt zwei Stubs:

  • Ein blockierender/synchroner Stub: Das bedeutet, dass der RPC-Aufruf auf die Antwort des Servers wartet und entweder eine Antwort zurückgibt oder eine Ausnahme auslöst.
  • ein nicht blockierender/asynchroner Stub, der nicht blockierende Aufrufe an den Server ausführt, wobei die Antwort asynchron zurückgegeben wird. Bestimmte Arten von Streaminganrufen können nur über einen asynchronen Stub erfolgen.

Zuerst müssen wir einen gRPC-Kanal für unseren Stub erstellen und dabei die Serveradresse und den Port angeben, mit denen wir eine Verbindung herstellen möchten:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Wir verwenden ein ManagedChannelBuilder, um den Channel zu erstellen.

Jetzt können wir den Channel verwenden, um unsere Stubs mit den Methoden newStub und newBlockingStub zu erstellen, die in der Klasse RouteGuideGrpc bereitgestellt werden, die wir aus unserem .proto generiert haben.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

Denken Sie daran: Wenn es nicht blockiert, ist es asynchron.

Dienstmethoden aufrufen

Sehen wir uns nun an, wie wir unsere Dienstmethoden aufrufen. Alle RPCs, die über den blockierenden Stub erstellt werden, werden im blockierenden/synchronen Modus ausgeführt. Das bedeutet, dass der RPC-Aufruf auf die Antwort des Servers wartet und entweder eine Antwort oder einen Fehler zurückgibt.

Serverseitiger Streaming-RPC

Sehen wir uns als Nächstes einen serverseitigen Streamingaufruf an ListFeatures an, der einen Stream geografischer Feature zurückgibt:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Wie Sie sehen, ähnelt sie dem einfachen unären RPC, den wir im Codelab Getting_Started_With_gRPC_Java betrachtet haben. Anstatt ein einzelnes Feature zurückzugeben, gibt die Methode jedoch ein Iterator zurück, mit dem der Client alle zurückgegebenen Features lesen kann.

Clientseitiger Streaming-RPC

Nun zu etwas Komplizierterem: der clientseitigen Streamingmethode RecordRoute, bei der wir einen Stream von Points an den Server senden und eine einzelne RouteSummary zurückerhalten. Für diese Methode müssen wir den asynchronen Stub verwenden. Wenn Sie bereits Server erstellen gelesen haben, kommt Ihnen einiges davon vielleicht bekannt vor. Asynchrone Streaming-RPCs werden auf beiden Seiten ähnlich implementiert.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Wie Sie sehen, müssen wir zum Aufrufen dieser Methode ein StreamObserver erstellen, das eine spezielle Schnittstelle implementiert, die der Server mit seiner RouteSummary-Antwort aufrufen kann. In unserem StreamObserver:

  • Überschreiben Sie die onNext()-Methode, um die zurückgegebenen Informationen auszugeben, wenn der Server ein RouteSummary in den Nachrichtenstream schreibt.
  • Überschreiben Sie die Methode onCompleted() (wird aufgerufen, wenn der Server den Aufruf auf seiner Seite abgeschlossen hat), um eine CountDownLatch zu reduzieren, damit wir prüfen können, ob der Server das Schreiben abgeschlossen hat.

Wir übergeben dann die StreamObserver an die recordRoute()-Methode des asynchronen Stubs und erhalten unseren eigenen StreamObserver-Anfrage-Observer zurück, um unsere Points zu schreiben, die an den Server gesendet werden soll. Nachdem wir die Punkte geschrieben haben, verwenden wir die Methode onCompleted() des Anforderungsbeobachters, um gRPC mitzuteilen, dass wir das Schreiben auf der Clientseite abgeschlossen haben. Wenn wir fertig sind, prüfen wir in unserem CountDownLatch, ob der Server seine Seite abgeschlossen hat.

Bidirektionale Streaming-RPC

Sehen wir uns zum Schluss noch unseren bidirektionalen Streaming-RPC RouteChat() an.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

Wie bei unserem clientseitigen Streamingbeispiel erhalten und geben wir einen StreamObserver-Antwort-Observer zurück. Dieses Mal senden wir jedoch Werte über den Antwort-Observer unserer Methode, während der Server weiterhin Nachrichten in seinen Nachrichtenstream schreibt. Die Syntax zum Lesen und Schreiben ist hier genau dieselbe wie bei unserer Client-Streaming-Methode. Obwohl jede Seite die Nachrichten der anderen Seite immer in der Reihenfolge erhält, in der sie geschrieben wurden, können sowohl der Client als auch der Server in beliebiger Reihenfolge lesen und schreiben. Die Streams funktionieren völlig unabhängig voneinander.

7. Probier es gleich aus!

  1. Im Verzeichnis start_here:
$ ./gradlew installDist

Dadurch wird Ihr Code kompiliert, in einem JAR verpackt und die Skripts zum Ausführen des Beispiels werden erstellt. Sie werden im Verzeichnis build/install/start_here/bin/ erstellt. Die Skripts sind: route-guide-server und route-guide-client.

Der Server muss ausgeführt werden, bevor der Client gestartet wird.

  1. Führen Sie den Server aus:
$ ./build/install/start_here/bin/route-guide-server
  1. Führen Sie den Client aus:
$ ./build/install/start_here/bin/route-guide-client

8. Nächste Schritte