1. 소개
이 Codelab에서는 gRPC-Java를 사용하여 Java로 작성된 경로 매핑 애플리케이션의 기반을 형성하는 클라이언트와 서버를 만듭니다.
튜토리얼이 끝나면 gRPC를 사용하여 원격 서버에 연결하여 클라이언트 경로의 기능에 관한 정보를 가져오고, 클라이언트 경로의 요약을 만들고, 서버 및 다른 클라이언트와 트래픽 업데이트와 같은 경로 정보를 교환하는 클라이언트가 있습니다.
서비스는 프로토콜 버퍼 파일에 정의되며, 이 파일은 클라이언트와 서버가 서로 통신할 수 있도록 상용구 코드를 생성하는 데 사용되므로 이 기능을 구현하는 데 드는 시간과 노력을 절약할 수 있습니다.
이 생성된 코드는 서버와 클라이언트 간의 복잡한 통신뿐만 아니라 데이터 직렬화 및 역직렬화도 처리합니다.
학습할 내용
- 프로토콜 버퍼를 사용하여 서비스 API를 정의하는 방법
- 자동 코드 생성을 사용하여 프로토콜 버퍼 정의에서 gRPC 기반 클라이언트와 서버를 빌드하는 방법
- gRPC를 사용한 클라이언트-서버 스트리밍 통신에 대한 이해
이 Codelab은 gRPC를 처음 접하거나 gRPC를 다시 배우려는 Java 개발자 또는 분산 시스템을 빌드하는 데 관심이 있는 모든 사용자를 대상으로 합니다. 이전 gRPC 경험은 필요하지 않습니다.
2. 시작하기 전에
기본 요건
- JDK 버전 24
코드 가져오기
처음부터 완전히 시작하지 않아도 되도록 이 Codelab에서는 애플리케이션 소스 코드의 스캐폴드를 제공하여 이를 완성할 수 있습니다. 다음 단계에서는 프로토콜 버퍼 컴파일러 플러그인을 사용하여 상용구 gRPC 코드를 생성하는 등 애플리케이션을 완료하는 방법을 보여줍니다.
먼저 Codelab 작업 디렉터리를 만들고 해당 디렉터리로 이동합니다.
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
Codelab을 다운로드하고 압축을 풉니다.
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
또는 codelab 디렉터리만 포함된 .zip 파일을 다운로드하고 직접 압축을 해제할 수 있습니다.
구현을 입력하는 것을 건너뛰려면 완료된 소스 코드를 GitHub에서 확인하세요.
3. 메시지 및 서비스 정의
첫 번째 단계는 프로토콜 버퍼를 사용하여 애플리케이션의 gRPC 서비스, RPC 메서드, 요청 및 응답 메시지 유형을 정의하는 것입니다. 서비스에서 제공하는 기능:
- 서버가 구현하고 클라이언트가 호출하는 RPC 메서드
ListFeatures
,RecordRoute
,RouteChat
- 위의 메서드를 호출할 때 클라이언트와 서버 간에 교환되는 데이터 구조인 메시지 유형
Point
,Feature
,Rectangle
,RouteNote
,RouteSummary
프로토콜 버퍼는 일반적으로 protobufs로 알려져 있습니다. gRPC 용어에 대한 자세한 내용은 gRPC의 핵심 개념, 아키텍처, 수명 주기를 참고하세요.
이 RPC 메서드와 메시지 유형은 모두 제공된 소스 코드의 proto/routeguide/route_guide.proto
파일에 정의됩니다.
route_guide.proto
파일을 만들어 보겠습니다.
이 예에서는 Java 코드를 생성하므로 .proto
에서 java_package
파일 옵션을 지정했습니다.
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
메시지 유형 정의
소스 코드의 proto/routeguide/route_guide.proto
파일에서 먼저 Point
메시지 유형을 정의합니다. Point
는 지도상의 위도-경도 좌표 쌍을 나타냅니다. 이 Codelab에서는 좌표에 정수를 사용합니다.
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
1
및 2
은 message
구조의 각 필드에 대한 고유 ID 번호입니다.
다음으로 Feature
메시지 유형을 정의합니다. Feature
는 Point
로 지정된 위치에 있는 항목의 이름이나 우편 주소에 string
필드를 사용합니다.
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
영역 내 여러 지점을 클라이언트로 스트리밍하려면 대각선으로 반대되는 두 점 lo
및 hi
로 표시되는 위도-경도 직사각형을 나타내는 Rectangle
메시지가 필요합니다.
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
또한 특정 시점에 전송된 메시지를 나타내는 RouteNote
메시지:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
마지막으로 RouteSummary
메시지가 필요합니다. 이 메시지는 다음 섹션에서 설명하는 RecordRoute
RPC에 대한 응답으로 수신됩니다. 여기에는 수신된 개별 포인트 수, 감지된 특징 수, 각 포인트 간 거리의 누적 합계로 계산된 총 이동 거리가 포함됩니다.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
서비스 메서드 정의
서비스를 정의하려면 .proto
파일에서 명명된 서비스를 지정합니다. route_guide.proto
파일에는 애플리케이션 서비스에서 제공하는 하나 이상의 메서드를 정의하는 RouteGuide
라는 service
구조가 있습니다.
서비스 정의 내에서 RPC
메서드를 정의할 때 요청 및 응답 유형을 지정합니다. Codelab의 이 섹션에서는 다음을 정의합니다.
ListFeatures
지정된 Rectangle
내에서 사용할 수 있는 Feature
객체를 가져옵니다. 결과는 한 번에 반환되지 않고 스트리밍됩니다. 사각형이 넓은 영역을 포함하고 많은 수의 기능을 포함할 수 있기 때문입니다.
이 애플리케이션에서는 서버 측 스트리밍 RPC를 사용합니다. 클라이언트는 서버에 요청을 보내고 일련의 메시지를 다시 읽는 스트림을 가져옵니다. 클라이언트는 메시지가 더 이상 없을 때까지 반환된 스트림을 읽습니다. 예에서 볼 수 있듯이 응답 유형 앞에 stream 키워드를 배치하여 서버 측 스트리밍 메서드를 지정합니다.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
이동 중인 경로의 포인트 스트림을 허용하고 이동이 완료되면 RouteSummary
를 반환합니다.
이 경우 클라이언트 측 스트리밍 RPC가 적합합니다. 클라이언트는 일련의 메시지를 작성하고 제공된 스트림을 사용하여 서버에 보냅니다. 클라이언트는 메시지 작성을 완료한 후 서버가 메시지를 모두 읽고 응답을 반환할 때까지 대기합니다. 요청 유형 앞에 스트림 키워드를 배치하여 클라이언트 측 스트리밍 메서드를 지정합니다.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
경로를 이동하는 동안 전송된 RouteNotes
스트림을 수락하면서 다른 RouteNotes
(예: 다른 사용자로부터)를 수신합니다.
이것이 바로 양방향 스트리밍의 사용 사례입니다. 양쪽에서 읽기-쓰기 스트림을 사용하여 일련의 메시지를 보내는 양방향 스트리밍 RPC입니다. 두 스트림은 독립적으로 작동하므로 클라이언트와 서버는 원하는 순서로 읽고 쓸 수 있습니다. 예를 들어 서버는 클라이언트 메시지를 모두 수신할 때까지 기다린 후 응답을 작성할 수도 있고, 메시지를 읽은 후 메시지를 작성하거나 읽기와 쓰기의 다른 조합을 사용할 수도 있습니다. 각 스트림의 메시지 순서는 유지됩니다. 요청과 응답 앞에 모두 스트림 키워드를 배치하여 이 유형의 메서드를 지정합니다.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 클라이언트 및 서버 코드 생성
다음으로 .proto
서비스 정의에서 gRPC 클라이언트 및 서버 인터페이스를 생성해야 합니다. 이를 위해 특수 gRPC Java 플러그인과 함께 프로토콜 버퍼 컴파일러 protoc
를 사용합니다. gRPC 서비스를 생성하려면 proto3 컴파일러 (proto2 및 proto3 구문 모두 지원)를 사용해야 합니다.
Gradle 또는 Maven을 사용하는 경우 protoc 빌드 플러그인이 빌드의 일부로 필요한 코드를 생성할 수 있습니다. 자체 .proto
파일에서 코드를 생성하는 방법은 grpc-java README를 참고하세요.
Gradle 구성이 제공되었습니다.
streaming-grpc-java-getting-started
디렉터리에서 다음을 입력합니다.
$ chmod +x gradlew $ ./gradlew generateProto
다음 클래스는 서비스 정의 (build/generated/sources/proto/main/java
아래)에서 생성됩니다.
- 각 메시지 유형에 하나씩 있습니다.
Feature.java
,Rectangle.java, ...
에는 요청 및 응답 메시지 유형을 채우고, 직렬화하고, 검색하는 모든 프로토콜 버퍼 코드가 포함되어 있습니다. RouteGuideGrpc.java
:RouteGuide
서버가 구현할 기본 클래스인RouteGuideGrpc.RouteGuideImplBase
와 클라이언트가 사용할RouteGuide
서비스 및 스텁 클래스에 정의된 모든 메서드가 포함되어 있습니다 (유용한 다른 코드와 함께).
5. 서비스 구현
먼저 RouteGuide
서버를 만드는 방법을 살펴보겠습니다. RouteGuide
서비스가 작업을 수행하도록 하는 방법에는 두 가지가 있습니다.
- 서비스 정의에서 생성된 서비스 인터페이스 구현: 서비스의 실제 '작업' 실행
- 클라이언트의 요청을 수신하고 올바른 서비스 구현으로 디스패치하는 gRPC 서버를 실행합니다.
RouteGuide 구현
생성된 RouteGuideGrpc.RouteGuideImplBase 클래스를 확장하는 RouteGuideService
클래스를 구현합니다. 구현은 다음과 같습니다.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
각 RPC 구현을 자세히 살펴보겠습니다.
서버 측 스트리밍 RPC
이제 스트리밍 RPC를 살펴보겠습니다. ListFeatures
은 서버 측 스트리밍 RPC이므로 클라이언트에 여러 Features
을 다시 보내야 합니다.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
간단한 RPC와 마찬가지로 이 메서드는 요청 객체 (클라이언트가 Features
을 찾으려는 Rectangle
)와 StreamObserver
응답 관찰자를 가져옵니다.
이번에는 클라이언트에 반환해야 하는 Feature
객체를 필요한 만큼 가져오고 (이 경우 요청 Rectangle
내부에 있는지에 따라 서비스의 기능 모음에서 선택) 각 객체를 onNext()
메서드를 사용하여 응답 관찰자에 차례로 작성합니다. 마지막으로 간단한 RPC에서와 마찬가지로 응답 관찰자의 onCompleted()
메서드를 사용하여 응답 작성을 완료했음을 gRPC에 알립니다.
클라이언트 측 스트리밍 RPC
이제 좀 더 복잡한 클라이언트 측 스트리밍 메서드 RecordRoute()
를 살펴보겠습니다. 여기서는 클라이언트에서 Points
스트림을 가져와 여행에 관한 정보가 포함된 단일 RouteSummary
를 반환합니다.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
보시다시피 이전 메서드 유형과 마찬가지로 메서드는 StreamObserver
responseObserver
매개변수를 가져오지만 이번에는 클라이언트가 Points
을 작성할 수 있도록 StreamObserver
을 반환합니다.
메서드 본문에서 반환할 익명 StreamObserver
를 인스턴스화합니다.
- 클라이언트가 메시지 스트림에
Point
을 쓸 때마다 기능과 기타 정보를 가져오도록onNext()
메서드를 재정의합니다. - 클라이언트가 메시지 작성을 완료할 때 호출되는
onCompleted()
메서드를 재정의하여RouteSummary
를 채우고 빌드합니다. 그런 다음 메서드의 자체 응답 관찰자의onNext()
를RouteSummary
로 호출하고onCompleted()
메서드를 호출하여 서버 측에서 호출을 완료합니다.
양방향 스트리밍 RPC
마지막으로 양방향 스트리밍 RPC RouteChat()
를 살펴보겠습니다.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
클라이언트 측 스트리밍 예와 마찬가지로 StreamObserver
를 가져오고 반환합니다. 단, 이번에는 클라이언트가 자체 메시지 스트림에 메시지를 쓰는 동안 메서드의 응답 관찰자를 통해 값을 반환합니다. 여기서 읽기 및 쓰기 구문은 클라이언트 스트리밍 및 서버 스트리밍 메서드와 정확히 동일합니다. 각 측면은 항상 작성된 순서대로 다른 측면의 메시지를 받지만 클라이언트와 서버는 어떤 순서로든 읽고 쓸 수 있습니다. 스트림은 완전히 독립적으로 작동합니다.
서버를 시작합니다.
모든 메서드를 구현한 후에는 클라이언트가 실제로 서비스를 사용할 수 있도록 gRPC 서버도 시작해야 합니다. 다음 스니펫은 RouteGuide
서비스에서 이 작업을 수행하는 방법을 보여줍니다.
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
보시다시피 ServerBuilder
를 사용하여 서버를 빌드하고 시작합니다.
이를 위해 Google에서는 다음 작업을 실행합니다.
- 빌더의
forPort()
메서드를 사용하여 클라이언트 요청을 수신하는 데 사용할 주소와 포트를 지정합니다. - 서비스 구현 클래스
RouteGuideService
의 인스턴스를 만들고 빌더의addService()
메서드에 전달합니다. - 빌더에서
build()
및start()
를 호출하여 서비스의 RPC 서버를 만들고 시작합니다.
ServerBuilder에 이미 포트가 통합되어 있으므로 포트를 전달하는 유일한 이유는 로깅에 사용하기 위해서입니다.
6. 클라이언트 만들기
이 섹션에서는 RouteGuide
서비스의 클라이언트를 만드는 방법을 살펴봅니다. ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
에서 전체 예시 클라이언트 코드를 확인할 수 있습니다.
스텁 인스턴스화
서비스 메서드를 호출하려면 먼저 스텁, 즉 두 개의 스텁을 만들어야 합니다.
- 차단/동기 스텁: RPC 호출이 서버의 응답을 기다리며 응답을 반환하거나 예외를 발생시킵니다.
- 서버에 비차단 호출을 실행하는 비차단/비동기 스텁. 응답은 비동기식으로 반환됩니다. 비동기 스텁을 사용해야만 특정 유형의 스트리밍 호출을 할 수 있습니다.
먼저 연결할 서버 주소와 포트를 지정하여 스텁의 gRPC 채널을 만들어야 합니다.
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
ManagedChannelBuilder
를 사용하여 채널을 만듭니다.
이제 채널을 사용하여 .proto
에서 생성한 RouteGuideGrpc
클래스에 제공된 newStub
및 newBlockingStub
메서드를 사용하여 스텁을 만들 수 있습니다.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
차단되지 않으면 비동기입니다.
서비스 메서드 호출
이제 서비스 메서드를 호출하는 방법을 살펴보겠습니다. 차단 스텁에서 생성된 RPC는 차단/동기 모드로 작동합니다. 즉, RPC 호출이 서버의 응답을 기다리며 응답 또는 오류를 반환합니다.
서버 측 스트리밍 RPC
다음으로 지리적 Feature
스트림을 반환하는 ListFeatures
에 대한 서버 측 스트리밍 호출을 살펴보겠습니다.
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
보시다시피 Getting_Started_With_gRPC_Java Codelab에서 살펴본 간단한 단항 RPC와 매우 유사합니다. 단, 단일 Feature
를 반환하는 대신 클라이언트가 반환된 모든 Features
를 읽는 데 사용할 수 있는 Iterator
를 반환합니다.
클라이언트 측 스트리밍 RPC
이제 좀 더 복잡한 클라이언트 측 스트리밍 메서드 RecordRoute
를 살펴보겠습니다. 여기서는 Points
스트림을 서버에 전송하고 단일 RouteSummary
를 다시 가져옵니다. 이 메서드에는 비동기 스텁을 사용해야 합니다. 서버 만들기를 이미 읽었다면 이 내용이 매우 익숙할 수 있습니다. 비동기 스트리밍 RPC는 양쪽에서 비슷한 방식으로 구현됩니다.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
보시다시피 이 메서드를 호출하려면 서버가 RouteSummary
응답으로 호출할 수 있는 특수 인터페이스를 구현하는 StreamObserver
를 만들어야 합니다. StreamObserver
에서 다음 작업을 실행합니다.
- 서버가 메시지 스트림에
RouteSummary
을 쓸 때 반환된 정보를 출력하도록onNext()
메서드를 재정의합니다. onCompleted()
메서드 (서버가 자체적으로 호출을 완료할 때 호출됨)를 재정의하여CountDownLatch
를 줄여 서버가 쓰기를 완료했는지 확인할 수 있습니다.
그런 다음 StreamObserver
를 비동기 스텁의 recordRoute()
메서드에 전달하고 자체 StreamObserver
요청 관찰자를 다시 가져와 서버로 전송할 Points
를 작성합니다. 포인트 작성을 완료하면 요청 관찰자의 onCompleted()
메서드를 사용하여 클라이언트 측에서 작성을 완료했다고 gRPC에 알립니다. 완료되면 CountDownLatch
를 확인하여 서버 측에서 완료되었는지 확인합니다.
양방향 스트리밍 RPC
마지막으로 양방향 스트리밍 RPC RouteChat()
를 살펴보겠습니다.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
클라이언트 측 스트리밍 예와 마찬가지로 StreamObserver
응답 관찰자를 가져오고 반환합니다. 단, 이번에는 서버가 자체 메시지 스트림에 메시지를 쓰는 동안 메서드의 응답 관찰자를 통해 값을 전송합니다. 여기서 읽기 및 쓰기 구문은 클라이언트 스트리밍 메서드와 정확히 동일합니다. 각 측면은 항상 작성된 순서대로 다른 측면의 메시지를 받지만 클라이언트와 서버는 어떤 순서로든 읽고 쓸 수 있습니다. 스트림은 완전히 독립적으로 작동합니다.
7. 직접 사용해 보세요.
start_here
디렉터리에서 다음을 실행합니다.
$ ./gradlew installDist
이렇게 하면 코드가 컴파일되고, jar로 패키징되며, 예시를 실행하는 스크립트가 생성됩니다. build/install/start_here/bin/
디렉터리에 생성됩니다. 스크립트는 route-guide-server
및 route-guide-client
입니다.
클라이언트를 시작하기 전에 서버가 실행 중이어야 합니다.
- 서버를 실행합니다.
$ ./build/install/start_here/bin/route-guide-server
- 클라이언트를 실행합니다.
$ ./build/install/start_here/bin/route-guide-client