1. مقدمه
در این کد لبه، شما از gRPC-Java برای ایجاد یک سرویس گیرنده و سرور استفاده خواهید کرد که پایه و اساس برنامه نقشه برداری مسیر نوشته شده در جاوا را تشکیل می دهد.
در پایان آموزش، شما یک کلاینت خواهید داشت که با استفاده از gRPC به یک سرور راه دور متصل می شود تا اطلاعاتی در مورد ویژگی های مسیر کلاینت به دست آورد، خلاصه ای از مسیر مشتری ایجاد کند و اطلاعات مسیر مانند به روز رسانی ترافیک را با سرور و سایر کلاینت ها مبادله کند.
این سرویس در یک فایل Protocol Buffers تعریف شده است که از آن برای تولید کد boilerplate برای کلاینت و سرور استفاده می شود تا بتوانند با یکدیگر ارتباط برقرار کنند و در زمان و تلاش شما در اجرای آن عملکرد صرفه جویی شود.
این کد تولید شده نه تنها از پیچیدگی های ارتباط بین سرور و کلاینت، بلکه سریال سازی و سریال سازی داده ها نیز مراقبت می کند.
چیزی که یاد خواهید گرفت
- نحوه استفاده از بافرهای پروتکل برای تعریف API سرویس.
- نحوه ساخت یک کلاینت و سرور مبتنی بر gRPC از تعریف بافرهای پروتکل با استفاده از تولید کد خودکار.
- درک ارتباط جریان مشتری-سرور با gRPC.
این کد لبه برای توسعه دهندگان جاوا که تازه به gRPC می پردازند یا به دنبال تازه سازی gRPC هستند یا هرکس دیگری که علاقه مند به ساختن سیستم های توزیع شده است، طراحی شده است. هیچ تجربه قبلی gRPC مورد نیاز نیست.
2. قبل از شروع
پیش نیازها
- JDK نسخه 24.
کد را دریافت کنید
برای اینکه مجبور نباشید به طور کامل از ابتدا شروع کنید، این کد لبه داربستی از کد منبع برنامه را برای شما فراهم می کند تا آن را تکمیل کنید. مراحل زیر به شما نشان می دهد که چگونه برنامه را تکمیل کنید، از جمله استفاده از افزونه های کامپایلر بافر پروتکل برای تولید کد gRPC دیگ بخار.
ابتدا پوشه کاری codelab و cd را در آن ایجاد کنید:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
کد لبه را دانلود و استخراج کنید:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
یا می توانید فایل .zip را که فقط شامل دایرکتوری Codelab است دانلود کرده و به صورت دستی آن را از حالت فشرده خارج کنید.
اگر می خواهید از تایپ کردن در یک پیاده سازی صرفنظر کنید، کد منبع تکمیل شده در GitHub موجود است.
3. پیام ها و خدمات را تعریف کنید
اولین قدم شما این است که سرویس gRPC برنامه، روش RPC آن، و انواع پیام درخواست و پاسخ آن را با استفاده از Protocol Buffers تعریف کنید. خدمات شما ارائه خواهد کرد:
- متدهای RPC به نام
ListFeatures
،RecordRoute
وRouteChat
که سرور پیاده سازی می کند و کلاینت فراخوانی می کند. - انواع پیام
Point
،Feature
،Rectangle
،RouteNote
وRouteSummary
که ساختارهای داده ای هستند که هنگام فراخوانی روش های بالا بین مشتری و سرور رد و بدل می شوند.
بافرهای پروتکل معمولاً به عنوان پروتوباف شناخته می شوند. برای اطلاعات بیشتر در مورد اصطلاحات gRPC، به مفاهیم اصلی، معماری و چرخه حیات gRPC مراجعه کنید.
این روش RPC و انواع پیام های آن در فایل proto/routeguide/route_guide.proto
کد منبع ارائه شده تعریف می شوند.
بیایید یک فایل route_guide.proto
ایجاد کنیم.
همانطور که در این مثال در حال تولید کد جاوا هستیم، یک گزینه فایل java_package
در .proto
خود مشخص کرده ایم:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
تعریف انواع پیام
در فایل proto/routeguide/route_guide.proto
کد منبع ابتدا نوع پیام Point
را تعریف کنید. یک Point
نشان دهنده یک جفت مختصات طول و عرض جغرافیایی بر روی نقشه است. برای این کد، از اعداد صحیح برای مختصات استفاده کنید:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
اعداد 1
و 2
شماره های شناسه منحصر به فرد برای هر یک از فیلدهای ساختار message
هستند.
سپس نوع پیام Feature
را تعریف کنید. یک Feature
از یک فیلد string
برای نام یا آدرس پستی چیزی در مکانی مشخص شده توسط یک Point
استفاده می کند:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
برای اینکه چندین نقطه در یک منطقه را بتوان برای یک مشتری پخش کرد، به یک پیام Rectangle
نیاز دارید که نشان دهنده یک مستطیل طول و عرض جغرافیایی است که به صورت دو نقطه مورب مخالف lo
و hi
نمایش داده می شود:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
همچنین، یک پیام RouteNote
که نشان دهنده پیامی است که در یک نقطه مشخص ارسال شده است:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
در نهایت، به یک پیام RouteSummary
نیاز دارید. این پیام در پاسخ به یک RecordRoute
RPC دریافت می شود که در قسمت بعدی توضیح داده شده است. این شامل تعداد نقاط دریافتی جداگانه، تعداد ویژگی های شناسایی شده، و کل مسافت طی شده به عنوان مجموع تجمعی فاصله بین هر نقطه است.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
روش های خدمات را تعریف کنید
برای تعریف یک سرویس، یک سرویس با نام را در فایل .proto
خود مشخص می کنید. فایل route_guide.proto
دارای یک ساختار service
به نام RouteGuide
است که یک یا چند روش ارائه شده توسط سرویس برنامه را تعریف می کند.
هنگامی که روش های RPC
را در تعریف سرویس خود تعریف می کنید، نوع درخواست و پاسخ آنها را مشخص می کنید. در این بخش از Codelab، اجازه دهید تعریف کنیم:
لیست ویژگی ها
اشیاء Feature
موجود در Rectangle
داده شده را به دست می آورد. نتایج بهجای بازگردانی یکباره پخش میشوند، زیرا مستطیل ممکن است منطقه بزرگی را پوشش دهد و دارای تعداد زیادی ویژگی باشد.
برای این برنامه، شما از یک RPC استریم سمت سرور استفاده خواهید کرد: کلاینت درخواستی را به سرور ارسال می کند و جریانی برای خواندن دنباله ای از پیام ها دریافت می کند. مشتری از جریان برگشتی می خواند تا زمانی که پیام دیگری وجود نداشته باشد. همانطور که در مثال ما می بینید، با قرار دادن کلمه کلیدی جریان قبل از نوع پاسخ، یک روش پخش سمت سرور را مشخص می کنید.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
جریانی از نقاط را در مسیری که در حال پیمایش است می پذیرد، پس از تکمیل پیمایش، یک RouteSummary
برمی گرداند.
یک RPC استریم سمت کلاینت در این مورد مناسب است: مشتری دنباله ای از پیام ها را می نویسد و آنها را دوباره با استفاده از جریان ارائه شده به سرور ارسال می کند. هنگامی که مشتری نوشتن پیام ها را به پایان رساند، منتظر می ماند تا سرور همه آنها را بخواند و پاسخ خود را برگرداند. شما با قرار دادن کلمه کلیدی جریان قبل از نوع درخواست، یک روش پخش سمت مشتری را مشخص می کنید.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
روت چت
جریانی از RouteNotes
را میپذیرد که هنگام عبور از یک مسیر، در حالی که RouteNotes
دیگر (مثلاً از سایر کاربران) دریافت میکند.
این دقیقاً همان موردی است که برای پخش جریانی دو طرفه استفاده می شود. یک جریان RPC دو طرفه که در آن هر دو طرف دنباله ای از پیام ها را با استفاده از جریان خواندن و نوشتن ارسال می کنند. این دو جریان به طور مستقل عمل میکنند، بنابراین کلاینتها و سرورها میتوانند به هر ترتیبی که دوست دارند بخوانند و بنویسند: به عنوان مثال، سرور میتواند قبل از نوشتن پاسخهایش منتظر دریافت همه پیامهای مشتری باشد، یا میتواند متناوب یک پیام را بخواند و سپس یک پیام بنویسد، یا ترکیب دیگری از خواندن و نوشتن. ترتیب پیام ها در هر جریان حفظ می شود. شما این نوع روش را با قرار دادن کلمه کلیدی جریان قبل از درخواست و پاسخ مشخص می کنید.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. کد مشتری و سرور تولید کنید
در مرحله بعد باید رابط های سرویس گیرنده و سرور gRPC را از تعریف سرویس .proto
خود تولید کنیم. ما این کار را با استفاده از پروتکل کامپایلر بافر protoc
با یک پلاگین مخصوص gRPC جاوا انجام می دهیم. برای تولید سرویس های gRPC باید از کامپایلر proto3 (که از دستورات proto2 و proto3 پشتیبانی می کند) استفاده کنید.
هنگام استفاده از Gradle یا Maven، افزونه پروتوک بیلد می تواند کدهای لازم را به عنوان بخشی از بیلد ایجاد کند. برای نحوه تولید کد از فایل های .proto
خود می توانید به grpc-java README مراجعه کنید.
ما پیکربندی Gradle را ارائه کرده ایم.
از دایرکتوری streaming-grpc-java-getting-started
وارد شوید
$ chmod +x gradlew $ ./gradlew generateProto
کلاس های زیر از تعریف سرویس ما (تحت build/generated/sources/proto/main/java
) تولید می شوند:
- یکی برای هر نوع پیام:
Feature.java
،Rectangle.java, ...
که شامل تمام کدهای بافر پروتکل برای پر کردن، سریال سازی و بازیابی انواع پیام درخواست و پاسخ ما هستند. -
RouteGuideGrpc.java
که شامل (همراه با برخی کدهای مفید دیگر) یک کلاس پایه برای سرورهایRouteGuide
برای پیاده سازی است،RouteGuideGrpc.RouteGuideImplBase
، با تمام روش های تعریف شده در سرویسRouteGuide
و کلاس های خرد برای استفاده از کلاینت ها
5. سرویس را پیاده سازی کنید
ابتدا بیایید نحوه ایجاد سرور RouteGuide
را بررسی کنیم. دو بخش برای اینکه سرویس RouteGuide
ما کار خود را انجام دهد وجود دارد:
- پیاده سازی رابط سرویس ایجاد شده از تعریف سرویس ما: انجام "کار" واقعی سرویس ما.
- اجرای یک سرور gRPC برای گوش دادن به درخواست های مشتریان و ارسال آنها به اجرای سرویس مناسب.
RouteGuide را پیاده سازی کنید
ما یک کلاس RouteGuideService
را پیاده سازی خواهیم کرد که کلاس RouteGuideGrpc.RouteGuideImplBase ایجاد شده را گسترش می دهد. اجرا اینگونه به نظر می رسد.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
اجازه دهید هر پیاده سازی RPC را با جزئیات بررسی کنیم
RPC استریم سمت سرور
در ادامه به یکی از RPC های جریانی خود نگاه می کنیم. ListFeatures
یک RPC استریم سمت سرور است، بنابراین باید چندین Features
به مشتری خود بازگردانیم.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
مانند RPC ساده، این روش یک شی درخواست ( Rectangle
که مشتری ما میخواهد در آن Features
پیدا کند) و یک مشاهدهگر پاسخ StreamObserver
دریافت میکند.
این بار، به تعداد Feature
نیاز برای بازگشت به سرویس گیرنده، ما آنها را از مجموعه ویژگی های سرویس بر اساس اینکه آیا در داخل Rectangle
درخواست ما هستند انتخاب می کنیم، و هر کدام را به نوبه خود برای مشاهده گر پاسخ با استفاده از متد onNext()
می نویسیم. در نهایت، مانند RPC ساده ما، از متد onCompleted()
مشاهدهگر پاسخ استفاده میکنیم تا به gRPC بگوییم که نوشتن پاسخها به پایان رسیده است.
جریان RPC سمت مشتری
حالا بیایید به چیز کمی پیچیدهتر نگاه کنیم: روش پخش سمت مشتری RecordRoute()
، که در آن جریانی از Points
را از مشتری دریافت میکنیم و یک RouteSummary
با اطلاعاتی در مورد سفر آنها برمیگردانیم.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
همانطور که می بینید، مانند انواع روش های قبلی، روش ما یک پارامتر StreamObserver
responseObserver
دریافت می کند، اما این بار یک StreamObserver
برای مشتری برمی گرداند تا Points
خود را بنویسد.
در بدنه متد، ما یک StreamObserver
ناشناس را برای بازگشت نمونه می کنیم، که در آن:
- روش
onNext()
را نادیده بگیرید تا هر بار که کلاینتPoint
در جریان پیام می نویسد ویژگی ها و سایر اطلاعات را دریافت کنید. - برای پر کردن و ساخت
RouteSummary
متدonCompleted()
(که زمانی که کلاینت نوشتن پیامها را به پایان رساند فراخوانی میشود) را لغو کنید. سپس باRouteSummary
، ناظر پاسخ خود متد خود،onNext()
و سپس متدonCompleted()
آن را فراخوانی می کنیم تا تماس از سمت سرور به پایان برسد.
جریان دو طرفه RPC
در نهایت، اجازه دهید به جریان دو طرفه RPC RouteChat()
نگاه کنیم.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
مانند مثال پخش جریانی سمت کلاینت، ما هم یک StreamObserver
را دریافت میکنیم و هم آن را برمیگردانیم، با این تفاوت که این بار مقادیر را از طریق مشاهدهگر پاسخ متدمان برمیگردانیم در حالی که مشتری همچنان در حال نوشتن پیامها به جریان پیام خود است. نحو برای خواندن و نوشتن در اینجا دقیقاً مشابه روشهای پخش جریانی مشتری و استریم سرور ما است. اگرچه هر یک از طرفین همیشه پیام های طرف مقابل را به ترتیبی که نوشته شده دریافت می کند، هم مشتری و هم سرور می توانند به هر ترتیبی بخوانند و بنویسند - جریان ها کاملاً مستقل عمل می کنند.
سرور را راه اندازی کنید
هنگامی که همه روشهای خود را پیادهسازی کردیم، همچنین باید یک سرور gRPC راهاندازی کنیم تا مشتریان بتوانند واقعاً از خدمات ما استفاده کنند. قطعه زیر نشان می دهد که چگونه این کار را برای سرویس RouteGuide
خود انجام می دهیم:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
همانطور که می بینید، ما سرور خود را با استفاده از یک ServerBuilder
می سازیم و راه اندازی می کنیم.
برای انجام این کار، ما:
- آدرس و پورتی را که میخواهیم برای گوش دادن به درخواستهای مشتری با استفاده از متد
forPort()
سازنده استفاده کنیم، مشخص کنید. - نمونه ای از کلاس پیاده سازی سرویس ما
RouteGuideService
ایجاد کنید و آن را به متدaddService()
سازنده ارسال کنید. - برای ایجاد و راه اندازی یک سرور RPC برای سرویس ما،
build()
وstart()
در سازنده فراخوانی کنید.
از آنجایی که ServerBuilder در حال حاضر پورت را در خود جای داده است، تنها دلیلی که یک پورت را پاس می کنیم استفاده از آن برای ورود به سیستم است.
6. مشتری ایجاد کنید
در این بخش، ایجاد یک کلاینت برای سرویس RouteGuide
خود را بررسی خواهیم کرد. می توانید نمونه کامل کد کلاینت ما را در ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
ببینید.
نمونه خرد
برای فراخوانی متدهای سرویس، ابتدا باید یک خرد یا بهتر بگوییم دو خرد ایجاد کنیم:
- یک خرد مسدود/همگام : این بدان معناست که فراخوانی RPC منتظر پاسخ سرور میماند و یا پاسخی را برمیگرداند یا استثنایی ایجاد میکند.
- یک خرد غیر مسدود/ناهمزمان که تماسهای غیرانسدادی را با سرور برقرار میکند، جایی که پاسخ به صورت ناهمزمان برگردانده میشود. میتوانید انواع خاصی از تماسهای جریانی را فقط با استفاده از یک خرد ناهمزمان برقرار کنید.
ابتدا باید یک کانال gRPC برای خرد خود ایجاد کنیم و آدرس سرور و پورتی را که میخواهیم به آن وصل شویم را مشخص کنیم:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
برای ایجاد کانال از ManagedChannelBuilder
استفاده می کنیم.
اکنون میتوانیم از کانال برای ایجاد خردههای خود با استفاده از متدهای newStub
و newBlockingStub
ارائهشده در کلاس RouteGuideGrpc
که از .proto
خود تولید کردیم، استفاده کنیم.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
به یاد داشته باشید، اگر مسدود نیست، ناهمگام است
روش های خدمات تماس
حال بیایید ببینیم که چگونه روشهای خدمات خود را فراخوانی میکنیم. توجه داشته باشید که هر RPC ایجاد شده از خرد مسدود کننده در حالت مسدود کردن/همگام عمل می کند، به این معنی که تماس RPC منتظر پاسخ سرور می ماند و یا یک پاسخ یا یک خطا را برمی گرداند.
RPC استریم سمت سرور
در مرحله بعد، بیایید به یک تماس جریان سمت سرور با ListFeatures
نگاه کنیم، که جریانی از Feature
جغرافیایی را برمی گرداند:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
همانطور که می بینید، این روش بسیار شبیه به RPC یکنواخت ساده است که ما در Codelab Getting_Started_With_gRPC_Java به آن نگاه کردیم، با این تفاوت که به جای بازگرداندن یک Feature
، متد یک Iterator
را برمی گرداند که کلاینت می تواند از آن برای خواندن همه Features
بازگشتی استفاده کند.
جریان RPC سمت مشتری
اکنون برای چیز کمی پیچیدهتر: روش پخش سمت سرویس گیرنده RecordRoute
، که در آن جریانی از Points
را به سرور ارسال میکنیم و یک RouteSummary
برمیگردانیم. برای این روش باید از خرد ناهمزمان استفاده کنیم. اگر قبلاً «ایجاد سرور» را خواندهاید، ممکن است برخی از این موارد بسیار آشنا به نظر برسند - RPCهای جریان ناهمزمان به روشی مشابه در هر دو طرف پیادهسازی میشوند.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
همانطور که می بینید، برای فراخوانی این روش باید یک StreamObserver
ایجاد کنیم که یک رابط ویژه برای فراخوانی سرور با پاسخ RouteSummary
خود پیاده سازی می کند. در StreamObserver
ما:
- هنگامی که سرور یک
RouteSummary
در جریان پیام می نویسد، روشonNext()
را لغو کنید تا اطلاعات برگشتی را چاپ کنید. - برای کاهش یک
CountDownLatch
روشonCompleted()
را لغو کنید (زمانی که سرور فراخوانی را در سمت خود انجام داد فراخوانی میشود) تا بتوانیم بررسی کنیم که آیا سرور نوشتن را به پایان رسانده است یا خیر.
سپس StreamObserver
به متد recordRoute()
stub ناهمزمان میدهیم و ناظر درخواست StreamObserver
خود را برمیگردانیم تا Points
ما را برای ارسال به سرور بنویسد. پس از پایان نوشتن نقاط، از متد onCompleted()
مشاهدهگر درخواست استفاده میکنیم تا به gRPC بگوییم که نوشتن در سمت کلاینت را به پایان رساندهایم. هنگامی که کارمان تمام شد، CountDownLatch
خود را بررسی می کنیم تا ببینیم آیا سرور در سمت خود تکمیل شده است یا خیر.
جریان دو طرفه RPC
در نهایت، اجازه دهید به جریان دو طرفه RPC RouteChat()
نگاه کنیم.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
مانند مثال استریم سمت کلاینت، ما هم یک ناظر پاسخ StreamObserver
را دریافت میکنیم و هم برمیگردانیم، با این تفاوت که این بار مقادیر را از طریق مشاهدهگر پاسخ متد خود ارسال میکنیم در حالی که سرور همچنان در حال نوشتن پیامها به جریان پیام خود است. نحو برای خواندن و نوشتن در اینجا دقیقاً مشابه روش پخش جریانی مشتری ما است. اگرچه هر یک از طرفین همیشه پیام های طرف مقابل را به ترتیبی که نوشته شده دریافت می کند، هم مشتری و هم سرور می توانند به هر ترتیبی بخوانند و بنویسند - جریان ها کاملاً مستقل عمل می کنند.
7. آن را امتحان کنید!
- از دایرکتوری
start_here
:
$ ./gradlew installDist
این کد شما را کامپایل می کند، آن را در یک شیشه بسته بندی می کند و اسکریپت هایی را ایجاد می کند که نمونه را اجرا می کنند. آنها در دایرکتوری build/install/start_here/bin/
ایجاد خواهند شد. اسکریپت ها عبارتند از: route-guide-server
و route-guide-client
.
سرور باید قبل از راه اندازی کلاینت در حال اجرا باشد.
- سرور را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-server
- کلاینت را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-client
8. بعدی چه خواهد شد
- نحوه عملکرد gRPC را در مقدمه مفاهیم gRPC و Core بیاموزید
- از طریق آموزش اصول کار کنید
- مرجع API را کاوش کنید.