1. Giriş
Bu codelab'de, Java ile yazılmış bir rota eşleme uygulamasının temelini oluşturan bir istemci ve sunucu oluşturmak için gRPC-Java'yı kullanacaksınız.
Eğitimin sonunda, bir müşterinin rotasındaki özellikler hakkında bilgi almak, müşterinin rotasının özetini oluşturmak ve trafik güncellemeleri gibi rota bilgilerini sunucu ve diğer müşterilerle paylaşmak için gRPC kullanarak uzak bir sunucuya bağlanan bir istemciniz olacak.
Hizmet, istemci ve sunucu için standart kod oluşturmak üzere kullanılacak bir Protocol Buffers dosyasında tanımlanır. Böylece, bu işlevselliği uygularken zamandan ve emekten tasarruf edersiniz.
Oluşturulan bu kod, yalnızca sunucu ile istemci arasındaki iletişimin karmaşıklıklarını değil, aynı zamanda veri serileştirme ve seri durumdan çıkarma işlemlerini de ele alır.
Neler öğreneceksiniz?
- Hizmet API'sini tanımlamak için Protocol Buffers'ı kullanma.
- Otomatik kod oluşturma kullanarak bir Protokol Arabellek tanımından gRPC tabanlı istemci ve sunucu oluşturma.
- gRPC ile istemci-sunucu akışı iletişimi hakkında bilgi sahibi olmanız gerekir.
Bu codelab, gRPC'ye yeni başlayan veya gRPC ile ilgili bilgilerini tazelemek isteyen Java geliştiricilerin yanı sıra dağıtılmış sistemler oluşturmakla ilgilenen herkes için hazırlanmıştır. Daha önce gRPC deneyimi gerekmez.
2. Başlamadan önce
Ön koşullar
- JDK sürümü 24.
Kodu alın
Bu codelab, tamamen sıfırdan başlamanız gerekmemesi için uygulamanın kaynak kodunun tamamlayabileceğiniz bir iskeletini sağlar. Aşağıdaki adımlarda, protokol arabellek derleyici eklentilerini kullanarak standart gRPC kodu oluşturma da dahil olmak üzere uygulamanın nasıl tamamlanacağı gösterilmektedir.
Öncelikle codelab çalışma dizinini oluşturun ve bu dizine gidin:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
Codelab'i indirip ayıklayın:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
Alternatif olarak, yalnızca codelab dizinini içeren .zip dosyasını indirebilir ve manuel olarak sıkıştırmasını açabilirsiniz.
Bir uygulamayı yazmayı atlamak isterseniz tamamlanmış kaynak kodunu GitHub'da bulabilirsiniz.
3. İletileri ve hizmetleri tanımlama
İlk adımınız, Protocol Buffers'ı kullanarak uygulamanın gRPC hizmetini, RPC yöntemini, istek ve yanıt mesajı türlerini tanımlamaktır. Hizmetinizde sunulacaklar:
- Sunucunun uyguladığı ve istemcinin çağırdığı
ListFeatures
,RecordRoute
veRouteChat
adlı RPC yöntemleri. - Yukarıdaki yöntemler çağrıldığında istemci ile sunucu arasında değiştirilen veri yapıları olan
Point
,Feature
,Rectangle
,RouteNote
veRouteSummary
mesaj türleri.
Protocol Buffers genellikle protobuf olarak bilinir. gRPC terminolojisi hakkında daha fazla bilgi için gRPC'nin Temel kavramlar, mimari ve yaşam döngüsü başlıklı makalesine bakın.
Bu RPC yöntemi ve mesaj türleri, sağlanan kaynak kodun proto/routeguide/route_guide.proto
dosyasında tanımlanır.
route_guide.proto
dosyası oluşturalım.
Bu örnekte Java kodu oluşturduğumuz için .proto
içinde bir java_package
dosya seçeneği belirledik:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
İleti türlerini tanımlama
Kaynak kodun proto/routeguide/route_guide.proto
dosyasında önce Point
mesaj türünü tanımlayın. Point
, haritadaki bir enlem-boylam koordinat çiftini temsil eder. Bu codelab'de koordinatlar için tam sayıları kullanın:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
1
ve 2
sayıları, message
yapısındaki her alan için benzersiz kimlik numaralarıdır.
Ardından, Feature
mesaj türünü tanımlayın. Bir Feature
, Point
ile belirtilen bir konumdaki bir şeyin adı veya posta adresi için string
alanını kullanır:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Bir alan içindeki birden fazla noktanın istemciye aktarılabilmesi için, enlem-boylam dikdörtgenini temsil eden bir Rectangle
mesajına ihtiyacınız vardır. Bu mesaj, çapraz olarak zıt iki nokta lo
ve hi
olarak gösterilir:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Ayrıca, belirli bir noktada gönderilen mesajı temsil eden bir RouteNote
mesajı:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Son olarak, RouteSummary
mesajı göndermeniz gerekir. Bu mesaj, sonraki bölümde açıklanan bir RecordRoute
RPC'ye yanıt olarak alınır. Alınan bireysel puanların sayısı, algılanan özelliklerin sayısı ve her nokta arasındaki mesafenin kümülatif toplamı olarak kapsanan toplam mesafeyi içerir.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Hizmet yöntemlerini tanımlama
Bir hizmeti tanımlamak için .proto
dosyanızda adlandırılmış bir hizmet belirtirsiniz. route_guide.proto
dosyasında, uygulamanın hizmeti tarafından sağlanan bir veya daha fazla yöntemi tanımlayan RouteGuide
adlı bir service
yapısı bulunur.
Hizmet tanımınızda RPC
yöntemleri tanımladığınızda, bunların istek ve yanıt türlerini belirtirsiniz. Bu codelab bölümünde şunları tanımlayalım:
ListFeatures
Belirtilen Rectangle
içinde bulunan Feature
nesnelerini alır. Dikdörtgen geniş bir alanı kaplayıp çok sayıda özellik içerebileceğinden sonuçlar tek seferde döndürülmek yerine yayınlanır.
Bu uygulamada sunucu tarafı akış RPC'si kullanacaksınız: İstemci, sunucuya bir istek gönderir ve mesaj dizisini okumak için bir akış alır. İstemci, döndürülen akıştan mesaj kalmayana kadar okur. Örneğimizde de görebileceğiniz gibi, yanıt türünden önce akış anahtar kelimesini yerleştirerek sunucu tarafı akış yöntemini belirtirsiniz.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Geçilen bir rotadaki nokta akışını kabul eder ve geçiş tamamlandığında RouteSummary
değerini döndürür.
Bu durumda istemci tarafı akış RPC uygundur: İstemci, bir mesaj dizisi yazar ve bunları yine sağlanan bir akışı kullanarak sunucuya gönderir. İstemci, mesajları yazmayı bitirdikten sonra sunucunun hepsini okumasını ve yanıtını döndürmesini bekler. İstemci tarafı yayın yöntemini, yayın anahtar kelimesini istek türünün önüne yerleştirerek belirtirsiniz.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Bir rota geçilirken gönderilen RouteNotes
akışını kabul ederken diğer RouteNotes
'ları (ör. diğer kullanıcılardan gelen) alır.
Bu, iki yönlü akışın tam olarak kullanım alanıdır. Her iki tarafın da okuma/yazma akışı kullanarak bir ileti dizisi gönderdiği çift yönlü akış RPC'si. İki akış bağımsız olarak çalışır. Bu nedenle, istemciler ve sunucular istedikleri sırada okuma ve yazma işlemi yapabilir. Örneğin, sunucu yanıtlarını yazmadan önce tüm istemci mesajlarını almayı bekleyebilir veya alternatif olarak bir mesajı okuyup ardından bir mesaj yazabilir ya da okuma ve yazma işlemlerini başka bir şekilde birleştirebilir. Her akıştaki iletilerin sırası korunur. Bu tür bir yöntemi, hem istekten hem de yanıttan önce akış anahtar kelimesini yerleştirerek belirtirsiniz.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. İstemci ve sunucu kodu oluşturma
Ardından, .proto
hizmet tanımımızdan gRPC istemci ve sunucu arayüzlerini oluşturmamız gerekir. Bu işlemi, özel bir gRPC Java eklentisiyle birlikte protokol arabelleği derleyicisini protoc
kullanarak yaparız. gRPC hizmetleri oluşturmak için proto3 derleyicisini (hem proto2 hem de proto3 söz dizimini destekler) kullanmanız gerekir.
Gradle veya Maven kullanılırken protoc derleme eklentisi, derlemenin bir parçası olarak gerekli kodu oluşturabilir. Kendi .proto
dosyalarınızdan nasıl kod oluşturacağınız hakkında bilgi edinmek için grpc-java README dosyasına bakabilirsiniz.
Gradle yapılandırması sağladık.
streaming-grpc-java-getting-started
dizininden şu komutu girin:
$ chmod +x gradlew $ ./gradlew generateProto
Aşağıdaki sınıflar, hizmet tanımımızdan (build/generated/sources/proto/main/java
altında) oluşturulur:
- Her mesaj türü için bir tane:
Feature.java
,Rectangle.java, ...
. Bunlar, istek ve yanıt mesajı türlerimizi doldurmak, serileştirmek ve almak için gereken tüm protokol arabellek kodunu içerir. RouteGuideGrpc.java
(diğer bazı yararlı kodlarla birlikte)RouteGuide
sunucularının uygulaması için bir temel sınıf,RouteGuideGrpc.RouteGuideImplBase
ve istemcilerin kullanması içinRouteGuide
hizmetinde ve saplama sınıflarında tanımlanan tüm yöntemleri içerir.
5. Hizmeti uygulama
Öncelikle RouteGuide
sunucuyu nasıl oluşturduğumuza bakalım. RouteGuide
hizmetimizin işini yapması için iki bölüm vardır:
- Hizmet tanımımızdan oluşturulan hizmet arayüzünü uygulama: Hizmetimizin asıl "işini" yapma.
- İstemcilerden gelen istekleri dinlemek ve bunları doğru hizmet uygulamasına göndermek için bir gRPC sunucusu çalıştırma.
RouteGuide'ı uygulama
Oluşturulan RouteGuideGrpc.RouteGuideImplBase sınıfını genişletecek bir RouteGuideService
sınıfı uygulayacağız. Uygulama bu şekilde görünür.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
Her RPC uygulamasını ayrıntılı olarak inceleyelim.
Sunucu tarafında yayın RPC'si
Şimdi de akış RPC'lerimizden birine bakalım. ListFeatures
, sunucu tarafında yayınlanan bir RPC olduğundan istemcimize birden fazla Features
geri göndermemiz gerekir.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
Basit RPC'ye benzer şekilde, bu yöntem bir istek nesnesi (istemcimizin Features
bulmak istediği Rectangle
) ve bir StreamObserver
yanıt gözlemcisi alır.
Bu sefer, istemciye döndürmemiz gereken sayıda Feature
nesne alırız (bu durumda, bunları isteğimizin Rectangle
içinde olup olmadıklarına göre hizmetin özellik koleksiyonundan seçeriz) ve her birini sırayla onNext()
yöntemini kullanarak yanıt gözlemcisine yazarız. Son olarak, basit RPC'mizde olduğu gibi, yanıtları yazmayı bitirdiğimizi gRPC'ye bildirmek için yanıt gözlemcisinin onCompleted()
yöntemini kullanırız.
İstemci tarafı yayın RPC'si
Şimdi biraz daha karmaşık bir yönteme, istemci tarafı akış yöntemine RecordRoute()
bakalım. Bu yöntemde istemciden bir Points
akışı alıp yolculuklarıyla ilgili bilgileri içeren tek bir RouteSummary
döndürüyoruz.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
Gördüğünüz gibi, yöntemimiz önceki yöntem türleri gibi bir StreamObserver
responseObserver
parametresi alıyor ancak bu kez istemcinin Points
yazması için bir StreamObserver
döndürüyor.
Yöntem gövdesinde, döndürülecek anonim bir StreamObserver
oluştururuz. Bu işlemde:
- İstemci, ileti akışına her
Point
yazdığında özellikleri ve diğer bilgileri almak içinonNext()
yöntemini geçersiz kılın. onCompleted()
yöntemini (istemci mesaj yazmayı bitirdiğinde çağrılır) geçersiz kılarakRouteSummary
öğemizi doldurup oluşturun. Ardından, yöntemimizin kendi yanıt gözlemcisininonNext()
yönteminiRouteSummary
ile çağırırız ve sunucu tarafındaki aramayı sonlandırmak içinonCompleted()
yöntemini çağırırız.
Çift yönlü akış RPC'si
Son olarak, çift yönlü akış RPC'mize RouteChat()
göz atalım.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
İstemci tarafı akış örneğimizde olduğu gibi, bu kez de StreamObserver
alıp döndürüyoruz. Ancak bu kez, istemci kendi mesaj akışına mesaj yazmaya devam ederken değerleri yöntemimizin yanıt gözlemcisi aracılığıyla döndürüyoruz. Burada okuma ve yazma söz dizimi, istemci akışı ve sunucu akışı yöntemlerimizle tamamen aynıdır. Her iki taraf da diğer tarafın iletilerini her zaman yazıldıkları sırayla alsa da hem istemci hem de sunucu, iletileri herhangi bir sırada okuyup yazabilir. Akışlar tamamen bağımsız olarak çalışır.
Sunucuyu başlatın.
Tüm yöntemlerimizi uyguladıktan sonra, istemcilerin hizmetimizi kullanabilmesi için bir gRPC sunucusu da başlatmamız gerekir. Aşağıdaki snippet'te, RouteGuide
hizmetimiz için bunu nasıl yaptığımız gösterilmektedir:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
Gördüğünüz gibi, sunucumuzu ServerBuilder
kullanarak oluşturup başlatıyoruz.
Bunu yapmak için:
- Oluşturucunun
forPort()
yöntemini kullanarak istemci isteklerini dinlemek için kullanmak istediğimiz adresi ve bağlantı noktasını belirtin. - Hizmet uygulama sınıfımızın
RouteGuideService
bir örneğini oluşturun ve bunu oluşturucununaddService()
yöntemine iletin. - Hizmetimiz için bir UPÇ sunucusu oluşturup başlatmak üzere oluşturucuda
build()
vestart()
işlevlerini çağırın.
ServerBuilder bağlantı noktasını zaten içerdiğinden, bağlantı noktasını yalnızca günlük kaydı için iletiyoruz.
6. İstemciyi oluşturma
Bu bölümde, RouteGuide
hizmetimiz için istemci oluşturma konusunu ele alacağız. Tam örnek istemci kodumuzu ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
adresinde görebilirsiniz.
Bir saplama oluşturma
Hizmet yöntemlerini çağırmak için önce bir taslak oluşturmamız gerekir. Daha doğrusu iki taslak oluşturmamız gerekir:
- Engelleme/eşzamanlı saplama: Bu, RPC çağrısının sunucunun yanıt vermesini beklediği ve yanıt döndüreceği veya istisna oluşturacağı anlamına gelir.
- Yanıtın eşzamansız olarak döndürüldüğü, sunucuya engellemeyen çağrılar yapan engellemeyen/eşzamansız bir saplama. Belirli türlerdeki akış aramalarını yalnızca eşzamansız bir saplama kullanarak yapabilirsiniz.
Öncelikle, bağlanmak istediğimiz sunucu adresini ve bağlantı noktasını belirterek taslağımız için bir gRPC kanalı oluşturmamız gerekir:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
Kanalı oluşturmak için ManagedChannelBuilder
kullanırız.
Artık .proto
'ümüzden oluşturduğumuz RouteGuideGrpc
sınıfında sağlanan newStub
ve newBlockingStub
yöntemlerini kullanarak kanalımızı oluşturabiliriz.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
Engellemiyorsa asenkron olduğunu unutmayın.
Hizmet yöntemlerini çağırma
Şimdi de hizmet yöntemlerimizi nasıl çağırdığımıza bakalım. Engelleme sapından oluşturulan tüm RPC'lerin engelleme/eşzamanlı modda çalışacağını unutmayın. Bu, RPC çağrısının sunucunun yanıt vermesini bekleyeceği ve yanıt ya da hata döndüreceği anlamına gelir.
Sunucu tarafında yayın RPC'si
Şimdi de ListFeatures
için sunucu tarafı bir akış çağrısına bakalım. Bu çağrı, coğrafi Feature
akışı döndürür:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
Gördüğünüz gibi, Getting_Started_With_gRPC_Java codelab'inde incelediğimiz basit tekli RPC'ye çok benziyor. Tek farkı, tek bir Feature
döndürmek yerine, istemcinin döndürülen tüm Features
'leri okumak için kullanabileceği bir Iterator
döndürmesi.
İstemci tarafı yayın RPC'si
Şimdi biraz daha karmaşık bir konuya geçelim: istemci tarafı yayın yöntemi RecordRoute
. Bu yöntemde, sunucuya bir Points
akışı göndeririz ve tek bir RouteSummary
alırız. Bu yöntem için eşzamansız saplamayı kullanmamız gerekir. Sunucuyu oluşturma başlıklı makaleyi daha önce okuduysanız bu bölümdeki bazı bilgiler size tanıdık gelebilir. Asenkron akış RPC'leri her iki tarafta da benzer şekilde uygulanır.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
Gördüğünüz gibi, bu yöntemi çağırmak için sunucunun RouteSummary
yanıtıyla çağırması için özel bir arayüz uygulayan bir StreamObserver
oluşturmamız gerekiyor. StreamObserver
bölgemizde:
- Sunucu, ileti akışına
RouteSummary
yazdığında döndürülen bilgileri yazdırmak içinonNext()
yöntemini geçersiz kılın. - Sunucu kendi tarafında aramayı tamamladığında çağrılan
onCompleted()
yöntemini geçersiz kılarakCountDownLatch
değerini azaltın. Böylece sunucunun yazma işlemini bitirip bitirmediğini kontrol edebiliriz.
Ardından, StreamObserver
öğesini eşzamansız saplamanın recordRoute()
yöntemine iletiriz ve sunucuya göndermek üzere Points
yazmak için kendi StreamObserver
istek gözlemcimizi geri alırız. Puan yazmayı bitirdikten sonra, istemci tarafında yazmayı bitirdiğimizi gRPC'ye bildirmek için istek gözlemcisinin onCompleted()
yöntemini kullanırız. İşlem tamamlandığında, sunucunun kendi tarafında tamamlanıp tamamlanmadığını görmek için CountDownLatch
kontrol ederiz.
Çift yönlü akış RPC'si
Son olarak, çift yönlü akış RPC'mize RouteChat()
göz atalım.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
İstemci tarafı akış örneğimizde olduğu gibi, bu kez de StreamObserver
yanıt gözlemcisi alıp döndürüyoruz. Ancak bu kez, sunucu kendi ileti akışına ileti yazmaya devam ederken değerleri yöntemimizin yanıt gözlemcisi aracılığıyla gönderiyoruz. Burada okuma ve yazma söz dizimi, istemci akışı yöntemimizle tamamen aynıdır. Her iki taraf da diğer tarafın iletilerini her zaman yazıldıkları sırayla alsa da hem istemci hem de sunucu, iletileri herhangi bir sırada okuyup yazabilir. Akışlar tamamen bağımsız olarak çalışır.
7. Yenilikleri inceleyin.
start_here
dizininden:
$ ./gradlew installDist
Bu işlem, kodunuzu derler, jar dosyası olarak paketler ve örneği çalıştıran komut dosyalarını oluşturur. Bu dosyalar build/install/start_here/bin/
dizininde oluşturulur. Komut dosyaları: route-guide-server
ve route-guide-client
.
İstemci başlatılmadan önce sunucunun çalışıyor olması gerekir.
- Sunucuyu çalıştırın:
$ ./build/install/start_here/bin/route-guide-server
- İstemciyi çalıştırın:
$ ./build/install/start_here/bin/route-guide-client
8. Sırada ne var?
- gRPC'nin işleyiş şeklini Introduction to gRPC (gRPC'ye Giriş) ve Core concepts (Temel kavramlar) başlıklı makalelerden öğrenebilirsiniz.
- Temel bilgiler eğitimini inceleyin.
- API referansını inceleyin.