1. 簡介
在本程式碼研究室中,您將使用 gRPC-Java 建立用戶端和伺服器,做為以 Java 編寫的路線對應應用程式基礎。
完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。
服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。
這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。
課程內容
- 如何使用通訊協定緩衝區定義服務 API。
- 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
- 瞭解如何使用 gRPC 進行用戶端與伺服器之間的串流通訊。
本程式碼研究室適用於剛接觸 gRPC 或想複習 gRPC 的 Java 開發人員,以及有興趣建構分散式系統的任何人。不需要有 gRPC 相關經驗。
2. 事前準備
必要條件
- JDK 版本 24。
取得程式碼
為避免您必須從頭開始,本程式碼研究室提供應用程式原始碼的架構,供您完成。下列步驟將說明如何完成應用程式,包括使用通訊協定緩衝區編譯器外掛程式產生樣板 gRPC 程式碼。
首先,請建立程式碼研究室工作目錄,然後 cd 到該目錄:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
下載並擷取程式碼研究室:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。
如要略過實作的輸入作業,可以前往 GitHub 取得完整的原始碼。
3. 定義訊息和服務
首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:
- 伺服器實作且用戶端呼叫的 RPC 方法 (
ListFeatures
、RecordRoute
和RouteChat
)。 - 訊息類型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,這些是呼叫上述方法時,用戶端和伺服器之間交換的資料結構。
通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。
這個 RPC 方法及其訊息型別都會在提供的原始碼 proto/routeguide/route_guide.proto
檔案中定義。
讓我們建立 route_guide.proto
檔案。
由於我們要在這個範例中產生 Java 程式碼,因此已在 .proto
中指定 java_package
檔案選項:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
定義訊息類型
在原始碼的 proto/routeguide/route_guide.proto
檔案中,請先定義 Point
訊息型別。Point
代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
數字 1
和 2
是 message
結構中每個欄位的專屬 ID 編號。
接著,定義 Feature
訊息類型。Feature
會使用 string
欄位,指定 Point
所指定位置的名稱或郵寄地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
如要將區域內的多個點串流至用戶端,您需要 Rectangle
訊息,代表以兩個對角點 lo
和 hi
表示的經緯度矩形:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
此外,RouteNote
訊息代表在特定時間點傳送的訊息:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
最後,您需要 RouteSummary
訊息。這則訊息是針對 RecordRoute
RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定義服務方法
如要定義服務,請在 .proto
檔案中指定具名服務。route_guide.proto
檔案具有名為 RouteGuide
的 service
結構,可定義應用程式服務提供的一或多個方法。
在服務定義中定義 RPC
方法時,請指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:
ListFeatures
取得指定 Rectangle
中可用的 Feature
物件。由於矩形可能涵蓋大範圍區域,並包含大量特徵,因此系統會串流傳回結果,而非一次傳回。
在這個應用程式中,您將使用伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream 關鍵字,指定伺服器端串流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受所遍歷路徑上的 Points 串流,並在遍歷完成時傳回 RouteSummary
。
在這種情況下,用戶端串流 RPC 是合適的選擇:用戶端會寫入一連串訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求類型前放置串流關鍵字。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在路線遍歷期間傳送的 RouteNotes
串流,同時接收其他 RouteNotes
(例如來自其他使用者)。
這正是雙向串流的適用情境。雙向串流 RPC,雙方都會使用讀寫串流傳送一連串訊息。這兩個串流會獨立運作,因此用戶端和伺服器可以任意順序讀取和寫入資料。舉例來說,伺服器可以等待接收所有用戶端訊息,再寫入回應;也可以交替讀取和寫入訊息;或是以其他方式組合讀取和寫入作業。每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上串流關鍵字。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 產生用戶端和伺服器程式碼
接下來,我們需要從 .proto
服務定義產生 gRPC 用戶端和伺服器介面。我們會使用通訊協定緩衝區編譯器 protoc
和特殊的 gRPC Java 外掛程式來完成這項作業。您必須使用 proto3 編譯器 (同時支援 proto2 和 proto3 語法),才能產生 gRPC 服務。
使用 Gradle 或 Maven 時,protoc 建構外掛程式會在建構過程中產生必要程式碼。如要瞭解如何從自己的 .proto
檔案產生程式碼,請參閱 grpc-java README。
我們已提供 Gradle 設定。
從 streaming-grpc-java-getting-started
目錄輸入
$ chmod +x gradlew $ ./gradlew generateProto
下列類別是從服務定義 (位於 build/generated/sources/proto/main/java
下方) 產生:
- 每個訊息類型各有一個:
Feature.java
、Rectangle.java, ...
,其中包含所有通訊協定緩衝區程式碼,可填入、序列化及擷取要求和回應訊息類型。 RouteGuideGrpc.java
,其中包含 (以及一些其他實用程式碼) 供RouteGuide
伺服器實作的基礎類別RouteGuideGrpc.RouteGuideImplBase
,以及RouteGuide
服務中定義的所有方法,以及供用戶端使用的存根類別
5. 實作服務
首先,我們來看看如何建立 RouteGuide
伺服器。如要讓 RouteGuide
服務正常運作,需要完成以下兩個步驟:
- 實作從服務定義產生的服務介面:執行服務的實際「工作」。
- 執行 gRPC 伺服器,監聽來自用戶端的要求,並將要求分派至正確的服務實作項目。
實作 RouteGuide
我們將實作 RouteGuideService
類別,擴充產生的 RouteGuideGrpc.RouteGuideImplBase 類別。實作方式如下所示。
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
接著詳細瞭解各項 RPC 實作
伺服器端串流 RPC
接著來看其中一個串流 RPC。ListFeatures
是伺服器端串流 RPC,因此我們需要將多個 Features
傳送回用戶端。
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
與簡單的 RPC 類似,這個方法會取得要求物件 (用戶端要尋找 Features
的 Rectangle
) 和 StreamObserver
回應觀察器。
這次,我們會取得盡可能多的 Feature
物件,以便傳回給用戶端 (在本例中,我們會根據物件是否位於要求 Rectangle
內,從服務的功能集合中選取物件),並使用回應觀察器的 onNext()
方法,依序將每個物件寫入回應觀察器。最後,如同簡單的 RPC,我們使用回應觀察程式的 onCompleted()
方法,告知 gRPC 我們已完成撰寫回應。
用戶端串流 RPC
現在來看看稍微複雜一點的內容:用戶端串流方法 RecordRoute()
,我們會從用戶端取得 Points
串流,並傳回單一 RouteSummary
,其中包含行程資訊。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
如您所見,與先前的方法類型一樣,我們的方法會取得 StreamObserver
responseObserver
參數,但這次會傳回 StreamObserver
,供用戶端寫入 Points
。
在方法主體中,我們會例項化要傳回的匿名 StreamObserver
,其中:
- 覆寫
onNext()
方法,在用戶端每次將Point
寫入訊息串流時,取得功能和其他資訊。 - 覆寫
onCompleted()
方法 (在用戶端完成訊息寫入時呼叫),以填入及建構RouteSummary
。接著,我們使用RouteSummary
呼叫方法本身的回應觀察器onNext()
,然後呼叫其onCompleted()
方法,從伺服器端完成呼叫。
雙向串流 RPC
最後,我們來看看雙向串流 RPC RouteChat()
。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
與用戶端串流範例相同,我們都會取得並傳回 StreamObserver
,但這次我們是透過方法的回應觀察器傳回值,而用戶端仍在將訊息寫入其訊息串流。這裡的讀取和寫入語法與用戶端串流和伺服器串流方法完全相同。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。
啟動伺服器
實作所有方法後,我們也需要啟動 gRPC 伺服器,讓用戶端實際使用服務。下列程式碼片段顯示我們如何為 RouteGuide
服務執行這項操作:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
如您所見,我們使用 ServerBuilder
建構及啟動伺服器。
為提供這項服務,我們會:
- 使用建構工具的
forPort()
方法,指定要用來接聽用戶端要求的位址和連接埠。 - 建立服務實作類別
RouteGuideService
的例項,並傳遞至建構工具的addService()
方法。 - 在建構工具上呼叫
build()
和start()
,為服務建立並啟動 RPC 伺服器。
由於 ServerBuilder 已納入通訊埠,我們傳遞通訊埠的唯一原因,是為了將其用於記錄。
6. 建立用戶端
在本節中,我們將說明如何為 RouteGuide
服務建立用戶端。您可以在 ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
中查看完整的範例用戶端程式碼。
建立虛設常式
如要呼叫服務方法,我們首先需要建立「虛設常式」,或者說,兩個虛設常式:
- 封鎖/同步存根:這表示 RPC 呼叫會等待伺服器回應,並傳回回應或引發例外狀況。
- 非阻塞/非同步存根,可對伺服器發出非阻塞呼叫,並以非同步方式傳回回應。您只能使用非同步存根撥打特定類型的串流呼叫。
首先,我們需要為虛設常式建立 gRPC 通道,並指定要連線的伺服器位址和通訊埠:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
我們會使用 ManagedChannelBuilder
建立頻道。
現在,我們可以透過管道,使用從 .proto
產生的 RouteGuideGrpc
類別中提供的 newStub
和 newBlockingStub
方法,建立存根。
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
請注意,如果不是封鎖,就是非同步
呼叫服務方法
接著來看看如何呼叫服務方法。請注意,從封鎖存根建立的任何 RPC 都會以封鎖/同步模式運作,也就是說,RPC 呼叫會等待伺服器回應,並傳回回應或錯誤。
伺服器端串流 RPC
接著,我們來看看對 ListFeatures
的伺服器端串流呼叫,這會傳回地理位置 Feature
串流:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
如您所見,這與我們在「Getting_Started_With_gRPC_Java」程式碼研究室中看到的簡單一元 RPC 非常相似,但方法傳回的不是單一 Feature
,而是 Iterator
,用戶端可使用這個 Iterator
讀取所有傳回的 Features
。
用戶端串流 RPC
現在要介紹稍微複雜一點的內容:用戶端串流方法 RecordRoute
,我們會將 Points
串流傳送至伺服器,並取得單一 RouteSummary
。這個方法需要使用非同步存根。如果您已閱讀「建立伺服器」,可能會覺得有些內容很熟悉,因為非同步串流 RPC 的實作方式在兩端都類似。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
如您所見,如要呼叫這個方法,我們需要建立 StreamObserver
,其中會實作特殊介面,供伺服器使用其 RouteSummary
回應呼叫。在我們的StreamObserver
中:
- 覆寫
onNext()
方法,在伺服器將RouteSummary
寫入訊息串流時,列印傳回的資訊。 - 覆寫
onCompleted()
方法 (伺服器完成呼叫時會呼叫此方法),藉此減少CountDownLatch
,以便檢查伺服器是否已完成寫入作業。
接著,我們會將 StreamObserver
傳遞至非同步存根的 recordRoute()
方法,並取得我們自己的 StreamObserver
要求觀察器,以便編寫要傳送至伺服器的 Points
。寫入點完成後,我們會使用要求觀察器的 onCompleted()
方法,告知 gRPC 我們已在用戶端完成寫入作業。完成後,我們會檢查 CountDownLatch
,確認伺服器是否已完成作業。
雙向串流 RPC
最後,我們來看看雙向串流 RPC RouteChat()
。
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
與用戶端串流範例相同,我們也會取得並傳回 StreamObserver
回應觀察器,但這次我們是透過方法的回應觀察器傳送值,而伺服器仍會將訊息寫入其訊息串流。這裡的讀取和寫入語法與用戶端串流方法完全相同。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。
7. 快來試試!
- 從
start_here
目錄:
$ ./gradlew installDist
這會編譯您的程式碼、將其封裝在 JAR 中,並建立執行範例的指令碼。系統會在 build/install/start_here/bin/
目錄中建立這些檔案。腳本為:route-guide-server
和 route-guide-client
。
啟動用戶端前,伺服器必須先執行。
- 執行伺服器:
$ ./build/install/start_here/bin/route-guide-server
- 執行用戶端:
$ ./build/install/start_here/bin/route-guide-client