Premiers pas avec gRPC-Java : streaming

1. Introduction

Dans cet atelier de programmation, vous allez utiliser gRPC-Java pour créer un client et un serveur qui constituent la base d'une application de cartographie d'itinéraires écrite en Java.

À la fin du tutoriel, vous disposerez d'un client qui se connecte à un serveur distant à l'aide de gRPC pour obtenir des informations sur les caractéristiques d'un itinéraire client, créer un récapitulatif d'un itinéraire client et échanger des informations sur l'itinéraire, telles que les mises à jour du trafic, avec le serveur et d'autres clients.

Le service est défini dans un fichier Protocol Buffers, qui sera utilisé pour générer du code passe-partout pour le client et le serveur afin qu'ils puissent communiquer entre eux. Vous gagnerez ainsi du temps et des efforts pour implémenter cette fonctionnalité.

Ce code généré gère non seulement les complexités de la communication entre le serveur et le client, mais aussi la sérialisation et la désérialisation des données.

Points abordés

  • Utiliser Protocol Buffers pour définir une API de service.
  • Comment créer un client et un serveur basés sur gRPC à partir d'une définition Protocol Buffers à l'aide de la génération de code automatisée.
  • Comprendre la communication de streaming client-serveur avec gRPC.

Cet atelier de programmation s'adresse aux développeurs Java qui découvrent gRPC ou qui souhaitent se rafraîchir la mémoire sur gRPC, ou à toute personne intéressée par la création de systèmes distribués. Aucune expérience préalable avec gRPC n'est requise.

2. Avant de commencer

Prérequis

  • Version 24 du JDK.

Obtenir le code

Pour que vous n'ayez pas à partir de zéro, cet atelier de programmation fournit un échafaudage du code source de l'application que vous devez compléter. Les étapes suivantes vous montreront comment finaliser l'application, y compris en utilisant les plug-ins du compilateur de tampon de protocole pour générer le code gRPC standard.

Commencez par créer le répertoire de travail de l'atelier de programmation et accédez-y :

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Téléchargez et extrayez l'atelier de programmation :

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

Vous pouvez également télécharger le fichier .zip contenant uniquement le répertoire de l'atelier de programmation et le décompresser manuellement.

Le code source complet est disponible sur GitHub si vous ne souhaitez pas saisir d'implémentation.

3. Définir les messages et les services

La première étape consiste à définir le service gRPC de l'application, sa méthode RPC, ainsi que ses types de messages de requête et de réponse à l'aide de Protocol Buffers. Votre service fournira :

  • Méthodes RPC appelées ListFeatures, RecordRoute et RouteChat que le serveur implémente et que le client appelle.
  • Les types de messages Point, Feature, Rectangle, RouteNote et RouteSummary, qui sont des structures de données échangées entre le client et le serveur lors de l'appel des méthodes ci-dessus.

Les tampons de protocole sont communément appelés "protobufs". Pour en savoir plus sur la terminologie gRPC, consultez Concepts fondamentaux, architecture et cycle de vie de gRPC.

Cette méthode RPC et ses types de messages seront tous définis dans le fichier proto/routeguide/route_guide.proto du code source fourni.

Commençons par créer un fichier route_guide.proto.

Comme nous générons du code Java dans cet exemple, nous avons spécifié une option de fichier java_package dans notre .proto :

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

Définir les types de messages

Dans le fichier proto/routeguide/route_guide.proto du code source, définissez d'abord le type de message Point. Un Point représente une paire de coordonnées (latitude et longitude) sur une carte. Pour cet atelier de programmation, utilisez des nombres entiers pour les coordonnées :

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Les nombres 1 et 2 sont des ID uniques pour chacun des champs de la structure message.

Ensuite, définissez le type de message Feature. Un Feature utilise un champ string pour le nom ou l'adresse postale d'un élément à un emplacement spécifié par un Point :

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Pour que plusieurs points d'une zone puissent être diffusés en streaming vers un client, vous avez besoin d'un message Rectangle qui représente un rectangle de latitude et de longitude, représenté par deux points diagonalement opposés lo et hi :

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Un message RouteNote qui représente un message envoyé à un moment donné :

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Enfin, vous aurez besoin d'un message RouteSummary. Ce message est reçu en réponse à un RPC RecordRoute, qui est expliqué dans la section suivante. Il contient le nombre de points individuels reçus, le nombre de caractéristiques détectées et la distance totale parcourue, qui correspond à la somme cumulée de la distance entre chaque point.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Définir les méthodes de service

Pour définir un service, vous devez spécifier un service nommé dans votre fichier .proto. Le fichier route_guide.proto possède une structure service nommée RouteGuide qui définit une ou plusieurs méthodes fournies par le service de l'application.

Lorsque vous définissez des méthodes RPC dans la définition de votre service, vous spécifiez leurs types de requête et de réponse. Dans cette section de l'atelier de programmation, définissons les éléments suivants :

ListFeatures

Obtient les objets Feature disponibles dans le Rectangle donné. Les résultats sont diffusés en flux continu plutôt que renvoyés en une seule fois, car le rectangle peut couvrir une grande zone et contenir un grand nombre d'entités.

Pour cette application, vous utiliserez un RPC de streaming côté serveur : le client envoie une requête au serveur et obtient un flux pour lire une séquence de messages en retour. Le client lit le flux renvoyé jusqu'à ce qu'il n'y ait plus de messages. Comme vous pouvez le voir dans notre exemple, vous spécifiez une méthode de streaming côté serveur en plaçant le mot clé "stream" avant le type de réponse.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Accepte un flux de points sur un itinéraire parcouru et renvoie un RouteSummary lorsque le parcours est terminé.

Un RPC de streaming côté client est approprié dans ce cas : le client écrit une séquence de messages et les envoie au serveur, à nouveau à l'aide d'un flux fourni. Une fois que le client a terminé d'écrire les messages, il attend que le serveur les lise tous et renvoie sa réponse. Pour spécifier une méthode de streaming côté client, placez le mot clé "stream" avant le type de requête.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Accepte un flux de RouteNotes envoyé lors du parcours d'un itinéraire, tout en recevant d'autres RouteNotes (par exemple, d'autres utilisateurs).

C'est exactement le cas d'utilisation du streaming bidirectionnel. Un RPC de streaming bidirectionnel où les deux côtés envoient une séquence de messages à l'aide d'un flux en lecture-écriture. Les deux flux fonctionnent indépendamment. Les clients et les serveurs peuvent donc lire et écrire dans l'ordre de leur choix. Par exemple, le serveur peut attendre de recevoir tous les messages du client avant d'écrire ses réponses, ou il peut lire un message puis en écrire un, ou encore effectuer une autre combinaison de lectures et d'écritures. L'ordre des messages dans chaque flux est conservé. Pour spécifier ce type de méthode, placez le mot clé "stream" avant la requête et la réponse.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Générer le code client et serveur

Ensuite, nous devons générer les interfaces client et serveur gRPC à partir de notre définition de service .proto. Pour ce faire, nous utilisons le compilateur de tampon de protocole protoc avec un plug-in gRPC Java spécial. Vous devez utiliser le compilateur proto3 (qui est compatible avec les syntaxes proto2 et proto3) pour générer des services gRPC.

Lorsque vous utilisez Gradle ou Maven, le plug-in de compilation protoc peut générer le code nécessaire lors de la compilation. Pour savoir comment générer du code à partir de vos propres fichiers .proto, consultez le fichier README de grpc-java.

Nous avons fourni la configuration Gradle.

Dans le répertoire streaming-grpc-java-getting-started, saisissez

$ chmod +x gradlew
$ ./gradlew generateProto

Les classes suivantes sont générées à partir de notre définition de service (sous build/generated/sources/proto/main/java) :

  • Un pour chaque type de message : Feature.java, Rectangle.java, ..., qui contiennent tout le code du tampon de protocole pour remplir, sérialiser et récupérer nos types de messages de requête et de réponse.
  • RouteGuideGrpc.java, qui contient (avec d'autres codes utiles) une classe de base pour l'implémentation des serveurs RouteGuide, RouteGuideGrpc.RouteGuideImplBase, avec toutes les méthodes définies dans les classes de service et de bouchon RouteGuide que les clients peuvent utiliser

5. Implémenter le service

Commençons par examiner comment créer un serveur RouteGuide. Pour que notre service RouteGuide fonctionne correctement, deux éléments sont nécessaires :

  • Implémenter l'interface de service générée à partir de notre définition de service : effectuer le "travail" réel de notre service.
  • Exécuter un serveur gRPC pour écouter les requêtes des clients et les distribuer à l'implémentation de service appropriée.

Implémenter RouteGuide

Nous allons implémenter une classe RouteGuideService qui étendra la classe RouteGuideGrpc.RouteGuideImplBase générée. Voici à quoi ressemblerait l'implémentation.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Examinons en détail chaque implémentation RPC.

RPC de streaming côté serveur

Examinons maintenant l'un de nos RPC de streaming. ListFeatures est un RPC de streaming côté serveur. Nous devons donc renvoyer plusieurs Features à notre client.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Comme le RPC simple, cette méthode obtient un objet de requête (le Rectangle dans lequel notre client souhaite trouver Features) et un observateur de réponse StreamObserver.

Cette fois, nous obtenons autant d'objets Feature que nécessaire pour renvoyer au client (dans ce cas, nous les sélectionnons à partir de la collection de fonctionnalités du service en fonction de leur présence dans notre Rectangle de requête), et nous les écrivons chacun à leur tour dans l'observateur de réponse à l'aide de sa méthode onNext(). Enfin, comme dans notre RPC simple, nous utilisons la méthode onCompleted() de l'observateur de réponse pour indiquer à gRPC que nous avons terminé d'écrire les réponses.

RPC de streaming côté client

Examinons maintenant quelque chose d'un peu plus compliqué : la méthode de streaming côté client RecordRoute(), où nous obtenons un flux de Points du client et renvoyons un seul RouteSummary avec des informations sur son trajet.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Comme vous pouvez le voir, à l'instar des types de méthodes précédents, notre méthode obtient un paramètre StreamObserver responseObserver, mais cette fois, elle renvoie un StreamObserver pour que le client puisse écrire son Points.

Dans le corps de la méthode, nous instancions un StreamObserver anonyme à renvoyer, dans lequel nous :

  • Remplacez la méthode onNext() pour obtenir des fonctionnalités et d'autres informations chaque fois que le client écrit un Point dans le flux de messages.
  • Remplacez la méthode onCompleted() (appelée lorsque le client a fini d'écrire des messages) pour remplir et créer notre RouteSummary. Nous appelons ensuite la méthode onNext() de l'observateur de réponse de notre méthode avec notre RouteSummary, puis nous appelons sa méthode onCompleted() pour terminer l'appel côté serveur.

RPC de streaming bidirectionnel

Enfin, examinons notre RPC de streaming bidirectionnel RouteChat().

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

Comme dans notre exemple de streaming côté client, nous obtenons et renvoyons un StreamObserver, sauf que cette fois, nous renvoyons des valeurs via l'observateur de réponse de notre méthode pendant que le client écrit toujours des messages dans son flux de messages. La syntaxe de lecture et d'écriture est exactement la même que pour nos méthodes de streaming client et serveur. Bien que chaque côté reçoive toujours les messages de l'autre dans l'ordre dans lequel ils ont été écrits, le client et le serveur peuvent lire et écrire dans n'importe quel ordre. Les flux fonctionnent de manière totalement indépendante.

Démarrer le serveur

Une fois que nous avons implémenté toutes nos méthodes, nous devons également démarrer un serveur gRPC pour que les clients puissent réellement utiliser notre service. L'extrait suivant montre comment nous procédons pour notre service RouteGuide :

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Comme vous pouvez le voir, nous créons et démarrons notre serveur à l'aide d'un ServerBuilder.

Pour ce faire, nous adoptons la méthode suivante :

  1. Spécifiez l'adresse et le port que nous souhaitons utiliser pour écouter les requêtes du client à l'aide de la méthode forPort() du générateur.
  2. Créez une instance de notre classe d'implémentation de service RouteGuideService et transmettez-la à la méthode addService() du générateur.
  3. Appelez build() et start() sur le générateur pour créer et démarrer un serveur RPC pour notre service.

Étant donné que ServerBuilder intègre déjà le port, la seule raison pour laquelle nous transmettons un port est de l'utiliser pour la journalisation.

6. Créer le client

Dans cette section, nous allons créer un client pour notre service RouteGuide. Vous pouvez consulter l'intégralité de notre exemple de code client dans ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

Instancier un stub

Pour appeler des méthodes de service, nous devons d'abord créer un stub, ou plutôt deux stubs :

  • Un stub bloquant/synchrone : cela signifie que l'appel RPC attend la réponse du serveur et renvoie une réponse ou génère une exception.
  • un stub non bloquant/asynchrone qui effectue des appels non bloquants au serveur, où la réponse est renvoyée de manière asynchrone. Vous ne pouvez effectuer certains types d'appels de streaming qu'en utilisant un stub asynchrone.

Nous devons d'abord créer un canal gRPC pour notre stub, en spécifiant l'adresse et le port du serveur auquel nous voulons nous connecter :

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Nous utilisons un ManagedChannelBuilder pour créer le canal.

Nous pouvons maintenant utiliser le canal pour créer nos stubs à l'aide des méthodes newStub et newBlockingStub fournies dans la classe RouteGuideGrpc que nous avons générée à partir de notre .proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

N'oubliez pas que si ce n'est pas bloquant, c'est asynchrone.

Appeler les méthodes de service

Voyons maintenant comment appeler les méthodes de service. Notez que tous les RPC créés à partir du stub de blocage fonctionneront en mode blocage/synchrone, ce qui signifie que l'appel RPC attend la réponse du serveur et renvoie une réponse ou une erreur.

RPC de streaming côté serveur

Examinons ensuite un appel de streaming côté serveur à ListFeatures, qui renvoie un flux de Feature géographiques :

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Comme vous pouvez le voir, il est très semblable au simple RPC unaire que nous avons examiné dans l'atelier de programmation Getting_Started_With_gRPC_Java, sauf qu'au lieu de renvoyer un seul Feature, la méthode renvoie un Iterator que le client peut utiliser pour lire tous les Features renvoyés.

RPC de streaming côté client

Passons maintenant à quelque chose d'un peu plus compliqué : la méthode de streaming côté client RecordRoute, où nous envoyons un flux de Points au serveur et recevons un seul RouteSummary en retour. Pour cette méthode, nous devons utiliser le stub asynchrone. Si vous avez déjà lu Créer le serveur, certaines parties de ce document vous sembleront très familières. En effet, les RPC de streaming asynchrone sont implémentés de manière similaire des deux côtés.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Comme vous pouvez le voir, pour appeler cette méthode, nous devons créer un StreamObserver qui implémente une interface spéciale pour que le serveur puisse l'appeler avec sa réponse RouteSummary. Dans notre StreamObserver, nous allons :

  • Remplacez la méthode onNext() pour imprimer les informations renvoyées lorsque le serveur écrit un RouteSummary dans le flux de messages.
  • Remplacez la méthode onCompleted() (appelée lorsque le serveur a terminé l'appel de son côté) pour réduire un CountDownLatch afin que nous puissions vérifier si le serveur a terminé l'écriture.

Nous transmettons ensuite le StreamObserver à la méthode recordRoute() du stub asynchrone et récupérons notre propre observateur de requête StreamObserver pour écrire notre Points à envoyer au serveur. Une fois que nous avons fini d'écrire des points, nous utilisons la méthode onCompleted() de l'observateur de requête pour indiquer à gRPC que nous avons fini d'écrire côté client. Une fois l'opération terminée, nous vérifions notre CountDownLatch pour voir si le serveur a terminé de son côté.

RPC de streaming bidirectionnel

Enfin, examinons notre RPC de streaming bidirectionnel RouteChat().

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

Comme dans notre exemple de streaming côté client, nous obtenons et renvoyons un observateur de réponse StreamObserver, sauf que cette fois, nous envoyons des valeurs via l'observateur de réponse de notre méthode pendant que le serveur écrit toujours des messages dans son flux de messages. La syntaxe de lecture et d'écriture est exactement la même que pour notre méthode de streaming client. Bien que chaque côté reçoive toujours les messages de l'autre dans l'ordre dans lequel ils ont été écrits, le client et le serveur peuvent lire et écrire dans n'importe quel ordre. Les flux fonctionnent de manière totalement indépendante.

7. Essayez !

  1. À partir du répertoire start_here :
$ ./gradlew installDist

Cela compilera votre code, l'empaquettera dans un fichier JAR et créera les scripts qui exécutent l'exemple. Ils seront créés dans le répertoire build/install/start_here/bin/. Les scripts sont les suivants : route-guide-server et route-guide-client.

Le serveur doit être en cours d'exécution avant le démarrage du client.

  1. Exécutez le serveur :
$ ./build/install/start_here/bin/route-guide-server
  1. Exécutez le client :
$ ./build/install/start_here/bin/route-guide-client

8. Étape suivante