1. บทนำ
ในโค้ดแล็บนี้ คุณจะได้ใช้ gRPC-Java เพื่อสร้างไคลเอ็นต์และเซิร์ฟเวอร์ซึ่งเป็นรากฐานของแอปพลิเคชันการแมปเส้นทางที่เขียนด้วย Java
เมื่อจบบทแนะนำนี้ คุณจะมีไคลเอ็นต์ที่เชื่อมต่อกับเซิร์ฟเวอร์ระยะไกลโดยใช้ gRPC เพื่อรับข้อมูลเกี่ยวกับฟีเจอร์ในเส้นทางของไคลเอ็นต์ สร้างข้อมูลสรุปของเส้นทางของไคลเอ็นต์ และแลกเปลี่ยนข้อมูลเส้นทาง เช่น ข้อมูลอัปเดตการจราจร กับเซิร์ฟเวอร์และไคลเอ็นต์อื่นๆ
บริการนี้กำหนดไว้ในไฟล์ Protocol Buffers ซึ่งจะใช้เพื่อสร้างโค้ดมาตรฐานสำหรับไคลเอ็นต์และเซิร์ฟเวอร์เพื่อให้สื่อสารกันได้ ซึ่งจะช่วยประหยัดเวลาและแรงในการติดตั้งใช้งานฟังก์ชันดังกล่าว
โค้ดที่สร้างขึ้นนี้ไม่เพียงแต่จัดการความซับซ้อนของการสื่อสารระหว่างเซิร์ฟเวอร์และไคลเอ็นต์เท่านั้น แต่ยังจัดการการซีเรียลไลซ์และการดีซีเรียลไลซ์ข้อมูลด้วย
สิ่งที่คุณจะได้เรียนรู้
- วิธีใช้ Protocol Buffers เพื่อกำหนด API ของบริการ
- วิธีสร้างไคลเอ็นต์และเซิร์ฟเวอร์ที่ใช้ gRPC จากคำจำกัดความของ Protocol Buffers โดยใช้การสร้างโค้ดอัตโนมัติ
- ความเข้าใจเกี่ยวกับการสื่อสารแบบสตรีมมิงไคลเอ็นต์-เซิร์ฟเวอร์ด้วย gRPC
Codelab นี้มีไว้สำหรับนักพัฒนาแอป Java ที่เพิ่งเริ่มใช้ gRPC หรือต้องการทบทวน gRPC หรือใครก็ตามที่สนใจสร้างระบบแบบกระจาย ไม่จำเป็นต้องมีประสบการณ์เกี่ยวกับ gRPC มาก่อน
2. ก่อนเริ่มต้น
ข้อกำหนดเบื้องต้น
- JDK เวอร์ชัน 24
รับโค้ด
Codelab นี้มีโครงสร้างของซอร์สโค้ดของแอปพลิเคชันเพื่อให้คุณทำต่อได้ คุณจึงไม่ต้องเริ่มต้นจากศูนย์ ขั้นตอนต่อไปนี้จะแสดงวิธีส่งแอปพลิเคชันให้เสร็จสมบูรณ์ ซึ่งรวมถึงการใช้ปลั๊กอินคอมไพเลอร์ Protocol Buffer เพื่อสร้างโค้ด gRPC ที่ซ้ำกัน
ก่อนอื่น ให้สร้างไดเรกทอรีการทำงานของ Codelab แล้วใช้คำสั่ง cd เพื่อเข้าไปในไดเรกทอรีดังกล่าว
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
ดาวน์โหลดและแตกไฟล์ Codelab โดยทำดังนี้
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
หรือคุณจะดาวน์โหลดไฟล์ .zip ที่มีเฉพาะไดเรกทอรี Codelab แล้วแตกไฟล์ด้วยตนเองก็ได้
ซอร์สโค้ดที่เสร็จสมบูรณ์แล้วพร้อมใช้งานใน GitHub หากคุณไม่ต้องการพิมพ์การติดตั้งใช้งาน
3. กำหนดข้อความและบริการ
ขั้นตอนแรกคือการกำหนดบริการ gRPC ของแอปพลิเคชัน เมธอด RPC รวมถึงประเภทข้อความคำขอและการตอบกลับโดยใช้ Protocol Buffers บริการของคุณจะให้ข้อมูลต่อไปนี้
- เมธอด RPC ที่เรียกว่า
ListFeatures,RecordRouteและRouteChatซึ่งเซิร์ฟเวอร์ใช้และไคลเอ็นต์เรียก - ประเภทข้อความ
Point,Feature,Rectangle,RouteNoteและRouteSummaryซึ่งเป็นโครงสร้างข้อมูลที่แลกเปลี่ยนระหว่างไคลเอ็นต์และเซิร์ฟเวอร์เมื่อเรียกใช้เมธอดข้างต้น
Protocol Buffers เรียกกันโดยทั่วไปว่า protobuf ดูข้อมูลเพิ่มเติมเกี่ยวกับคำศัพท์ gRPC ได้ที่แนวคิดหลัก สถาปัตยกรรม และวงจรของ gRPC
วิธีการ RPC นี้และประเภทข้อความของวิธีการนี้จะกำหนดไว้ในไฟล์ proto/routeguide/route_guide.proto ของซอร์สโค้ดที่ระบุ
มาสร้างไฟล์ route_guide.proto กัน
เนื่องจากเราจะสร้างโค้ด Java ในตัวอย่างนี้ เราจึงระบุตัวเลือกไฟล์ java_package ใน .proto ดังนี้
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
กำหนดประเภทข้อความ
ในproto/routeguide/route_guide.protoไฟล์ของซอร์สโค้ด ให้กำหนดPointประเภทข้อความก่อน Point แสดงคู่พิกัดละติจูดและลองจิจูดบนแผนที่ สำหรับ Codelab นี้ ให้ใช้จำนวนเต็มสำหรับพิกัด
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
หมายเลข 1 และ 2 เป็นหมายเลขรหัสที่ไม่ซ้ำกันสำหรับแต่ละฟิลด์ในโครงสร้าง message
จากนั้นกำหนดFeatureประเภทข้อความ Feature ใช้ฟิลด์ string สำหรับชื่อหรือที่อยู่ไปรษณีย์ของสิ่งหนึ่งๆ ในสถานที่ที่ระบุโดย Point ดังนี้
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
หากต้องการสตรีมจุดหลายจุดภายในพื้นที่ไปยังไคลเอ็นต์ คุณจะต้องมีRectangleข้อความที่แสดงสี่เหลี่ยมผืนผ้าละติจูด-ลองจิจูด ซึ่งแสดงเป็น 2 จุดที่อยู่ตรงข้ามกันในแนวทแยง lo และ hi ดังนี้
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
นอกจากนี้ ยังมีข้อความ RouteNote ที่แสดงถึงข้อความที่ส่งขณะอยู่ที่จุดหนึ่งๆ ด้วย
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
สุดท้าย คุณจะต้องมีRouteSummaryข้อความ คุณจะได้รับข้อความนี้เป็นการตอบกลับ RPC ของ RecordRoute ซึ่งจะอธิบายในส่วนถัดไป โดยจะมีจำนวนจุดแต่ละจุดที่ได้รับ จำนวนฟีเจอร์ที่ตรวจพบ และระยะทางทั้งหมดที่ครอบคลุมเป็นผลรวมสะสมของระยะทางระหว่างแต่ละจุด
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
กำหนดวิธีการบริการ
หากต้องการกำหนดบริการ ให้ระบุบริการที่มีชื่อในไฟล์ .proto ไฟล์ route_guide.proto มีโครงสร้าง service ชื่อ RouteGuide ซึ่งกำหนดวิธีการอย่างน้อย 1 วิธีที่บริการของแอปพลิเคชันมีให้
เมื่อกำหนดRPCเมธอดภายในคำจำกัดความของบริการ คุณจะระบุประเภทคำขอและการตอบกลับของเมธอดเหล่านั้น ในส่วนนี้ของโค้ดแล็บ เราจะกำหนดค่าต่อไปนี้
ListFeatures
รับออบเจ็กต์ Feature ที่มีอยู่ภายใน Rectangle ที่ระบุ ระบบจะสตรีมผลการค้นหาแทนที่จะแสดงผลทั้งหมดในครั้งเดียว เนื่องจากสี่เหลี่ยมผืนผ้าอาจครอบคลุมพื้นที่ขนาดใหญ่และมีฟีเจอร์จำนวนมาก
สำหรับแอปพลิเคชันนี้ คุณจะใช้ RPC แบบสตรีมมิงฝั่งเซิร์ฟเวอร์ ซึ่งไคลเอ็นต์จะส่งคำขอไปยังเซิร์ฟเวอร์และรับสตรีมเพื่ออ่านลำดับข้อความกลับ ไคลเอ็นต์จะอ่านจากสตรีมที่ส่งคืนจนกว่าจะไม่มีข้อความเหลือ ดังที่เห็นในตัวอย่าง คุณระบุวิธีการสตรีมฝั่งเซิร์ฟเวอร์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทการตอบกลับ
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
ยอมรับสตรีมของจุดในเส้นทางที่กำลังเดินทาง และส่งคืน RouteSummary เมื่อการเดินทางเสร็จสมบูรณ์
ในกรณีนี้ RPC การสตรีมฝั่งไคลเอ็นต์จะเหมาะสม โดยไคลเอ็นต์จะเขียนลำดับข้อความและส่งไปยังเซิร์ฟเวอร์อีกครั้งโดยใช้สตรีมที่ระบุ เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว ไคลเอ็นต์จะรอให้เซิร์ฟเวอร์อ่านข้อความทั้งหมดและส่งการตอบกลับ คุณระบุวิธีการสตรีมฝั่งไคลเอ็นต์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทคำขอ
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
ยอมรับสตรีมของ RouteNotes ที่ส่งขณะที่กำลังเดินทางตามเส้นทาง ขณะเดียวกันก็รับ RouteNotes อื่นๆ (เช่น จากผู้ใช้รายอื่น)
นี่คือกรณีการใช้งานที่เหมาะกับการสตรีมแบบ 2 ทาง RPC แบบสตรีมมิงแบบ 2 ทางซึ่งทั้ง 2 ฝ่ายจะส่งลำดับข้อความโดยใช้สตรีมแบบอ่าน-เขียน สตรีมทั้ง 2 รายการทำงานแยกกัน ดังนั้นไคลเอ็นต์และเซิร์ฟเวอร์จึงอ่านและเขียนได้ตามลำดับที่ต้องการ เช่น เซิร์ฟเวอร์อาจรอรับข้อความทั้งหมดจากไคลเอ็นต์ก่อนที่จะเขียนการตอบกลับ หรืออาจอ่านข้อความแล้วเขียนข้อความสลับกัน หรืออาจใช้การอ่านและการเขียนแบบอื่นๆ ระบบจะรักษลําดับของข้อความในแต่ละสตรีม คุณระบุประเภทเมธอดนี้ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนทั้งคำขอและการตอบกลับ
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. สร้างโค้ดไคลเอ็นต์และเซิร์ฟเวอร์
จากนั้นเราต้องสร้างอินเทอร์เฟซไคลเอ็นต์และเซิร์ฟเวอร์ gRPC จาก.protoคำจำกัดความของบริการ เราดำเนินการนี้โดยใช้คอมไพเลอร์บัฟเฟอร์โปรโตคอล protoc ร่วมกับปลั๊กอิน gRPC Java พิเศษ คุณต้องใช้คอมไพเลอร์ proto3 (ซึ่งรองรับทั้งไวยากรณ์ proto2 และ proto3) เพื่อสร้างบริการ gRPC
เมื่อใช้ Gradle หรือ Maven ปลั๊กอินการสร้าง protoc จะสร้างโค้ดที่จำเป็นเป็นส่วนหนึ่งของการสร้างได้ คุณดูวิธีสร้างโค้ดจากไฟล์ .proto ของตัวเองได้ใน README ของ grpc-java
เราได้ให้การกำหนดค่า Gradle ไว้แล้ว
จากไดเรกทอรี streaming-grpc-java-getting-started ให้ป้อน
$ chmod +x gradlew $ ./gradlew generateProto
ระบบจะสร้างคลาสต่อไปนี้จากคำจำกัดความของบริการ (ในส่วน build/generated/sources/proto/main/java)
- โดยมี 1 รายการสำหรับข้อความแต่ละประเภท ได้แก่
Feature.java,Rectangle.java, ...ซึ่งมีโค้ด Protocol Buffer ทั้งหมดเพื่อสร้างข้อมูล ทำให้เป็นอนุกรม และเรียกประเภทข้อความคำขอและการตอบกลับของเรา RouteGuideGrpc.javaซึ่งมี (พร้อมกับโค้ดอื่นๆ ที่มีประโยชน์) คลาสพื้นฐานสำหรับRouteGuideเซิร์ฟเวอร์ที่จะใช้RouteGuideGrpc.RouteGuideImplBaseโดยมีเมธอดทั้งหมดที่กำหนดไว้ในRouteGuideคลาสบริการและคลาส Stub สำหรับไคลเอ็นต์ที่จะใช้
5. ติดตั้งใช้งานบริการ
ก่อนอื่นมาดูวิธีสร้างRouteGuideเซิร์ฟเวอร์กัน การทำให้RouteGuideบริการของเราทำงานได้ตามที่ควรจะเป็นมี 2 ส่วน ดังนี้
- การติดตั้งใช้งานอินเทอร์เฟซบริการที่สร้างขึ้นจากคำจำกัดความของบริการ: การ "ทำงาน" จริงของบริการ
- เรียกใช้เซิร์ฟเวอร์ gRPC เพื่อรอรับคำขอจากไคลเอ็นต์และส่งคำขอไปยังการติดตั้งใช้งานบริการที่ถูกต้อง
ใช้ RouteGuide
เราจะใช้RouteGuideServiceคลาสที่จะขยายคลาส RouteGuideGrpc.RouteGuideImplBase ที่สร้างขึ้น การติดตั้งใช้งานจะมีลักษณะดังนี้
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
มาดูรายละเอียดการติดตั้งใช้งาน RPC แต่ละรายการกัน
RPC การสตรีมฝั่งเซิร์ฟเวอร์
ต่อไป มาดู RPC แบบสตรีมมิงรายการหนึ่งของเรากัน ListFeatures เป็น RPC การสตรีมฝั่งเซิร์ฟเวอร์ ดังนั้นเราจึงต้องส่ง Features หลายรายการกลับไปยังไคลเอ็นต์
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
เช่นเดียวกับ RPC อย่างง่าย วิธีนี้จะรับออบเจ็กต์คำขอ (Rectangle ที่ไคลเอ็นต์ต้องการค้นหา Features) และStreamObserverเครื่องสังเกตการตอบกลับ
คราวนี้เราจะรับออบเจ็กต์ Feature ให้ได้มากที่สุดเท่าที่จำเป็นเพื่อส่งคืนไปยังไคลเอ็นต์ (ในกรณีนี้ เราจะเลือกออบเจ็กต์จากคอลเล็กชันฟีเจอร์ของบริการโดยพิจารณาว่าออบเจ็กต์นั้นอยู่ในคำขอของเราหรือไม่ Rectangle) และเขียนออบเจ็กต์แต่ละรายการตามลำดับไปยังเครื่องมือสังเกตการตอบกลับโดยใช้เมธอด onNext() ของเครื่องมือ สุดท้ายนี้ เราใช้เมธอด onCompleted() ของเครื่องสังเกตการตอบกลับเพื่อบอก gRPC ว่าเราเขียนการตอบกลับเสร็จแล้ว เช่นเดียวกับใน RPC อย่างง่าย
RPC การสตรีมฝั่งไคลเอ็นต์
ตอนนี้เรามาดูสิ่งที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute() ซึ่งเราจะรับสตรีมของ Points จากไคลเอ็นต์และส่งคืน RouteSummary รายการเดียวพร้อมข้อมูลเกี่ยวกับการเดินทาง
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
ดังที่เห็นได้ว่าเมธอดของเราจะรับพารามิเตอร์ StreamObserver responseObserver เช่นเดียวกับประเภทเมธอดก่อนหน้า แต่คราวนี้จะส่งคืน StreamObserver เพื่อให้ไคลเอ็นต์เขียน Points
ในเนื้อหาของเมธอด เราจะสร้างอินสแตนซ์ของ StreamObserver แบบไม่ระบุชื่อเพื่อส่งคืน ซึ่งเราจะทำดังนี้
- ลบล้างเมธอด
onNext()เพื่อรับฟีเจอร์และข้อมูลอื่นๆ ทุกครั้งที่ไคลเอ็นต์เขียนPointลงในสตรีมข้อความ - แทนที่เมธอด
onCompleted()(เรียกใช้เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว) เพื่อป้อนและสร้างRouteSummaryจากนั้นเราจะเรียกใช้onNext()ของเครื่องมือสังเกตการตอบกลับของเมธอดของเราเองด้วยRouteSummaryและเรียกใช้เมธอดonCompleted()เพื่อสิ้นสุดการเรียกจากฝั่งเซิร์ฟเวอร์
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราทั้งรับและส่งคืน StreamObserver แต่ครั้งนี้เราจะส่งคืนค่าผ่านเครื่องมือสังเกตการตอบกลับของเมธอดในขณะที่ไคลเอ็นต์ยังคงเขียนข้อความไปยังสตรีมข้อความของตน ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับไวยากรณ์สำหรับวิธีการสตรีมมิงฝั่งไคลเอ็นต์และสตรีมมิงฝั่งเซิร์ฟเวอร์ทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
เริ่มเซิร์ฟเวอร์
เมื่อเราได้ใช้ทุกวิธีแล้ว เราก็ต้องเริ่มเซิร์ฟเวอร์ gRPC เพื่อให้ไคลเอ็นต์ใช้บริการของเราได้จริง ข้อมูลโค้ดต่อไปนี้แสดงวิธีที่เราดำเนินการนี้สำหรับบริการ RouteGuide
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
ดังที่เห็น เราสร้างและเริ่มเซิร์ฟเวอร์โดยใช้ ServerBuilder
เราจะดำเนินการดังกล่าวโดยทำดังนี้
- ระบุที่อยู่และพอร์ตที่เราต้องการใช้เพื่อรอรับคำขอของไคลเอ็นต์โดยใช้เมธอด
forPort()ของบิลเดอร์ - สร้างอินสแตนซ์ของคลาสการติดตั้งใช้งานบริการ
RouteGuideServiceและส่งไปยังเมธอดaddService()ของ Builder - เรียกใช้
build()และstart()ในบิลเดอร์เพื่อสร้างและเริ่มต้นเซิร์ฟเวอร์ RPC สำหรับบริการของเรา
เนื่องจาก ServerBuilder มีพอร์ตอยู่แล้ว เหตุผลเดียวที่เราส่งพอร์ตคือเพื่อใช้ในการบันทึก
6. สร้างไคลเอ็นต์
ในส่วนนี้ เราจะมาดูการสร้างไคลเอ็นต์สำหรับบริการ RouteGuide คุณดูโค้ดไคลเอ็นต์ตัวอย่างที่สมบูรณ์ได้ใน ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java
สร้างอินสแตนซ์ของ Stub
หากต้องการเรียกใช้เมธอดบริการ เราต้องสร้างสตับก่อน หรือจะสร้าง 2 สตับก็ได้
- สแต็บบล็อก/ซิงโครนัส: หมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือยกเว้น
- สตับที่ไม่บล็อก/แบบอะซิงโครนัสที่ทำการเรียกที่ไม่บล็อกไปยังเซิร์ฟเวอร์ ซึ่งจะส่งคืนการตอบกลับแบบอะซิงโครนัส คุณจะโทรแบบสตรีมมิงบางประเภทได้โดยใช้ Stub แบบไม่พร้อมกันเท่านั้น
ก่อนอื่นเราต้องสร้างแชแนล gRPC สำหรับสตับ โดยระบุที่อยู่เซิร์ฟเวอร์และพอร์ตที่เราต้องการเชื่อมต่อ
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
เราใช้ManagedChannelBuilderเพื่อสร้างช่อง
ตอนนี้เราสามารถใช้แชแนลเพื่อสร้าง Stub โดยใช้เมธอด newStub และ newBlockingStub ที่ระบุไว้ในคลาส RouteGuideGrpc ที่เราสร้างจาก .proto
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
โปรดทราบว่าหากไม่ได้บล็อก ก็จะเป็นแบบไม่พร้อมกัน
วิธีการเรียกใช้บริการ
ตอนนี้มาดูวิธีเรียกใช้เมธอดบริการกัน โปรดทราบว่า RPC ที่สร้างจาก Stub ที่บล็อกจะทํางานในโหมดบล็อก/ซิงโครนัส ซึ่งหมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือข้อผิดพลาด
RPC การสตรีมฝั่งเซิร์ฟเวอร์
ต่อไป เราจะมาดูการเรียกสตรีมฝั่งเซิร์ฟเวอร์ไปยัง ListFeatures ซึ่งจะแสดงผลสตรีมของFeatureทางภูมิศาสตร์
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
ดังที่เห็นได้ว่าฟังก์ชันนี้คล้ายกับ RPC แบบเอกภาคอย่างง่ายที่เราดูใน Codelab Getting_Started_With_gRPC_Java มาก เพียงแต่แทนที่จะส่งคืน Feature รายการเดียว เมธอดจะส่งคืน Iterator ที่ไคลเอ็นต์ใช้เพื่ออ่าน Features ทั้งหมดที่ส่งคืนได้
RPC การสตรีมฝั่งไคลเอ็นต์
ตอนนี้มาดูวิธีที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute ซึ่งเราจะส่งสตรีมของ Points ไปยังเซิร์ฟเวอร์และรับ RouteSummary กลับมา สำหรับวิธีนี้ เราต้องใช้ Stub Asynchronous หากคุณเคยอ่านการสร้างเซิร์ฟเวอร์แล้ว คุณอาจคุ้นเคยกับเนื้อหาบางส่วนนี้เป็นอย่างดี เนื่องจาก RPC แบบสตรีมมิงแบบไม่พร้อมกันจะได้รับการติดตั้งใช้งานในลักษณะที่คล้ายกันทั้ง 2 ฝั่ง
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
ดังที่เห็นได้ว่าในการเรียกใช้เมธอดนี้ เราต้องสร้าง StreamObserver ซึ่งใช้การติดตั้งใช้งานอินเทอร์เฟซพิเศษสำหรับเซิร์ฟเวอร์เพื่อเรียกใช้ด้วยการตอบกลับ RouteSummary ในStreamObserver เราทำสิ่งต่อไปนี้
- ลบล้างเมธอด
onNext()เพื่อพิมพ์ข้อมูลที่ส่งคืนเมื่อเซิร์ฟเวอร์เขียนRouteSummaryลงในสตรีมข้อความ - แทนที่เมธอด
onCompleted()(เรียกใช้เมื่อ server ดำเนินการเรียกใช้ในฝั่งของตนเองเสร็จแล้ว) เพื่อลดCountDownLatchเพื่อให้เราตรวจสอบได้ว่าเซิร์ฟเวอร์เขียนเสร็จแล้วหรือไม่
จากนั้นเราจะส่ง StreamObserver ไปยังเมธอด recordRoute() ของสแต็บแบบอะซิงโครนัส และรับเครื่องสังเกตการณ์คำขอ StreamObserver ของเราเองกลับมาเพื่อเขียน Points ที่จะส่งไปยังเซิร์ฟเวอร์ เมื่อเขียนคะแนนเสร็จแล้ว เราจะใช้วิธี onCompleted() ของเครื่องสังเกตคำขอเพื่อบอก gRPC ว่าเราเขียนฝั่งไคลเอ็นต์เสร็จแล้ว เมื่อเสร็จแล้ว เราจะตรวจสอบ CountDownLatch เพื่อดูว่าเซิร์ฟเวอร์ดำเนินการเสร็จสมบูรณ์ในฝั่งของตนเองหรือไม่
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราจะได้รับและส่งคืน StreamObserver ResponseObserver ยกเว้นในครั้งนี้ที่เราจะส่งค่าผ่าน ResponseObserver ของเมธอดในขณะที่เซิร์ฟเวอร์ยังคงเขียนข้อความไปยังสตรีมข้อความของเซิร์ฟเวอร์ ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับวิธีการสตรีมมิงแบบไคลเอ็นต์ของเราทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
7. ลองใช้เลย!
- จากไดเรกทอรี
start_hereให้ทำดังนี้
$ ./gradlew installDist
ซึ่งจะคอมไพล์โค้ด แพ็กเกจในไฟล์ JAR และสร้างสคริปต์ที่เรียกใช้ตัวอย่าง ระบบจะสร้างไฟล์เหล่านี้ในไดเรกทอรี build/install/start_here/bin/ สคริปต์คือ route-guide-server และ route-guide-client
เซิร์ฟเวอร์ต้องทำงานก่อนจึงจะเริ่มไคลเอ็นต์ได้
- เรียกใช้เซิร์ฟเวอร์
$ ./build/install/start_here/bin/route-guide-server
- เรียกใช้ไคลเอ็นต์
$ ./build/install/start_here/bin/route-guide-client
8. ขั้นตอนถัดไป
- ดูวิธีการทำงานของ gRPC ในข้อมูลเบื้องต้นเกี่ยวกับ gRPC และแนวคิดหลัก
- ดูบทแนะนำเกี่ยวกับพื้นฐาน
- สำรวจข้อมูลอ้างอิงของ API