開始使用 gRPC-Java - 串流

1. 簡介

在本程式碼研究室中,您將使用 gRPC-Java 建立用戶端和伺服器,做為以 Java 編寫的路線對應應用程式基礎。

完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。

服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。

這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。

課程內容

  • 如何使用通訊協定緩衝區定義服務 API。
  • 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
  • 瞭解如何使用 gRPC 進行用戶端與伺服器之間的串流通訊。

本程式碼研究室適用於剛接觸 gRPC 或想複習 gRPC 的 Java 開發人員,以及有興趣建構分散式系統的任何人。不需要有 gRPC 相關經驗。

2. 事前準備

必要條件

  • JDK 版本 24。

取得程式碼

為避免您必須從頭開始,本程式碼研究室提供應用程式原始碼的架構,供您完成。下列步驟將說明如何完成應用程式,包括使用通訊協定緩衝區編譯器外掛程式產生樣板 gRPC 程式碼。

首先,請建立程式碼研究室工作目錄,然後 cd 到該目錄:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

下載並擷取程式碼研究室:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。

如要略過實作的輸入作業,可以前往 GitHub 取得完整的原始碼

3. 定義訊息和服務

首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:

  • 伺服器實作且用戶端呼叫的 RPC 方法 (ListFeaturesRecordRouteRouteChat)。
  • 訊息類型 PointFeatureRectangleRouteNoteRouteSummary,這些是呼叫上述方法時,用戶端和伺服器之間交換的資料結構。

通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。

這個 RPC 方法及其訊息型別都會在提供的原始碼 proto/routeguide/route_guide.proto 檔案中定義。

讓我們建立 route_guide.proto 檔案。

由於我們要在這個範例中產生 Java 程式碼,因此已在 .proto 中指定 java_package 檔案選項:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

定義訊息類型

在原始碼的 proto/routeguide/route_guide.proto 檔案中,請先定義 Point 訊息型別。Point 代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

數字 12message 結構中每個欄位的專屬 ID 編號。

接著,定義 Feature 訊息類型。Feature 會使用 string 欄位,指定 Point 所指定位置的名稱或郵寄地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

如要將區域內的多個點串流至用戶端,您需要 Rectangle 訊息,代表以兩個對角點 lohi 表示的經緯度矩形:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

此外,RouteNote 訊息代表在特定時間點傳送的訊息:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

最後,您需要 RouteSummary 訊息。這則訊息是針對 RecordRoute RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定義服務方法

如要定義服務,請在 .proto 檔案中指定具名服務。route_guide.proto 檔案具有名為 RouteGuideservice 結構,可定義應用程式服務提供的一或多個方法。

在服務定義中定義 RPC 方法時,請指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:

ListFeatures

取得指定 Rectangle 中可用的 Feature 物件。由於矩形可能涵蓋大範圍區域,並包含大量特徵,因此系統會串流傳回結果,而非一次傳回。

在這個應用程式中,您將使用伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream 關鍵字,指定伺服器端串流方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受所遍歷路徑上的 Points 串流,並在遍歷完成時傳回 RouteSummary

在這種情況下,用戶端串流 RPC 是合適的選擇:用戶端會寫入一連串訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求類型前放置串流關鍵字。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在路線遍歷期間傳送的 RouteNotes 串流,同時接收其他 RouteNotes (例如來自其他使用者)。

這正是雙向串流的適用情境。雙向串流 RPC,雙方都會使用讀寫串流傳送一連串訊息。這兩個串流會獨立運作,因此用戶端和伺服器可以任意順序讀取和寫入資料。舉例來說,伺服器可以等待接收所有用戶端訊息,再寫入回應;也可以交替讀取和寫入訊息;或是以其他方式組合讀取和寫入作業。每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上串流關鍵字。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 產生用戶端和伺服器程式碼

接下來,我們需要從 .proto 服務定義產生 gRPC 用戶端和伺服器介面。我們會使用通訊協定緩衝區編譯器 protoc 和特殊的 gRPC Java 外掛程式來完成這項作業。您必須使用 proto3 編譯器 (同時支援 proto2 和 proto3 語法),才能產生 gRPC 服務。

使用 Gradle 或 Maven 時,protoc 建構外掛程式會在建構過程中產生必要程式碼。如要瞭解如何從自己的 .proto 檔案產生程式碼,請參閱 grpc-java README

我們已提供 Gradle 設定。

streaming-grpc-java-getting-started 目錄輸入

$ chmod +x gradlew
$ ./gradlew generateProto

下列類別是從服務定義 (位於 build/generated/sources/proto/main/java 下方) 產生:

  • 每個訊息類型各有一個:Feature.javaRectangle.java, ...,其中包含所有通訊協定緩衝區程式碼,可填入、序列化及擷取要求和回應訊息類型。
  • RouteGuideGrpc.java,其中包含 (以及一些其他實用程式碼) 供 RouteGuide 伺服器實作的基礎類別 RouteGuideGrpc.RouteGuideImplBase,以及 RouteGuide 服務中定義的所有方法,以及供用戶端使用的存根類別

5. 實作服務

首先,我們來看看如何建立 RouteGuide 伺服器。如要讓 RouteGuide 服務正常運作,需要完成以下兩個步驟:

  • 實作從服務定義產生的服務介面:執行服務的實際「工作」。
  • 執行 gRPC 伺服器,監聽來自用戶端的要求,並將要求分派至正確的服務實作項目。

實作 RouteGuide

我們將實作 RouteGuideService 類別,擴充產生的 RouteGuideGrpc.RouteGuideImplBase 類別。實作方式如下所示。

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

接著詳細瞭解各項 RPC 實作

伺服器端串流 RPC

接著來看其中一個串流 RPC。ListFeatures 是伺服器端串流 RPC,因此我們需要將多個 Features 傳送回用戶端。

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

與簡單的 RPC 類似,這個方法會取得要求物件 (用戶端要尋找 FeaturesRectangle) 和 StreamObserver 回應觀察器。

這次,我們會取得盡可能多的 Feature 物件,以便傳回給用戶端 (在本例中,我們會根據物件是否位於要求 Rectangle 內,從服務的功能集合中選取物件),並使用回應觀察器的 onNext() 方法,依序將每個物件寫入回應觀察器。最後,如同簡單的 RPC,我們使用回應觀察程式的 onCompleted() 方法,告知 gRPC 我們已完成撰寫回應。

用戶端串流 RPC

現在來看看稍微複雜一點的內容:用戶端串流方法 RecordRoute(),我們會從用戶端取得 Points 串流,並傳回單一 RouteSummary,其中包含行程資訊。

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

如您所見,與先前的方法類型一樣,我們的方法會取得 StreamObserver responseObserver 參數,但這次會傳回 StreamObserver,供用戶端寫入 Points

在方法主體中,我們會例項化要傳回的匿名 StreamObserver,其中:

  • 覆寫 onNext() 方法,在用戶端每次將 Point 寫入訊息串流時,取得功能和其他資訊。
  • 覆寫 onCompleted() 方法 (在用戶端完成訊息寫入時呼叫),以填入及建構 RouteSummary。接著,我們使用 RouteSummary 呼叫方法本身的回應觀察器 onNext(),然後呼叫其 onCompleted() 方法,從伺服器端完成呼叫。

雙向串流 RPC

最後,我們來看看雙向串流 RPC RouteChat()

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

與用戶端串流範例相同,我們都會取得並傳回 StreamObserver,但這次我們是透過方法的回應觀察器傳回值,而用戶端仍在將訊息寫入訊息串流。這裡的讀取和寫入語法與用戶端串流和伺服器串流方法完全相同。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。

啟動伺服器

實作所有方法後,我們也需要啟動 gRPC 伺服器,讓用戶端實際使用服務。下列程式碼片段顯示我們如何為 RouteGuide 服務執行這項操作:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

如您所見,我們使用 ServerBuilder 建構及啟動伺服器。

為提供這項服務,我們會:

  1. 使用建構工具的 forPort() 方法,指定要用來接聽用戶端要求的位址和連接埠。
  2. 建立服務實作類別 RouteGuideService 的例項,並傳遞至建構工具的 addService() 方法。
  3. 在建構工具上呼叫 build()start(),為服務建立並啟動 RPC 伺服器。

由於 ServerBuilder 已納入通訊埠,我們傳遞通訊埠的唯一原因,是為了將其用於記錄。

6. 建立用戶端

在本節中,我們將說明如何為 RouteGuide 服務建立用戶端。您可以在 ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java 中查看完整的範例用戶端程式碼。

建立虛設常式

如要呼叫服務方法,我們首先需要建立「虛設常式」,或者說,兩個虛設常式:

  • 封鎖/同步存根:這表示 RPC 呼叫會等待伺服器回應,並傳回回應或引發例外狀況。
  • 非阻塞/非同步存根,可對伺服器發出非阻塞呼叫,並以非同步方式傳回回應。您只能使用非同步存根撥打特定類型的串流呼叫。

首先,我們需要為虛設常式建立 gRPC 通道,並指定要連線的伺服器位址和通訊埠:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

我們會使用 ManagedChannelBuilder 建立頻道。

現在,我們可以透過管道,使用從 .proto 產生的 RouteGuideGrpc 類別中提供的 newStubnewBlockingStub 方法,建立存根。

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

請注意,如果不是封鎖,就是非同步

呼叫服務方法

接著來看看如何呼叫服務方法。請注意,從封鎖存根建立的任何 RPC 都會以封鎖/同步模式運作,也就是說,RPC 呼叫會等待伺服器回應,並傳回回應或錯誤。

伺服器端串流 RPC

接著,我們來看看對 ListFeatures 的伺服器端串流呼叫,這會傳回地理位置 Feature 串流:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

如您所見,這與我們在「Getting_Started_With_gRPC_Java」程式碼研究室中看到的簡單一元 RPC 非常相似,但方法傳回的不是單一 Feature,而是 Iterator,用戶端可使用這個 Iterator 讀取所有傳回的 Features

用戶端串流 RPC

現在要介紹稍微複雜一點的內容:用戶端串流方法 RecordRoute,我們會將 Points 串流傳送至伺服器,並取得單一 RouteSummary。這個方法需要使用非同步存根。如果您已閱讀「建立伺服器」,可能會覺得有些內容很熟悉,因為非同步串流 RPC 的實作方式在兩端都類似。

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

如您所見,如要呼叫這個方法,我們需要建立 StreamObserver,其中會實作特殊介面,供伺服器使用其 RouteSummary 回應呼叫。在我們的StreamObserver中:

  • 覆寫 onNext() 方法,在伺服器將 RouteSummary 寫入訊息串流時,列印傳回的資訊。
  • 覆寫 onCompleted() 方法 (伺服器完成呼叫時會呼叫此方法),藉此減少 CountDownLatch,以便檢查伺服器是否已完成寫入作業。

接著,我們會將 StreamObserver 傳遞至非同步存根的 recordRoute() 方法,並取得我們自己的 StreamObserver 要求觀察器,以便編寫要傳送至伺服器的 Points。寫入點完成後,我們會使用要求觀察器的 onCompleted() 方法,告知 gRPC 我們已在用戶端完成寫入作業。完成後,我們會檢查 CountDownLatch,確認伺服器是否已完成作業。

雙向串流 RPC

最後,我們來看看雙向串流 RPC RouteChat()

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

與用戶端串流範例相同,我們也會取得並傳回 StreamObserver 回應觀察器,但這次我們是透過方法的回應觀察器傳送值,而伺服器仍會將訊息寫入訊息串流。這裡的讀取和寫入語法與用戶端串流方法完全相同。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。

7. 快來試試!

  1. start_here 目錄:
$ ./gradlew installDist

這會編譯您的程式碼、將其封裝在 JAR 中,並建立執行範例的指令碼。系統會在 build/install/start_here/bin/ 目錄中建立這些檔案。腳本為:route-guide-serverroute-guide-client

啟動用戶端前,伺服器必須先執行。

  1. 執行伺服器:
$ ./build/install/start_here/bin/route-guide-server
  1. 執行用戶端:
$ ./build/install/start_here/bin/route-guide-client

8. 後續步驟