Introduzione a gRPC-Java - Streaming

1. Introduzione

In questo codelab, utilizzerai gRPC-Java per creare un client e un server che costituiscono la base di un'applicazione di mappatura di percorsi scritta in Java.

Al termine del tutorial, avrai un client che si connette a un server remoto utilizzando gRPC per ottenere informazioni sulle funzionalità di un percorso del client, creare un riepilogo di un percorso del client e scambiare informazioni sul percorso, come gli aggiornamenti sul traffico, con il server e altri client.

Il servizio è definito in un file Protocol Buffers, che verrà utilizzato per generare codice boilerplate per il client e il server in modo che possano comunicare tra loro, risparmiando tempo e fatica nell'implementazione di questa funzionalità.

Questo codice generato si occupa non solo delle complessità della comunicazione tra il server e il client, ma anche della serializzazione e della deserializzazione dei dati.

Obiettivi didattici

  • Come utilizzare i buffer di protocollo per definire un'API di servizio.
  • Come creare un client e un server basati su gRPC da una definizione di Protocol Buffers utilizzando la generazione automatica del codice.
  • Comprensione della comunicazione di streaming client-server con gRPC.

Questo codelab è rivolto agli sviluppatori Java che non hanno mai utilizzato gRPC o che vogliono ripassare le basi di gRPC, nonché a chiunque sia interessato a creare sistemi distribuiti. Non è richiesta alcuna esperienza precedente con gRPC.

2. Prima di iniziare

Prerequisiti

  • JDK versione 24.

Ottieni il codice

Per non dover iniziare da zero, questo codelab fornisce una struttura del codice sorgente dell'applicazione da completare. I passaggi seguenti mostrano come completare l'applicazione, incluso l'utilizzo dei plug-in del compilatore di protocol buffer per generare il codice gRPC boilerplate.

Innanzitutto, crea la directory di lavoro del codelab e accedi tramite cd:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Scarica ed estrai il codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

In alternativa, puoi scaricare il file .zip contenente solo la directory del codelab e decomprimerlo manualmente.

Il codice sorgente completato è disponibile su GitHub se vuoi evitare di digitare un'implementazione.

3. Definisci messaggi e servizi

Il primo passaggio consiste nel definire il servizio gRPC dell'applicazione, il relativo metodo RPC e i tipi di messaggi di richiesta e risposta utilizzando Protocol Buffers. Il tuo servizio fornirà:

  • Metodi RPC chiamati ListFeatures, RecordRoute e RouteChat che il server implementa e il client chiama.
  • I tipi di messaggi Point, Feature, Rectangle, RouteNote e RouteSummary, che sono strutture di dati scambiate tra il client e il server quando vengono chiamati i metodi precedenti.

Protocol Buffers sono comunemente noti come protobuf. Per ulteriori informazioni sulla terminologia gRPC, consulta Concetti fondamentali, architettura e ciclo di vita di gRPC.

Questo metodo RPC e i relativi tipi di messaggio verranno definiti nel file proto/routeguide/route_guide.proto del codice sorgente fornito.

Creiamo un file route_guide.proto.

Poiché in questo esempio generiamo codice Java, abbiamo specificato un'opzione per il file java_package nel nostro .proto:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

Definisci i tipi di messaggi

Nel file proto/routeguide/route_guide.proto del codice sorgente, definisci innanzitutto il tipo di messaggio Point. Un Point rappresenta una coppia di coordinate di latitudine e longitudine su una mappa. Per questo codelab, utilizza numeri interi per le coordinate:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

I numeri 1 e 2 sono numeri ID univoci per ciascuno dei campi nella struttura message.

Successivamente, definisci il tipo di messaggio Feature. Un Feature utilizza un campo string per il nome o l'indirizzo postale di un elemento in una località specificata da un Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Affinché più punti all'interno di un'area possano essere trasmessi in streaming a un client, è necessario un messaggio Rectangle che rappresenti un rettangolo di latitudine e longitudine, rappresentato da due punti diagonalmente opposti lo e hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Inoltre, un messaggio RouteNote che rappresenta un messaggio inviato in un determinato momento:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Infine, ti servirà un messaggio RouteSummary. Questo messaggio viene ricevuto in risposta a una RPC RecordRoute, che viene spiegata nella sezione successiva. Contiene il numero di punti individuali ricevuti, il numero di caratteristiche rilevate e la distanza totale percorsa come somma cumulativa della distanza tra ogni punto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definisci i metodi di servizio

Per definire un servizio, specifica un servizio denominato nel file .proto. Il file route_guide.proto ha una struttura service denominata RouteGuide che definisce uno o più metodi forniti dal servizio dell'applicazione.

Quando definisci i metodi RPC all'interno della definizione del servizio, specifichi i tipi di richiesta e risposta. In questa sezione del codelab, definiamo:

ListFeatures

Ottiene gli oggetti Feature disponibili all'interno del Rectangle specificato. I risultati vengono trasmessi in streaming anziché restituiti contemporaneamente, poiché il rettangolo potrebbe coprire un'area vasta e contenere un numero elevato di caratteristiche.

Per questa applicazione, utilizzerai una RPC di streaming lato server: il client invia una richiesta al server e riceve un flusso per leggere una sequenza di messaggi. Il client legge dallo stream restituito finché non ci sono più messaggi. Come puoi vedere nel nostro esempio, devi specificare un metodo di streaming lato server inserendo la parola chiave stream prima del tipo di risposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Accetta un flusso di punti su un percorso in fase di attraversamento, restituendo un RouteSummary al termine dell'attraversamento.

In questo caso è appropriata una RPC di streaming lato client: il client scrive una sequenza di messaggi e li invia al server, sempre utilizzando un flusso fornito. Una volta che il client ha finito di scrivere i messaggi, attende che il server li legga tutti e restituisca la risposta. Specifichi un metodo di streaming lato client inserendo la parola chiave stream prima del tipo di richiesta.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Accetta un flusso di RouteNotes inviati durante il percorso, mentre riceve altri RouteNotes (ad esempio da altri utenti).

Questo è esattamente il tipo di caso d'uso per lo streaming bidirezionale. Una RPC di streaming bidirezionale in cui entrambe le parti inviano una sequenza di messaggi utilizzando uno stream di lettura/scrittura. I due flussi operano in modo indipendente, quindi client e server possono leggere e scrivere nell'ordine che preferiscono: ad esempio, il server potrebbe attendere di ricevere tutti i messaggi del client prima di scrivere le risposte oppure potrebbe leggere un messaggio e poi scriverne un altro o una combinazione diversa di letture e scritture. L'ordine dei messaggi in ogni stream viene mantenuto. Specifichi questo tipo di metodo inserendo la parola chiave stream prima della richiesta e della risposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Genera codice client e server

Successivamente, dobbiamo generare le interfacce client e server gRPC dalla definizione del servizio .proto. A questo scopo, utilizziamo il compilatore di protocol buffer protoc con un plug-in Java gRPC speciale. Per generare servizi gRPC, devi utilizzare il compilatore proto3 (che supporta sia la sintassi proto2 sia quella proto3).

Quando utilizzi Gradle o Maven, il plug-in di build protoc può generare il codice necessario come parte della build. Per informazioni su come generare codice dai tuoi file .proto, consulta il file README di grpc-java.

Abbiamo fornito la configurazione Gradle.

Dalla directory streaming-grpc-java-getting-started inserisci

$ chmod +x gradlew
$ ./gradlew generateProto

Le seguenti classi vengono generate dalla nostra definizione di servizio (in build/generated/sources/proto/main/java):

  • Uno per ogni tipo di messaggio: Feature.java, Rectangle.java, ..., che contengono tutto il codice del buffer di protocollo per compilare, serializzare e recuperare i nostri tipi di messaggi di richiesta e risposta.
  • RouteGuideGrpc.java che contiene (insieme ad altro codice utile) una classe base per l'implementazione dei server RouteGuide, RouteGuideGrpc.RouteGuideImplBase, con tutti i metodi definiti nel servizio RouteGuide e classi stub da utilizzare per i client

5. Implementare il servizio

Per prima cosa, vediamo come creare un server RouteGuide. Il funzionamento del nostro servizio RouteGuide si basa su due elementi:

  • Implementazione dell'interfaccia di servizio generata dalla nostra definizione di servizio: esecuzione del "lavoro" effettivo del nostro servizio.
  • Esecuzione di un server gRPC per ascoltare le richieste dei client e inviarle all'implementazione del servizio corretta.

Implementa RouteGuide

Implementeremo una classe RouteGuideService che estenderà la classe RouteGuideGrpc.RouteGuideImplBase generata. Ecco come apparirebbe l'implementazione.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Esaminiamo in dettaglio ogni implementazione RPC

RPC di streaming lato server

Ora esaminiamo una delle nostre RPC di streaming. ListFeatures è un RPC di streaming lato server, quindi dobbiamo inviare più Features al nostro client.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Come la semplice RPC, questo metodo riceve un oggetto richiesta (il Rectangle in cui il nostro client vuole trovare Features) e un osservatore di risposte StreamObserver.

Questa volta, otteniamo tutti gli oggetti Feature necessari per restituirli al client (in questo caso, li selezioniamo dalla raccolta di funzionalità del servizio in base al fatto che si trovino all'interno della nostra richiesta Rectangle) e li scriviamo a turno nell'observer di risposta utilizzando il metodo onNext(). Infine, come nella nostra semplice RPC, utilizziamo il metodo onCompleted() dell'observer della risposta per comunicare a gRPC che abbiamo terminato di scrivere le risposte.

RPC di streaming lato client

Ora esaminiamo qualcosa di un po' più complicato: il metodo di streaming lato client RecordRoute(), in cui riceviamo un flusso di Points dal client e restituiamo un singolo RouteSummary con informazioni sul viaggio.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Come puoi vedere, come i tipi di metodi precedenti, il nostro metodo riceve un parametro StreamObserver responseObserver, ma questa volta restituisce un StreamObserver per consentire al client di scrivere il proprio Points.

Nel corpo del metodo creiamo un oggetto StreamObserver anonimo da restituire, in cui:

  • Esegui l'override del metodo onNext() per ottenere funzionalità e altre informazioni ogni volta che il client scrive un Point nel flusso di messaggi.
  • Esegui l'override del metodo onCompleted() (chiamato al termine della scrittura dei messaggi da parte del client) per compilare e creare RouteSummary. Quindi chiamiamo onNext() dell'observer di risposta del nostro metodo con il nostro RouteSummary e poi chiamiamo il suo metodo onCompleted() per completare la chiamata dal lato server.

RPC di streaming bidirezionale

Infine, diamo un'occhiata alla nostra RPC di streaming bidirezionale RouteChat().

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

Come nel nostro esempio di streaming lato client, riceviamo e restituiamo un StreamObserver, tranne che questa volta restituiamo i valori tramite l'observer di risposta del nostro metodo mentre il client sta ancora scrivendo messaggi nel suo flusso di messaggi. La sintassi per la lettura e la scrittura qui è esattamente la stessa dei nostri metodi di streaming lato client e lato server. Sebbene ogni parte riceva sempre i messaggi dell'altra nell'ordine in cui sono stati scritti, sia il client che il server possono leggere e scrivere in qualsiasi ordine. I flussi operano in modo completamente indipendente.

Avvia il server

Una volta implementati tutti i nostri metodi, dobbiamo anche avviare un server gRPC in modo che i client possano effettivamente utilizzare il nostro servizio. Il seguente snippet mostra come lo facciamo per il nostro servizio RouteGuide:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Come puoi vedere, creiamo e avviamo il nostro server utilizzando un ServerBuilder.

Per fare questo:

  1. Specifica l'indirizzo e la porta che vogliamo utilizzare per ascoltare le richieste del client utilizzando il metodo forPort() del builder.
  2. Crea un'istanza della nostra classe di implementazione del servizio RouteGuideService e passala al metodo addService() del builder.
  3. Chiama build() e start() sul builder per creare e avviare un server RPC per il nostro servizio.

Poiché ServerBuilder incorpora già la porta, l'unico motivo per cui passiamo una porta è utilizzarla per la registrazione.

6. Crea il client

In questa sezione vedremo come creare un client per il nostro servizio RouteGuide. Puoi visualizzare il nostro codice client di esempio completo in ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

Creare un stub

Per chiamare i metodi di servizio, dobbiamo prima creare uno stub, o meglio, due stub:

  • uno stub bloccante/sincrono: ciò significa che la chiamata RPC attende la risposta del server e restituisce una risposta o genera un'eccezione.
  • uno stub non bloccante/asincrono che effettua chiamate non bloccanti al server, dove la risposta viene restituita in modo asincrono. Puoi effettuare determinati tipi di chiamate in streaming solo utilizzando uno stub asincrono.

Per prima cosa, dobbiamo creare un canale gRPC per il nostro stub, specificando l'indirizzo e la porta del server a cui vogliamo connetterci:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Utilizziamo un ManagedChannelBuilder per creare il canale.

Ora possiamo utilizzare il canale per creare i nostri stub utilizzando i metodi newStub e newBlockingStub forniti nella classe RouteGuideGrpc generata dal nostro .proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

Ricorda che se non è bloccante, è asincrono

Metodi di servizio di chiamata

Ora vediamo come chiamiamo i metodi del servizio. Tieni presente che tutte le RPC create dallo stub di blocco funzioneranno in modalità di blocco/sincrona, il che significa che la chiamata RPC attende la risposta del server e restituirà una risposta o un errore.

RPC di streaming lato server

Ora esaminiamo una chiamata di streaming lato server a ListFeatures, che restituisce un flusso di Feature geografici:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Come puoi vedere, è molto simile al semplice RPC unario che abbiamo esaminato nel codelab Getting_Started_With_gRPC_Java, tranne che invece di restituire un singolo Feature, il metodo restituisce un Iterator che il client può utilizzare per leggere tutti i Features restituiti.

RPC di streaming lato client

Ora passiamo a qualcosa di un po' più complicato: il metodo di streaming lato client RecordRoute, in cui inviamo un flusso di Points al server e riceviamo un singolo RouteSummary. Per questo metodo dobbiamo utilizzare lo stub asincrono. Se hai già letto Creazione del server, alcune parti ti sembreranno molto familiari: le RPC di streaming asincrono vengono implementate in modo simile su entrambi i lati.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Come puoi vedere, per chiamare questo metodo dobbiamo creare un StreamObserver, che implementa un'interfaccia speciale per la chiamata del server con la sua risposta RouteSummary. Nel nostro StreamObserver:

  • Esegui l'override del metodo onNext() per stampare le informazioni restituite quando il server scrive un RouteSummary nel flusso di messaggi.
  • Esegui l'override del metodo onCompleted() (chiamato quando il server ha completato la chiamata sul suo lato) per ridurre un CountDownLatch in modo da poter verificare se il server ha terminato la scrittura.

Quindi, passiamo StreamObserver al metodo recordRoute() dello stub asincrono e recuperiamo il nostro osservatore di richieste StreamObserver per scrivere Points da inviare al server. Una volta terminata la scrittura dei punti, utilizziamo il metodo onCompleted() dell'osservatore delle richieste per comunicare a gRPC che abbiamo terminato la scrittura sul lato client. Al termine, controlliamo il nostro CountDownLatch per vedere se il server ha completato l'operazione.

RPC di streaming bidirezionale

Infine, diamo un'occhiata alla nostra RPC di streaming bidirezionale RouteChat().

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

Come nell'esempio di streaming lato client, otteniamo e restituiamo un observer di risposta StreamObserver, tranne che questa volta inviamo i valori tramite l'observer di risposta del nostro metodo mentre il server sta ancora scrivendo messaggi nel suo flusso di messaggi. La sintassi per la lettura e la scrittura qui è esattamente la stessa del nostro metodo di streaming lato client. Sebbene ogni parte riceva sempre i messaggi dell'altra nell'ordine in cui sono stati scritti, sia il client che il server possono leggere e scrivere in qualsiasi ordine. I flussi operano in modo completamente indipendente.

7. Prova

  1. Dalla directory start_here:
$ ./gradlew installDist

In questo modo, il codice verrà compilato, inserito in un file JAR e verranno creati gli script che eseguono l'esempio. Verranno creati nella directory build/install/start_here/bin/. I copioni sono: route-guide-server e route-guide-client.

Il server deve essere in esecuzione prima di avviare il client.

  1. Esegui il server:
$ ./build/install/start_here/bin/route-guide-server
  1. Esegui il client:
$ ./build/install/start_here/bin/route-guide-client

8. Passaggi successivi