gRPC-Java 入门 - 流式传输

1. 简介

在此 Codelab 中,您将使用 gRPC-Java 创建一个客户端和一个服务器,它们将构成一个用 Java 编写的路线映射应用的基础。

在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。

该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们能够相互通信,从而节省您实现该功能的时间和精力。

生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。

学习内容

  • 如何使用 Protocol Buffers 定义服务 API。
  • 如何使用自动代码生成功能基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
  • 了解使用 gRPC 进行的客户端-服务器流式传输通信。

此 Codelab 适合刚开始接触 gRPC 或希望复习 gRPC 的 Java 开发者,也适合任何对构建分布式系统感兴趣的人员。无需具备 gRPC 经验。

2. 准备工作

前提条件

  • JDK 版本 24。

获取代码

为了避免您完全从头开始,本 Codelab 提供了一个应用源代码框架供您完成。以下步骤将展示如何完成应用,包括使用 Protocol Buffer 编译器插件生成样板 gRPC 代码。

首先,创建 Codelab 工作目录并进入该目录:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

下载并解压缩 Codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。

如果您想跳过输入实现代码的步骤,可以在 GitHub 上找到完整的源代码

3. 定义消息和服务

第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:

  • 服务器实现并由客户端调用的 RPC 方法 ListFeaturesRecordRouteRouteChat
  • 消息类型 PointFeatureRectangleRouteNoteRouteSummary,它们是调用上述方法时在客户端和服务器之间交换的数据结构。

协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期

此 RPC 方法及其消息类型都将在所提供源代码的 proto/routeguide/route_guide.proto 文件中定义。

我们来创建一个 route_guide.proto 文件。

由于我们在此示例中生成的是 Java 代码,因此我们在 .proto 中指定了 java_package 文件选项:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

定义消息类型

在源代码的 proto/routeguide/route_guide.proto 文件中,首先定义 Point 消息类型。Point 表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数字 12message 结构中每个字段的唯一 ID 编号。

接下来,定义 Feature 消息类型。Feature 使用 string 字段来表示由 Point 指定的位置处的某个事物的名称或邮政地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

为了将某个区域内的多个点流式传输到客户端,您需要一条 Rectangle 消息,该消息表示一个经纬度矩形,以两个对角点 lohi 表示:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

此外,还包括表示在给定时间点发送的消息的 RouteNote 消息:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

最后,您需要收到 RouteSummary 消息。此消息是针对 RecordRoute RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定义服务方法

如需定义服务,请在 .proto 文件中指定命名服务。route_guide.proto 文件具有名为 RouteGuideservice 结构,用于定义应用服务提供的一个或多个方法。

在服务定义中定义 RPC 方法时,您需要指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:

ListFeatures

获取给定 Rectangle 内可用的 Feature 对象。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回。

对于此应用,您将使用服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取一系列返回的消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式传输方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受正在遍历的路线上的点流,并在遍历完成后返回 RouteSummary

在这种情况下,客户端流式传输 RPC 非常适合:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以通过在请求类型之前放置 stream 关键字来指定客户端流式传输方法。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在遍历路线时发送的 RouteNotes 流,同时接收其他 RouteNotes(例如来自其他用户的 RouteNotes)。

这正是双向流式传输的用例。一种双向流式传输 RPC,其中双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序读取和写入数据:例如,服务器可以等待接收所有客户端消息,然后再写入响应;也可以交替读取消息和写入消息;或者以其他方式组合读取和写入操作。每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream 关键字来指定此类方法。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 生成客户端和服务器代码

接下来,我们需要根据 .proto 服务定义生成 gRPC 客户端和服务器接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 gRPC Java 插件来完成此操作。您需要使用 proto3 编译器(同时支持 proto2 和 proto3 语法)才能生成 gRPC 服务。

使用 Gradle 或 Maven 时,protoc 构建插件可以在构建过程中生成必要的代码。您可以参阅 grpc-java README,了解如何从您自己的 .proto 文件生成代码。

我们已提供 Gradle 配置。

streaming-grpc-java-getting-started 目录中输入

$ chmod +x gradlew
$ ./gradlew generateProto

以下类是从我们的服务定义(位于 build/generated/sources/proto/main/java 下)生成的:

  • 每个消息类型一个:Feature.javaRectangle.java, ...,其中包含用于填充、序列化和检索请求和响应消息类型的所有协议缓冲区代码。
  • RouteGuideGrpc.java,其中包含(以及一些其他有用的代码)供 RouteGuide 服务器实现的基类 RouteGuideGrpc.RouteGuideImplBase,以及 RouteGuide 服务和存根类中定义的所有方法,供客户端使用

5. 实现服务

首先,我们来看看如何创建 RouteGuide 服务器。要让 RouteGuide 服务正常运行,需要完成以下两个部分:

  • 实现根据服务定义生成的服务接口:执行服务的实际“工作”。
  • 运行 gRPC 服务器以监听来自客户端的请求,并将这些请求分派给正确的服务实现。

实现 RouteGuide

我们将实现一个 RouteGuideService 类,该类将扩展生成的 RouteGuideGrpc.RouteGuideImplBase 类。实现方式如下所示。

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

让我们详细了解一下每种 RPC 实现

服务器端流式处理 RPC

接下来,我们来看一个流式 RPC。ListFeatures 是服务器端流式传输 RPC,因此我们需要向客户端发送多个 Features

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

与简单 RPC 类似,此方法会获取一个请求对象(客户端要在其中查找 FeaturesRectangle)和一个 StreamObserver 响应观察器。

这次,我们获取需要返回给客户端的尽可能多的 Feature 对象(在本例中,我们根据这些对象是否位于请求 Rectangle 中,从服务的要素集合中选择它们),并使用响应观察器的 onNext() 方法将它们依次写入响应观察器。最后,与简单 RPC 中一样,我们使用响应观察器的 onCompleted() 方法告知 gRPC 我们已完成写入响应。

客户端流式 RPC

现在,我们来看一个稍微复杂一点的示例:客户端流式传输方法 RecordRoute(),其中我们从客户端获取 Points 的流,并返回包含客户行程信息的单个 RouteSummary

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

如您所见,与之前的方法类型一样,我们的方法会获取 StreamObserver responseObserver 参数,但这次它会返回一个 StreamObserver,供客户端写入其 Points

在方法正文中,我们实例化了一个匿名 StreamObserver 以返回,其中:

  • 替换 onNext() 方法,以便在每次客户端向消息流写入 Point 时获取特征和其他信息。
  • 替换 onCompleted() 方法(在 客户端完成消息写入时调用),以填充和构建 RouteSummary。然后,我们使用 RouteSummary 调用我们方法自己的响应观察器的 onNext(),然后调用其 onCompleted() 方法以完成来自服务器端的调用。

双向流式传输 RPC

最后,我们来看看双向流式 RPC RouteChat()

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

与客户端流式传输示例一样,我们既会获取 StreamObserver,也会返回 StreamObserver,只不过这次我们是通过方法的响应观察器返回值的,而客户端仍在向消息流写入消息。此处读取和写入的语法与客户端流式传输方法和服务器流式传输方法完全相同。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。

启动服务器

实现所有方法后,我们还需要启动 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码段展示了我们如何针对 RouteGuide 服务执行此操作:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

如您所见,我们使用 ServerBuilder 构建并启动服务器。

为了实现该目标,我们采取了以下措施:

  1. 使用构建器的 forPort() 方法指定我们想要用于监听客户端请求的地址和端口。
  2. 创建服务实现类 RouteGuideService 的实例,并将其传递给构建器的 addService() 方法。
  3. 在构建器上调用 build()start(),以创建并启动服务的 RPC 服务器。

由于 ServerBuilder 已包含端口,因此我们传递端口的唯一原因是将其用于日志记录。

6. 创建客户端

在本部分中,我们将了解如何为 RouteGuide 服务创建客户端。您可以在 ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java 中查看完整的客户端代码示例。

实例化存根

如需调用服务方法,我们首先需要创建 stub,或者更确切地说,是两个 stub:

  • 阻塞/同步桩:这意味着 RPC 调用会等待服务器响应,并返回响应或引发异常。
  • 一个非阻塞/异步桩,用于向服务器发出非阻塞调用,其中响应以异步方式返回。您只能通过使用异步 stub 来进行某些类型的流式调用。

首先,我们需要为桩创建 gRPC 通道,并指定要连接的服务器地址和端口:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

我们使用 ManagedChannelBuilder 创建频道。

现在,我们可以使用从 .proto 生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法,通过该渠道创建桩。

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

请注意,如果不是阻塞,就是异步

调用服务方法

现在,我们来看看如何调用服务方法。请注意,从阻塞 stub 创建的任何 RPC 都将以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。

服务器端流式处理 RPC

接下来,我们来看一下对 ListFeatures 的服务器端流式调用,该调用会返回地理位置 Feature 的数据流:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

如您所见,它与我们在 Getting_Started_With_gRPC_Java Codelab 中看到的简单一元 RPC 非常相似,只不过该方法不是返回单个 Feature,而是返回一个 Iterator,客户端可以使用该 Iterator 读取所有返回的 Features

客户端流式 RPC

现在,我们来看一个稍微复杂一点的示例:客户端流式传输方法 RecordRoute,其中我们将 Points 的流发送到服务器,并返回单个 RouteSummary。对于此方法,我们需要使用异步桩。如果您已阅读创建服务器,那么您可能会觉得其中一些内容非常熟悉 - 异步流式 RPC 在客户端和服务器端以类似的方式实现。

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

如您所见,为了调用此方法,我们需要创建一个 StreamObserver,该 StreamObserver 实现了一个特殊接口,供服务器通过其 RouteSummary 响应进行调用。在我们的 StreamObserver 中,我们:

  • 重写 onNext() 方法,以便在服务器将 RouteSummary 写入消息流时打印返回的信息。
  • 替换 onCompleted() 方法(当服务器已完成其侧的调用时调用),以减少 CountDownLatch,以便我们可以检查服务器是否已完成写入。

然后,我们将 StreamObserver 传递给异步 stub 的 recordRoute() 方法,并获取我们自己的 StreamObserver 请求观察器,以写入要发送到服务器的 Points。写完点后,我们使用请求观察器的 onCompleted() 方法告知 gRPC 我们已在客户端完成写入。完成后,我们检查 CountDownLatch,看看服务器是否已完成其操作。

双向流式传输 RPC

最后,我们来看看双向流式 RPC RouteChat()

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

与客户端流式传输示例一样,我们同时获取并返回 StreamObserver 响应观察器,只不过这次我们在服务器仍在向消息流写入消息时,通过方法的响应观察器发送值。此处读取和写入的语法与客户端流式传输方法的语法完全相同。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。

7. 试试看!

  1. start_here 目录中:
$ ./gradlew installDist

此命令将编译您的代码,将其打包到 JAR 中,并创建运行示例的脚本。它们将在 build/install/start_here/bin/ 目录中创建。脚本为:route-guide-serverroute-guide-client

在启动客户端之前,服务器需要处于运行状态。

  1. 运行服务器:
$ ./build/install/start_here/bin/route-guide-server
  1. 运行客户端:
$ ./build/install/start_here/bin/route-guide-client

8. 后续步骤