1. Giới thiệu
Trong lớp học lập trình này, bạn sẽ sử dụng gRPC-Java để tạo một máy khách và máy chủ tạo thành nền tảng của một ứng dụng lập bản đồ tuyến đường được viết bằng Java.
Khi kết thúc hướng dẫn này, bạn sẽ có một ứng dụng kết nối với một máy chủ từ xa bằng gRPC để lấy thông tin về các đối tượng trên tuyến đường của ứng dụng, tạo bản tóm tắt về tuyến đường của ứng dụng và trao đổi thông tin về tuyến đường (chẳng hạn như thông tin cập nhật về tình trạng giao thông) với máy chủ và các ứng dụng khác.
Dịch vụ này được xác định trong một tệp Protocol Buffers. Tệp này sẽ được dùng để tạo mã chuẩn cho ứng dụng và máy chủ để chúng có thể giao tiếp với nhau, giúp bạn tiết kiệm thời gian và công sức khi triển khai chức năng đó.
Mã được tạo này không chỉ xử lý sự phức tạp của việc giao tiếp giữa máy chủ và ứng dụng mà còn xử lý quá trình chuyển đổi dữ liệu thành chuỗi và chuyển đổi chuỗi thành dữ liệu.
Kiến thức bạn sẽ học được
- Cách sử dụng Protocol Buffers để xác định một API dịch vụ.
- Cách tạo máy khách và máy chủ dựa trên gRPC từ một định nghĩa Protocol Buffers bằng cách sử dụng tính năng tạo mã tự động.
- Hiểu rõ về giao tiếp truyền phát trực tiếp giữa máy khách và máy chủ bằng gRPC.
Lớp học lập trình này dành cho các nhà phát triển Java mới làm quen với gRPC hoặc muốn tìm hiểu lại về gRPC, hoặc bất kỳ ai khác quan tâm đến việc xây dựng hệ thống phân tán. Bạn không cần có kinh nghiệm sử dụng gRPC.
2. Trước khi bắt đầu
Điều kiện tiên quyết
- JDK phiên bản 24.
Lấy mã
Để bạn không phải bắt đầu hoàn toàn từ đầu, lớp học lập trình này cung cấp một khung mã nguồn ứng dụng để bạn hoàn tất. Các bước sau đây sẽ hướng dẫn bạn cách hoàn tất ứng dụng, bao gồm cả việc sử dụng trình bổ trợ trình biên dịch vùng đệm giao thức để tạo mã gRPC chung.
Trước tiên, hãy tạo thư mục làm việc cho lớp học lập trình rồi chuyển đến thư mục đó:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
Tải và giải nén lớp học lập trình:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
Ngoài ra, bạn có thể tải tệp .zip chỉ chứa thư mục codelab xuống rồi giải nén theo cách thủ công.
Bạn có thể xem mã nguồn hoàn chỉnh trên GitHub nếu muốn bỏ qua bước nhập nội dung triển khai.
3. Xác định thông báo và dịch vụ
Bước đầu tiên là xác định dịch vụ gRPC của ứng dụng, phương thức RPC và các loại thông báo yêu cầu và phản hồi bằng cách sử dụng Protocol Buffers. Dịch vụ của bạn sẽ cung cấp:
- Các phương thức RPC được gọi là
ListFeatures
,RecordRoute
vàRouteChat
mà máy chủ triển khai và ứng dụng gọi. - Các loại thông báo
Point
,Feature
,Rectangle
,RouteNote
vàRouteSummary
là các cấu trúc dữ liệu được trao đổi giữa máy khách và máy chủ khi gọi các phương thức ở trên.
Vùng đệm giao thức thường được gọi là protobuf. Để biết thêm thông tin về thuật ngữ gRPC, hãy xem phần Các khái niệm, cấu trúc và vòng đời cốt lõi của gRPC.
Phương thức RPC này và các loại thông báo của phương thức này sẽ được xác định trong tệp proto/routeguide/route_guide.proto
của mã nguồn được cung cấp.
Hãy tạo một tệp route_guide.proto
.
Vì chúng ta đang tạo mã Java trong ví dụ này, nên chúng ta đã chỉ định một lựa chọn tệp java_package
trong .proto
:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
Xác định các loại thông báo
Trong tệp proto/routeguide/route_guide.proto
của mã nguồn, trước tiên hãy xác định kiểu thông báo Point
. Point
biểu thị một cặp toạ độ vĩ độ và kinh độ trên bản đồ. Trong lớp học lập trình này, hãy sử dụng số nguyên cho toạ độ:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Các số 1
và 2
là số nhận dạng duy nhất cho từng trường trong cấu trúc message
.
Tiếp theo, hãy xác định loại thông báo Feature
. Feature
sử dụng trường string
cho tên hoặc địa chỉ bưu chính của một thứ gì đó tại một vị trí do Point
chỉ định:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Để truyền trực tuyến nhiều điểm trong một khu vực đến một ứng dụng, bạn sẽ cần một thông báo Rectangle
đại diện cho hình chữ nhật vĩ độ – kinh độ, được biểu thị dưới dạng hai điểm đối diện theo đường chéo lo
và hi
:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Ngoài ra, một thông báo RouteNote
đại diện cho một thông báo được gửi tại một thời điểm nhất định:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Cuối cùng, bạn sẽ cần một thông báo RouteSummary
. Thông báo này được nhận để phản hồi một RPC RecordRoute
, được giải thích trong phần tiếp theo. Tệp này chứa số lượng điểm riêng lẻ nhận được, số lượng đối tượng được phát hiện và tổng khoảng cách đã đi được tính bằng tổng tích luỹ khoảng cách giữa mỗi điểm.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Xác định các phương thức dịch vụ
Để xác định một dịch vụ, bạn chỉ định một dịch vụ có tên trong tệp .proto
. Tệp route_guide.proto
có cấu trúc service
tên là RouteGuide
, xác định một hoặc nhiều phương thức do dịch vụ của ứng dụng cung cấp.
Khi xác định các phương thức RPC
trong định nghĩa dịch vụ, bạn sẽ chỉ định các loại yêu cầu và phản hồi của các phương thức đó. Trong phần này của lớp học lập trình, hãy xác định:
ListFeatures
Lấy các đối tượng Feature
có trong Rectangle
đã cho. Kết quả được truyền trực tuyến thay vì trả về ngay lập tức vì hình chữ nhật có thể bao phủ một khu vực rộng lớn và chứa một số lượng lớn các đối tượng.
Đối với ứng dụng này, bạn sẽ sử dụng một RPC truyền phát trực tiếp phía máy chủ: ứng dụng gửi một yêu cầu đến máy chủ và nhận một luồng để đọc lại một chuỗi thông báo. Ứng dụng đọc từ luồng được trả về cho đến khi không còn thông báo nào nữa. Như bạn có thể thấy trong ví dụ của chúng tôi, bạn chỉ định một phương thức truyền phát trực tiếp phía máy chủ bằng cách đặt từ khoá truyền phát trực tiếp trước loại phản hồi.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Chấp nhận một luồng Điểm trên tuyến đường đang được đi qua, trả về RouteSummary
khi quá trình đi qua hoàn tất.
RPC truyền trực tuyến phía máy khách là phù hợp trong trường hợp này: máy khách ghi một chuỗi thông báo và gửi chúng đến máy chủ, một lần nữa bằng cách sử dụng một luồng được cung cấp. Sau khi hoàn tất việc ghi các thông báo, ứng dụng sẽ đợi máy chủ đọc tất cả các thông báo đó và trả về phản hồi. Bạn chỉ định phương thức phát trực tuyến phía máy khách bằng cách đặt từ khoá stream trước loại yêu cầu.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Chấp nhận luồng RouteNotes
được gửi trong khi một tuyến đường đang được đi qua, đồng thời nhận các RouteNotes
khác (ví dụ: từ những người dùng khác).
Đây chính xác là trường hợp sử dụng cho truyền phát trực tiếp hai chiều. Một RPC truyền trực tuyến hai chiều, trong đó cả hai bên đều gửi một chuỗi thông báo bằng cách sử dụng luồng đọc-ghi. Hai luồng này hoạt động độc lập, vì vậy, các ứng dụng và máy chủ có thể đọc và ghi theo bất kỳ thứ tự nào mà chúng muốn: ví dụ: máy chủ có thể đợi nhận tất cả tin nhắn của ứng dụng trước khi ghi phản hồi hoặc có thể lần lượt đọc một tin nhắn rồi ghi một tin nhắn, hoặc một số tổ hợp khác giữa đọc và ghi. Thứ tự của các thông báo trong mỗi luồng được giữ nguyên. Bạn chỉ định loại phương thức này bằng cách đặt từ khoá luồng trước cả yêu cầu và phản hồi.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Tạo mã máy khách và mã máy chủ
Tiếp theo, chúng ta cần tạo giao diện máy chủ và máy khách gRPC từ định nghĩa dịch vụ .proto
. Chúng tôi thực hiện việc này bằng cách sử dụng trình biên dịch vùng đệm giao thức protoc
với một trình bổ trợ Java gRPC đặc biệt. Bạn cần sử dụng trình biên dịch proto3 (hỗ trợ cả cú pháp proto2 và proto3) để tạo các dịch vụ gRPC.
Khi sử dụng Gradle hoặc Maven, trình bổ trợ bản dựng protoc có thể tạo mã cần thiết trong quá trình tạo. Bạn có thể tham khảo README của grpc-java để biết cách tạo mã từ các tệp .proto
của riêng bạn.
Chúng tôi đã cung cấp cấu hình Gradle.
Từ danh bạ streaming-grpc-java-getting-started
, hãy nhập
$ chmod +x gradlew $ ./gradlew generateProto
Các lớp sau được tạo từ định nghĩa dịch vụ của chúng tôi (trong build/generated/sources/proto/main/java
):
- Một cho mỗi loại thông báo:
Feature.java
,Rectangle.java, ...
chứa tất cả mã vùng đệm giao thức để điền, chuyển đổi tuần tự và truy xuất các loại thông báo yêu cầu và phản hồi của chúng ta. RouteGuideGrpc.java
chứa (cùng với một số mã hữu ích khác) một lớp cơ sở để các máy chủRouteGuide
triển khai,RouteGuideGrpc.RouteGuideImplBase
, với tất cả các phương thức được xác định trong dịch vụRouteGuide
và các lớp gốc để máy khách sử dụng
5. Triển khai dịch vụ
Trước tiên, hãy xem cách chúng ta tạo một máy chủ RouteGuide
. Có hai phần để dịch vụ RouteGuide
thực hiện công việc của mình:
- Triển khai giao diện dịch vụ được tạo từ định nghĩa dịch vụ của chúng ta: thực hiện "công việc" thực tế của dịch vụ.
- Chạy một máy chủ gRPC để lắng nghe các yêu cầu từ ứng dụng và gửi các yêu cầu đó đến đúng quy trình triển khai dịch vụ.
Triển khai RouteGuide
Chúng ta sẽ triển khai một lớp RouteGuideService
mở rộng lớp RouteGuideGrpc.RouteGuideImplBase đã tạo. Đây là cách triển khai.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
Hãy cùng tìm hiểu chi tiết từng cách triển khai RPC
RPC truyền trực tuyến phía máy chủ
Tiếp theo, hãy xem xét một trong các RPC truyền phát trực tiếp của chúng ta. ListFeatures
là một RPC truyền trực tuyến phía máy chủ, vì vậy chúng ta cần gửi lại nhiều Features
cho ứng dụng.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
Giống như RPC đơn giản, phương thức này nhận một đối tượng yêu cầu (Rectangle
mà ứng dụng khách của chúng ta muốn tìm Features
) và một trình quan sát phản hồi StreamObserver
.
Lần này, chúng ta sẽ lấy nhiều đối tượng Feature
nhất có thể để trả về cho ứng dụng (trong trường hợp này, chúng ta sẽ chọn các đối tượng đó từ bộ sưu tập đối tượng của dịch vụ dựa trên việc các đối tượng đó có nằm trong Rectangle
yêu cầu của chúng ta hay không), rồi lần lượt ghi từng đối tượng vào trình theo dõi phản hồi bằng phương thức onNext()
của trình theo dõi. Cuối cùng, như trong RPC đơn giản, chúng ta sử dụng phương thức onCompleted()
của trình theo dõi phản hồi để cho gRPC biết rằng chúng ta đã hoàn tất việc ghi phản hồi.
RPC truyền phát trực tiếp phía máy khách
Bây giờ, hãy xem xét một phương thức phức tạp hơn một chút: phương thức truyền trực tuyến phía máy khách RecordRoute()
, trong đó chúng ta nhận được một luồng Points
từ máy khách và trả về một RouteSummary
duy nhất có thông tin về chuyến đi của họ.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
Như bạn thấy, giống như các loại phương thức trước, phương thức của chúng ta nhận được một tham số StreamObserver
responseObserver
, nhưng lần này, phương thức đó trả về một StreamObserver
để ứng dụng có thể ghi Points
.
Trong phần nội dung phương thức, chúng ta sẽ tạo một StreamObserver
ẩn danh để trả về, trong đó chúng ta:
- Ghi đè phương thức
onNext()
để nhận các tính năng và thông tin khác mỗi khi máy khách ghiPoint
vào luồng thông báo. - Ghi đè phương thức
onCompleted()
(được gọi khi máy khách đã ghi xong các thông báo) để điền sẵn và tạoRouteSummary
. Sau đó, chúng ta gọionNext()
của trình theo dõi phản hồi riêng của phương thức bằngRouteSummary
, rồi gọi phương thứconCompleted()
để hoàn tất lệnh gọi từ phía máy chủ.
RPC truyền trực tuyến hai chiều
Cuối cùng, hãy xem RPC truyền phát trực tiếp hai chiều RouteChat()
.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
Giống như ví dụ về truyền phát trực tiếp phía máy khách, cả hai chúng ta đều nhận và trả về một StreamObserver
, ngoại trừ lần này chúng ta trả về các giá trị thông qua trình quan sát phản hồi của phương thức trong khi máy khách vẫn đang ghi tin nhắn vào luồng tin nhắn của họ. Cú pháp để đọc và ghi ở đây hoàn toàn giống như cú pháp cho các phương thức truyền trực tuyến phía máy khách và truyền trực tuyến phía máy chủ. Mặc dù mỗi bên sẽ luôn nhận được tin nhắn của bên kia theo thứ tự chúng được viết, nhưng cả máy khách và máy chủ đều có thể đọc và ghi theo bất kỳ thứ tự nào – các luồng hoạt động hoàn toàn độc lập.
Khởi động máy chủ
Sau khi triển khai tất cả các phương thức, chúng ta cũng cần khởi động một máy chủ gRPC để máy khách có thể thực sự sử dụng dịch vụ của chúng ta. Đoạn mã sau đây cho biết cách chúng tôi thực hiện việc này cho dịch vụ RouteGuide
:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
Như bạn có thể thấy, chúng ta tạo và khởi động máy chủ bằng cách sử dụng ServerBuilder
.
Để làm việc này, chúng tôi cần:
- Chỉ định địa chỉ và cổng mà chúng ta muốn dùng để theo dõi các yêu cầu của ứng dụng bằng phương thức
forPort()
của trình tạo. - Tạo một thực thể của lớp triển khai dịch vụ
RouteGuideService
và truyền thực thể đó đến phương thứcaddService()
của trình tạo. - Gọi
build()
vàstart()
trên trình tạo để tạo và khởi động một máy chủ RPC cho dịch vụ của chúng ta.
Vì ServerBuilder đã kết hợp cổng, nên lý do duy nhất chúng ta truyền cổng là để sử dụng cổng đó cho việc ghi nhật ký.
6. Tạo ứng dụng
Trong phần này, chúng ta sẽ xem xét việc tạo một ứng dụng cho dịch vụ RouteGuide
. Bạn có thể xem mã ví dụ hoàn chỉnh của ứng dụng khách trong ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
.
Tạo thực thể cho một phần giữ chỗ
Để gọi các phương thức dịch vụ, trước tiên, chúng ta cần tạo một stub, hay đúng hơn là hai stub:
- một phần chặn/đồng bộ: điều này có nghĩa là lệnh gọi RPC sẽ đợi máy chủ phản hồi và sẽ trả về một phản hồi hoặc gửi một ngoại lệ.
- một phần không chặn/không đồng bộ tạo các lệnh gọi không chặn đến máy chủ, trong đó phản hồi được trả về không đồng bộ. Bạn chỉ có thể thực hiện một số loại cuộc gọi truyền trực tuyến nhất định bằng cách sử dụng một phần giữ chỗ không đồng bộ.
Trước tiên, chúng ta cần tạo một kênh gRPC cho mã giả lập, chỉ định địa chỉ máy chủ và cổng mà chúng ta muốn kết nối:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
Chúng tôi dùng ManagedChannelBuilder
để tạo kênh.
Giờ đây, chúng ta có thể dùng kênh này để tạo các phần giữ chỗ bằng cách sử dụng các phương thức newStub
và newBlockingStub
có trong lớp RouteGuideGrpc
mà chúng ta đã tạo từ .proto
.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
Hãy nhớ rằng nếu không chặn thì đó là hoạt động không đồng bộ
Gọi các phương thức dịch vụ
Bây giờ, hãy xem cách chúng ta gọi các phương thức dịch vụ. Xin lưu ý rằng mọi RPC được tạo từ phần giữ chỗ chặn sẽ hoạt động ở chế độ chặn/đồng bộ, tức là lệnh gọi RPC sẽ đợi máy chủ phản hồi và sẽ trả về phản hồi hoặc lỗi.
RPC truyền trực tuyến phía máy chủ
Tiếp theo, hãy xem một lệnh gọi truyền trực tuyến phía máy chủ đến ListFeatures
, lệnh này sẽ trả về một luồng Feature
địa lý:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
Như bạn thấy, phương thức này rất giống với RPC đơn giản mà chúng ta đã xem trong lớp học lập trình Getting_Started_With_gRPC_Java, ngoại trừ việc thay vì trả về một Feature
, phương thức này sẽ trả về một Iterator
mà máy khách có thể dùng để đọc tất cả Features
đã trả về.
RPC truyền phát trực tiếp phía máy khách
Bây giờ, chúng ta sẽ xem xét một phương thức phức tạp hơn một chút: phương thức truyền phát trực tiếp phía máy khách RecordRoute
, trong đó chúng ta gửi một luồng Points
đến máy chủ và nhận lại một RouteSummary
duy nhất. Đối với phương thức này, chúng ta cần sử dụng phần giữ chỗ không đồng bộ. Nếu bạn đã đọc phần Tạo máy chủ, thì một số nội dung trong phần này có thể rất quen thuộc với bạn – các RPC truyền trực tuyến không đồng bộ được triển khai theo cách tương tự ở cả hai phía.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
Như bạn có thể thấy, để gọi phương thức này, chúng ta cần tạo một StreamObserver
. Phương thức này triển khai một giao diện đặc biệt để máy chủ gọi bằng phản hồi RouteSummary
. Trong StreamObserver
, chúng ta:
- Ghi đè phương thức
onNext()
để in thông tin được trả về khi máy chủ ghiRouteSummary
vào luồng thông báo. - Ghi đè phương thức
onCompleted()
(được gọi khi máy chủ đã hoàn tất lệnh gọi ở phía máy chủ) để giảmCountDownLatch
để chúng ta có thể kiểm tra xem máy chủ đã hoàn tất việc ghi hay chưa.
Sau đó, chúng ta sẽ truyền StreamObserver
đến phương thức recordRoute()
của phần giữ chỗ không đồng bộ và nhận lại trình quan sát yêu cầu StreamObserver
của riêng mình để ghi Points
nhằm gửi đến máy chủ. Sau khi viết xong các điểm, chúng ta sẽ sử dụng phương thức onCompleted()
của trình theo dõi yêu cầu để cho gRPC biết rằng chúng ta đã viết xong ở phía máy khách. Sau khi hoàn tất, chúng ta sẽ kiểm tra CountDownLatch
để xem máy chủ đã hoàn tất ở phía máy chủ hay chưa.
RPC truyền trực tuyến hai chiều
Cuối cùng, hãy xem RPC truyền phát trực tiếp hai chiều RouteChat()
.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
Giống như ví dụ về truyền phát trực tiếp phía máy khách, chúng ta đều nhận và trả về một trình quan sát phản hồi StreamObserver
, ngoại trừ lần này chúng ta gửi các giá trị thông qua trình quan sát phản hồi của phương thức trong khi máy chủ vẫn đang ghi các thông báo vào luồng thông báo của chúng. Cú pháp để đọc và ghi ở đây hoàn toàn giống như cú pháp cho phương thức truyền phát trực tiếp của ứng dụng. Mặc dù mỗi bên sẽ luôn nhận được tin nhắn của bên kia theo thứ tự chúng được viết, nhưng cả máy khách và máy chủ đều có thể đọc và ghi theo bất kỳ thứ tự nào – các luồng hoạt động hoàn toàn độc lập.
7. Hãy dùng thử ngay!
- Từ thư mục
start_here
:
$ ./gradlew installDist
Thao tác này sẽ biên dịch mã của bạn, đóng gói mã đó trong một tệp jar và tạo các tập lệnh chạy ví dụ. Các tệp này sẽ được tạo trong thư mục build/install/start_here/bin/
. Các tập lệnh là: route-guide-server
và route-guide-client
.
Máy chủ cần phải đang chạy trước khi khởi động máy khách.
- Chạy máy chủ:
$ ./build/install/start_here/bin/route-guide-server
- Chạy ứng dụng khách:
$ ./build/install/start_here/bin/route-guide-client
8. Bước tiếp theo
- Tìm hiểu cách gRPC hoạt động trong phần Giới thiệu về gRPC và Các khái niệm cốt lõi
- Xem Hướng dẫn cơ bản
- Khám phá tài liệu tham khảo về API.