1. Einführung
In diesem Codelab verwenden Sie gRPC-Java, um einen Client und einen Server zu erstellen, die die Grundlage einer in Java geschriebenen Routenplanungsanwendung bilden.
Am Ende des Tutorials haben Sie einen Client, der über gRPC eine Verbindung zu einem Remote-Server herstellt, um Informationen zu Funktionen auf der Route eines Clients abzurufen, eine Zusammenfassung der Route eines Clients zu erstellen und Routeninformationen wie Verkehrsaktualisierungen mit dem Server und anderen Clients auszutauschen.
Der Dienst wird in einer Protocol Buffers-Datei definiert, die zum Generieren von Boilerplate-Code für den Client und den Server verwendet wird, damit sie miteinander kommunizieren können. So sparen Sie Zeit und Aufwand bei der Implementierung dieser Funktion.
Dieser generierte Code kümmert sich nicht nur um die Komplexität der Kommunikation zwischen Server und Client, sondern auch um die Serialisierung und Deserialisierung von Daten.
Lerninhalte
- Wie Sie Protocol Buffers zum Definieren einer Dienst-API verwenden.
- Hier erfahren Sie, wie Sie einen gRPC-basierten Client und Server aus einer Protocol Buffers-Definition mithilfe der automatischen Codegenerierung erstellen.
- Sie haben ein grundlegendes Verständnis der Client-Server-Streaming-Kommunikation mit gRPC.
Dieses Codelab richtet sich an Java-Entwickler, die neu in gRPC sind oder ihr Wissen zu gRPC auffrischen möchten, sowie an alle anderen, die sich für die Entwicklung verteilter Systeme interessieren. Es sind keine Vorkenntnisse in gRPC erforderlich.
2. Hinweis
Vorbereitung
- JDK-Version 24.
Code abrufen
Damit Sie nicht ganz von vorn anfangen müssen, enthält dieses Codelab ein Gerüst des Quellcodes der Anwendung, das Sie vervollständigen können. In den folgenden Schritten erfahren Sie, wie Sie die Anwendung fertigstellen, einschließlich der Verwendung der Protocol Buffer-Compiler-Plug-ins zum Generieren des Boilerplate-gRPC-Codes.
Erstellen Sie zuerst das Arbeitsverzeichnis für das Codelab und wechseln Sie in dieses Verzeichnis:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
Laden Sie das Codelab herunter und extrahieren Sie es:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
Alternativ können Sie die ZIP-Datei herunterladen, die nur das Codelab-Verzeichnis enthält, und sie manuell entpacken.
Der vollständige Quellcode ist auf GitHub verfügbar, wenn Sie die Eingabe einer Implementierung überspringen möchten.
3. Nachrichten und Dienste definieren
Als Erstes müssen Sie den gRPC-Dienst der Anwendung, die RPC-Methode sowie die Anfrage- und Antwortnachrichtentypen mit Protokollpuffern definieren. Ihr Dienst bietet Folgendes:
- RPC-Methoden mit den Namen
ListFeatures
,RecordRoute
undRouteChat
, die der Server implementiert und der Client aufruft. - Die Nachrichtentypen
Point
,Feature
,Rectangle
,RouteNote
undRouteSummary
, die Datenstrukturen sind, die beim Aufrufen der oben genannten Methoden zwischen Client und Server ausgetauscht werden.
Protocol Buffers werden allgemein als Protobufs bezeichnet. Weitere Informationen zur gRPC-Terminologie finden Sie unter Core concepts, architecture, and lifecycle.
Diese RPC-Methode und ihre Nachrichtentypen werden alle in der Datei proto/routeguide/route_guide.proto
des bereitgestellten Quellcodes definiert.
Erstellen wir eine route_guide.proto
-Datei.
Da wir in diesem Beispiel Java-Code generieren, haben wir in unserer .proto
die Dateioption java_package
angegeben:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
Nachrichtentypen definieren
Definieren Sie zuerst den Nachrichtentyp Point
in der Datei proto/routeguide/route_guide.proto
des Quellcodes. Ein Point
stellt ein Paar aus Breiten- und Längengrad auf einer Karte dar. Verwenden Sie für dieses Codelab Ganzzahlen für die Koordinaten:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Die Zahlen 1
und 2
sind eindeutige ID-Nummern für die einzelnen Felder in der message
-Struktur.
Als Nächstes definieren Sie den Nachrichtentyp Feature
. Bei einem Feature
wird ein string
-Feld für den Namen oder die Postanschrift von etwas an einem Standort verwendet, der durch ein Point
angegeben wird:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Damit mehrere Punkte in einem Gebiet an einen Client gestreamt werden können, benötigen Sie eine Rectangle
-Nachricht, die ein Rechteck aus Breiten- und Längengrad darstellt, das durch zwei diagonal gegenüberliegende Punkte lo
und hi
dargestellt wird:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Außerdem eine RouteNote
-Nachricht, die eine Nachricht darstellt, die an einem bestimmten Punkt gesendet wurde:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Schließlich benötigen Sie eine RouteSummary
-Meldung. Diese Nachricht wird als Antwort auf einen RecordRoute
-RPC empfangen, der im nächsten Abschnitt erläutert wird. Sie enthält die Anzahl der einzelnen empfangenen Punkte, die Anzahl der erkannten Features und die zurückgelegte Gesamtstrecke als kumulative Summe der Entfernung zwischen den einzelnen Punkten.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Dienstmethoden definieren
Wenn Sie einen Dienst definieren möchten, geben Sie einen benannten Dienst in der Datei .proto
an. Die Datei route_guide.proto
hat eine service
-Struktur mit dem Namen RouteGuide
, die eine oder mehrere Methoden definiert, die vom Dienst der Anwendung bereitgestellt werden.
Wenn Sie RPC
-Methoden in Ihrer Dienstdefinition definieren, geben Sie die zugehörigen Anfrage- und Antworttypen an. In diesem Abschnitt des Codelabs definieren wir Folgendes:
ListFeatures
Ruft die im angegebenen Rectangle
verfügbaren Feature
-Objekte ab. Die Ergebnisse werden gestreamt und nicht auf einmal zurückgegeben, da das Rechteck ein großes Gebiet abdecken und eine große Anzahl von Features enthalten kann.
Für diese Anwendung verwenden Sie einen serverseitigen Streaming-RPC: Der Client sendet eine Anfrage an den Server und erhält einen Stream, um eine Reihe von Nachrichten zurückzulesen. Der Client liest den zurückgegebenen Stream, bis keine Nachrichten mehr vorhanden sind. Wie Sie in unserem Beispiel sehen, geben Sie eine serverseitige Streaming-Methode an, indem Sie das Stream-Keyword vor den Antworttyp setzen.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Akzeptiert einen Stream von Punkten auf einer zurückgelegten Route und gibt ein RouteSummary
zurück, wenn die Route zurückgelegt wurde.
In diesem Fall ist ein clientseitiger Streaming-RPC angebracht: Der Client schreibt eine Reihe von Nachrichten und sendet sie an den Server, wiederum über einen bereitgestellten Stream. Nachdem der Client mit dem Schreiben der Nachrichten fertig ist, wartet er darauf, dass der Server sie alle liest und seine Antwort zurückgibt. Sie geben eine clientseitige Streamingmethode an, indem Sie das Stream-Keyword vor den Anfragetyp setzen.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Akzeptiert einen Stream von RouteNotes
, der während der Fahrt auf einer Route gesendet wird, und empfängt gleichzeitig andere RouteNotes
(z.B. von anderen Nutzern).
Genau das ist ein Anwendungsfall für bidirektionales Streaming. Eine bidirektionale Streaming-RPC, bei der beide Seiten eine Reihe von Nachrichten über einen Lese-/Schreib-Stream senden. Die beiden Streams funktionieren unabhängig voneinander. Clients und Server können also in beliebiger Reihenfolge lesen und schreiben. Der Server kann beispielsweise warten, bis er alle Clientnachrichten empfangen hat, bevor er seine Antworten schreibt. Er kann aber auch abwechselnd eine Nachricht lesen und dann eine Nachricht schreiben oder eine andere Kombination aus Lese- und Schreibvorgängen ausführen. Die Reihenfolge der Nachrichten in jedem Stream wird beibehalten. Sie geben diese Art von Methode an, indem Sie das Stream-Keyword sowohl vor die Anfrage als auch vor die Antwort setzen.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Client- und Servercode generieren
Als Nächstes müssen wir die gRPC-Client- und ‑Serverschnittstellen aus unserer .proto
-Dienstdefinition generieren. Dazu verwenden wir den Protokollpuffer-Compiler protoc
mit einem speziellen gRPC-Java-Plug-in. Sie müssen den proto3-Compiler (der sowohl die proto2- als auch die proto3-Syntax unterstützt) verwenden, um gRPC-Dienste zu generieren.
Wenn Sie Gradle oder Maven verwenden, kann das protoc-Build-Plug-in den erforderlichen Code im Rahmen des Builds generieren. In der README-Datei für grpc-java finden Sie Informationen dazu, wie Sie Code aus Ihren eigenen .proto
-Dateien generieren.
Wir haben eine Gradle-Konfiguration bereitgestellt.
Geben Sie im Verzeichnis streaming-grpc-java-getting-started
Folgendes ein:
$ chmod +x gradlew $ ./gradlew generateProto
Die folgenden Klassen werden aus unserer Dienstdefinition (unter build/generated/sources/proto/main/java
) generiert:
- Einer für jeden Nachrichtentyp:
Feature.java
,Rectangle.java, ...
. Diese enthalten den gesamten Protokollzwischenspeicher-Code zum Einfügen, Serialisieren und Abrufen unserer Anfrage- und Antwortnachrichtentypen. RouteGuideGrpc.java
, die (zusammen mit anderem nützlichen Code) eine Basisklasse für die Implementierung vonRouteGuide
-Servern,RouteGuideGrpc.RouteGuideImplBase
, mit allen imRouteGuide
-Dienst definierten Methoden und Stub-Klassen für die Verwendung durch Clients enthält
5. Dienst implementieren
Sehen wir uns zuerst an, wie wir einen RouteGuide
-Server erstellen. Damit unser RouteGuide
-Dienst seine Aufgabe erfüllen kann, sind zwei Dinge erforderlich:
- Implementieren der Dienstschnittstelle, die aus unserer Dienstdefinition generiert wurde: Hier wird die eigentliche „Arbeit“ unseres Dienstes erledigt.
- Ausführen eines gRPC-Servers, der auf Anfragen von Clients wartet und sie an die richtige Dienstimplementierung weiterleitet.
RouteGuide implementieren
Wir implementieren eine RouteGuideService
-Klasse, die die generierte RouteGuideGrpc.RouteGuideImplBase-Klasse erweitert. So würde die Implementierung aussehen.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
Sehen wir uns die einzelnen RPC-Implementierungen genauer an.
Serverseitiger Streaming-RPC
Sehen wir uns als Nächstes einen unserer Streaming-RPCs an. ListFeatures
ist ein serverseitiger Streaming-RPC, daher müssen wir mehrere Features
an unseren Client zurücksenden.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
Wie beim einfachen RPC erhält diese Methode ein Anfrageobjekt (das Rectangle
, in dem unser Kunde Features
finden möchte) und einen StreamObserver
-Antwort-Observer.
Dieses Mal rufen wir so viele Feature
-Objekte ab, wie wir für die Rückgabe an den Client benötigen. In diesem Fall wählen wir sie aus der Feature-Sammlung des Dienstes aus, je nachdem, ob sie sich innerhalb unserer Anfrage Rectangle
befinden. Anschließend schreiben wir sie nacheinander in den Antwort-Observer mit seiner onNext()
-Methode. Schließlich verwenden wir wie bei unserem einfachen RPC die onCompleted()
-Methode des Antwort-Observers, um gRPC mitzuteilen, dass wir mit dem Schreiben von Antworten fertig sind.
Clientseitiger Streaming-RPC
Sehen wir uns nun etwas Komplexeres an: die clientseitige Streamingmethode RecordRoute()
, bei der wir einen Stream von Points
vom Client erhalten und ein einzelnes RouteSummary
mit Informationen zur Fahrt zurückgeben.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
Wie Sie sehen, erhält unsere Methode wie die vorherigen Methodentypen einen StreamObserver
-Parameter responseObserver
, gibt aber diesmal einen StreamObserver
zurück, damit der Client seinen Points
schreiben kann.
Im Methodenkörper instanziieren wir ein anonymes StreamObserver
, das zurückgegeben werden soll. Dabei gehen wir so vor:
- Überschreiben Sie die Methode
onNext()
, um jedes Mal, wenn der ClientPoint
in den Nachrichtenstream schreibt, Funktionen und andere Informationen zu erhalten. - Überschreiben Sie die Methode
onCompleted()
(wird aufgerufen, wenn der Client mit dem Schreiben von Nachrichten fertig ist), um unsereRouteSummary
zu erstellen. Wir rufen dann dieonNext()
-Methode des eigenen Antwort-Observers unserer Methode mit unseremRouteSummary
auf und rufen dann dieonCompleted()
-Methode auf, um den Aufruf serverseitig zu beenden.
Bidirektionale Streaming-RPC
Sehen wir uns zum Schluss noch unseren bidirektionalen Streaming-RPC RouteChat()
an.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
Wie bei unserem clientseitigen Streamingbeispiel erhalten und geben wir beide ein StreamObserver
zurück. Diesmal geben wir jedoch Werte über den Antwort-Observer unserer Methode zurück, während der Client weiterhin Nachrichten in seinen Nachrichtenstream schreibt. Die Syntax zum Lesen und Schreiben ist hier genau dieselbe wie bei unseren Client-Streaming- und Server-Streaming-Methoden. Obwohl jede Seite die Nachrichten der anderen Seite immer in der Reihenfolge erhält, in der sie geschrieben wurden, können sowohl der Client als auch der Server in beliebiger Reihenfolge lesen und schreiben. Die Streams funktionieren völlig unabhängig voneinander.
Server starten
Nachdem wir alle unsere Methoden implementiert haben, müssen wir auch einen gRPC-Server starten, damit Clients unseren Dienst tatsächlich nutzen können. Das folgende Snippet zeigt, wie wir das für unseren RouteGuide
-Dienst tun:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
Wie Sie sehen, erstellen und starten wir unseren Server mit einem ServerBuilder
.
Um das zu erreichen,
- Geben Sie mit der
forPort()
-Methode des Builders die Adresse und den Port an, die wir verwenden möchten, um auf Clientanfragen zu warten. - Erstellen Sie eine Instanz unserer Dienstimplementierungsklasse
RouteGuideService
und übergeben Sie sie an dieaddService()
-Methode des Builders. - Rufen Sie
build()
undstart()
für den Builder auf, um einen RPC-Server für unseren Dienst zu erstellen und zu starten.
Da der Port bereits im ServerBuilder enthalten ist, übergeben wir ihn nur für die Protokollierung.
6. Client erstellen
In diesem Abschnitt sehen wir uns an, wie wir einen Client für unseren RouteGuide
-Dienst erstellen. Unser vollständiger Beispielclientcode ist unter ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
verfügbar.
Stub instanziieren
Um Dienstmethoden aufzurufen, müssen wir zuerst einen Stub erstellen, genauer gesagt zwei Stubs:
- Ein blockierender/synchroner Stub: Das bedeutet, dass der RPC-Aufruf auf die Antwort des Servers wartet und entweder eine Antwort zurückgibt oder eine Ausnahme auslöst.
- ein nicht blockierender/asynchroner Stub, der nicht blockierende Aufrufe an den Server ausführt, wobei die Antwort asynchron zurückgegeben wird. Bestimmte Arten von Streaminganrufen können nur über einen asynchronen Stub erfolgen.
Zuerst müssen wir einen gRPC-Kanal für unseren Stub erstellen und dabei die Serveradresse und den Port angeben, mit denen wir eine Verbindung herstellen möchten:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
Wir verwenden ein ManagedChannelBuilder
, um den Channel zu erstellen.
Jetzt können wir den Channel verwenden, um unsere Stubs mit den Methoden newStub
und newBlockingStub
zu erstellen, die in der Klasse RouteGuideGrpc
bereitgestellt werden, die wir aus unserem .proto
generiert haben.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
Denken Sie daran: Wenn es nicht blockiert, ist es asynchron.
Dienstmethoden aufrufen
Sehen wir uns nun an, wie wir unsere Dienstmethoden aufrufen. Alle RPCs, die über den blockierenden Stub erstellt werden, werden im blockierenden/synchronen Modus ausgeführt. Das bedeutet, dass der RPC-Aufruf auf die Antwort des Servers wartet und entweder eine Antwort oder einen Fehler zurückgibt.
Serverseitiger Streaming-RPC
Sehen wir uns als Nächstes einen serverseitigen Streamingaufruf an ListFeatures
an, der einen Stream geografischer Feature
zurückgibt:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
Wie Sie sehen, ähnelt sie dem einfachen unären RPC, den wir im Codelab Getting_Started_With_gRPC_Java betrachtet haben. Anstatt ein einzelnes Feature
zurückzugeben, gibt die Methode jedoch ein Iterator
zurück, mit dem der Client alle zurückgegebenen Features
lesen kann.
Clientseitiger Streaming-RPC
Nun zu etwas Komplizierterem: der clientseitigen Streamingmethode RecordRoute
, bei der wir einen Stream von Points
an den Server senden und eine einzelne RouteSummary
zurückerhalten. Für diese Methode müssen wir den asynchronen Stub verwenden. Wenn Sie bereits Server erstellen gelesen haben, kommt Ihnen einiges davon vielleicht bekannt vor. Asynchrone Streaming-RPCs werden auf beiden Seiten ähnlich implementiert.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
Wie Sie sehen, müssen wir zum Aufrufen dieser Methode ein StreamObserver
erstellen, das eine spezielle Schnittstelle implementiert, die der Server mit seiner RouteSummary
-Antwort aufrufen kann. In unserem StreamObserver
:
- Überschreiben Sie die
onNext()
-Methode, um die zurückgegebenen Informationen auszugeben, wenn der Server einRouteSummary
in den Nachrichtenstream schreibt. - Überschreiben Sie die Methode
onCompleted()
(wird aufgerufen, wenn der Server den Aufruf auf seiner Seite abgeschlossen hat), um eineCountDownLatch
zu reduzieren, damit wir prüfen können, ob der Server das Schreiben abgeschlossen hat.
Wir übergeben dann die StreamObserver
an die recordRoute()
-Methode des asynchronen Stubs und erhalten unseren eigenen StreamObserver
-Anfrage-Observer zurück, um unsere Points
zu schreiben, die an den Server gesendet werden soll. Nachdem wir die Punkte geschrieben haben, verwenden wir die Methode onCompleted()
des Anforderungsbeobachters, um gRPC mitzuteilen, dass wir das Schreiben auf der Clientseite abgeschlossen haben. Wenn wir fertig sind, prüfen wir in unserem CountDownLatch
, ob der Server seine Seite abgeschlossen hat.
Bidirektionale Streaming-RPC
Sehen wir uns zum Schluss noch unseren bidirektionalen Streaming-RPC RouteChat()
an.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
Wie bei unserem clientseitigen Streamingbeispiel erhalten und geben wir einen StreamObserver
-Antwort-Observer zurück. Dieses Mal senden wir jedoch Werte über den Antwort-Observer unserer Methode, während der Server weiterhin Nachrichten in seinen Nachrichtenstream schreibt. Die Syntax zum Lesen und Schreiben ist hier genau dieselbe wie bei unserer Client-Streaming-Methode. Obwohl jede Seite die Nachrichten der anderen Seite immer in der Reihenfolge erhält, in der sie geschrieben wurden, können sowohl der Client als auch der Server in beliebiger Reihenfolge lesen und schreiben. Die Streams funktionieren völlig unabhängig voneinander.
7. Probier es gleich aus!
- Im Verzeichnis
start_here
:
$ ./gradlew installDist
Dadurch wird Ihr Code kompiliert, in einem JAR verpackt und die Skripts zum Ausführen des Beispiels werden erstellt. Sie werden im Verzeichnis build/install/start_here/bin/
erstellt. Die Skripts sind: route-guide-server
und route-guide-client
.
Der Server muss ausgeführt werden, bevor der Client gestartet wird.
- Führen Sie den Server aus:
$ ./build/install/start_here/bin/route-guide-server
- Führen Sie den Client aus:
$ ./build/install/start_here/bin/route-guide-client
8. Nächste Schritte
- Informationen zur Funktionsweise von gRPC finden Sie unter Einführung in gRPC und Zentrale Konzepte.
- Grundlagen-Anleitung durcharbeiten
- API-Referenz