1. บทนำ
ในโค้ดแล็บนี้ คุณจะได้ใช้ gRPC-Java เพื่อสร้างไคลเอ็นต์และเซิร์ฟเวอร์ซึ่งเป็นรากฐานของแอปพลิเคชันการแมปเส้นทางที่เขียนด้วย Java
เมื่อจบบทแนะนำนี้ คุณจะมีไคลเอ็นต์ที่เชื่อมต่อกับเซิร์ฟเวอร์ระยะไกลโดยใช้ gRPC เพื่อรับข้อมูลเกี่ยวกับฟีเจอร์ในเส้นทางของไคลเอ็นต์ สร้างข้อมูลสรุปของเส้นทางของไคลเอ็นต์ และแลกเปลี่ยนข้อมูลเส้นทาง เช่น ข้อมูลอัปเดตการจราจร กับเซิร์ฟเวอร์และไคลเอ็นต์อื่นๆ
บริการนี้กำหนดไว้ในไฟล์ Protocol Buffers ซึ่งจะใช้เพื่อสร้างโค้ดมาตรฐานสำหรับไคลเอ็นต์และเซิร์ฟเวอร์เพื่อให้สื่อสารกันได้ ซึ่งจะช่วยประหยัดเวลาและแรงในการติดตั้งใช้งานฟังก์ชันดังกล่าว
โค้ดที่สร้างขึ้นนี้ไม่เพียงแต่จัดการความซับซ้อนของการสื่อสารระหว่างเซิร์ฟเวอร์และไคลเอ็นต์เท่านั้น แต่ยังจัดการการซีเรียลไลซ์และการดีซีเรียลไลซ์ข้อมูลด้วย
สิ่งที่คุณจะได้เรียนรู้
- วิธีใช้ Protocol Buffers เพื่อกำหนด API ของบริการ
- วิธีสร้างไคลเอ็นต์และเซิร์ฟเวอร์ที่ใช้ gRPC จากคำจำกัดความของ Protocol Buffers โดยใช้การสร้างโค้ดอัตโนมัติ
- ความเข้าใจเกี่ยวกับการสื่อสารแบบสตรีมมิงไคลเอ็นต์-เซิร์ฟเวอร์ด้วย gRPC
Codelab นี้มีไว้สำหรับนักพัฒนาแอป Java ที่เพิ่งเริ่มใช้ gRPC หรือต้องการทบทวน gRPC หรือใครก็ตามที่สนใจสร้างระบบแบบกระจาย ไม่จำเป็นต้องมีประสบการณ์เกี่ยวกับ gRPC มาก่อน
2. ก่อนเริ่มต้น
ข้อกำหนดเบื้องต้น
- JDK เวอร์ชัน 24
รับโค้ด
Codelab นี้มีโครงสร้างของซอร์สโค้ดของแอปพลิเคชันเพื่อให้คุณทำต่อได้ คุณจึงไม่ต้องเริ่มต้นจากศูนย์ ขั้นตอนต่อไปนี้จะแสดงวิธีส่งแอปพลิเคชันให้เสร็จสมบูรณ์ ซึ่งรวมถึงการใช้ปลั๊กอินคอมไพเลอร์ Protocol Buffer เพื่อสร้างโค้ด gRPC ที่ซ้ำกัน
ก่อนอื่น ให้สร้างไดเรกทอรีการทำงานของ Codelab แล้วใช้คำสั่ง cd เพื่อเข้าไปในไดเรกทอรีดังกล่าว
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
ดาวน์โหลดและแตกไฟล์ Codelab โดยทำดังนี้
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
หรือคุณจะดาวน์โหลดไฟล์ .zip ที่มีเฉพาะไดเรกทอรี Codelab แล้วแตกไฟล์ด้วยตนเองก็ได้
ซอร์สโค้ดที่เสร็จสมบูรณ์แล้วพร้อมใช้งานใน GitHub หากคุณไม่ต้องการพิมพ์การติดตั้งใช้งาน
3. กำหนดข้อความและบริการ
ขั้นตอนแรกคือการกำหนดบริการ gRPC ของแอปพลิเคชัน เมธอด RPC รวมถึงประเภทข้อความคำขอและการตอบกลับโดยใช้ Protocol Buffers บริการของคุณจะให้ข้อมูลต่อไปนี้
- เมธอด RPC ที่เรียกว่า
ListFeatures
,RecordRoute
และRouteChat
ซึ่งเซิร์ฟเวอร์ใช้และไคลเอ็นต์เรียก - ประเภทข้อความ
Point
,Feature
,Rectangle
,RouteNote
และRouteSummary
ซึ่งเป็นโครงสร้างข้อมูลที่แลกเปลี่ยนระหว่างไคลเอ็นต์และเซิร์ฟเวอร์เมื่อเรียกใช้เมธอดข้างต้น
Protocol Buffers เรียกกันโดยทั่วไปว่า protobuf ดูข้อมูลเพิ่มเติมเกี่ยวกับคำศัพท์ gRPC ได้ที่แนวคิดหลัก สถาปัตยกรรม และวงจรของ gRPC
วิธีการ RPC นี้และประเภทข้อความของวิธีการนี้จะกำหนดไว้ในไฟล์ proto/routeguide/route_guide.proto
ของซอร์สโค้ดที่ระบุ
มาสร้างไฟล์ route_guide.proto
กัน
เนื่องจากเราจะสร้างโค้ด Java ในตัวอย่างนี้ เราจึงระบุตัวเลือกไฟล์ java_package
ใน .proto
ดังนี้
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
กำหนดประเภทข้อความ
ในproto/routeguide/route_guide.proto
ไฟล์ของซอร์สโค้ด ให้กำหนดPoint
ประเภทข้อความก่อน Point
แสดงคู่พิกัดละติจูดและลองจิจูดบนแผนที่ สำหรับ Codelab นี้ ให้ใช้จำนวนเต็มสำหรับพิกัด
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
หมายเลข 1
และ 2
เป็นหมายเลขรหัสที่ไม่ซ้ำกันสำหรับแต่ละฟิลด์ในโครงสร้าง message
จากนั้นกำหนดFeature
ประเภทข้อความ Feature
ใช้ฟิลด์ string
สำหรับชื่อหรือที่อยู่ไปรษณีย์ของสิ่งหนึ่งๆ ในสถานที่ที่ระบุโดย Point
ดังนี้
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
หากต้องการสตรีมจุดหลายจุดภายในพื้นที่ไปยังไคลเอ็นต์ คุณจะต้องมีRectangle
ข้อความที่แสดงสี่เหลี่ยมผืนผ้าละติจูด-ลองจิจูด ซึ่งแสดงเป็น 2 จุดที่อยู่ตรงข้ามกันในแนวทแยง lo
และ hi
ดังนี้
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
นอกจากนี้ ยังมีข้อความ RouteNote
ที่แสดงถึงข้อความที่ส่งขณะอยู่ที่จุดหนึ่งๆ ด้วย
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
สุดท้าย คุณจะต้องมีRouteSummary
ข้อความ คุณจะได้รับข้อความนี้เป็นการตอบกลับ RPC ของ RecordRoute
ซึ่งจะอธิบายในส่วนถัดไป โดยจะมีจำนวนจุดแต่ละจุดที่ได้รับ จำนวนฟีเจอร์ที่ตรวจพบ และระยะทางทั้งหมดที่ครอบคลุมเป็นผลรวมสะสมของระยะทางระหว่างแต่ละจุด
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
กำหนดวิธีการบริการ
หากต้องการกำหนดบริการ ให้ระบุบริการที่มีชื่อในไฟล์ .proto
ไฟล์ route_guide.proto
มีโครงสร้าง service
ชื่อ RouteGuide
ซึ่งกำหนดวิธีการอย่างน้อย 1 วิธีที่บริการของแอปพลิเคชันมีให้
เมื่อกำหนดRPC
เมธอดภายในคำจำกัดความของบริการ คุณจะระบุประเภทคำขอและการตอบกลับของเมธอดเหล่านั้น ในส่วนนี้ของโค้ดแล็บ เราจะกำหนดค่าต่อไปนี้
ListFeatures
รับออบเจ็กต์ Feature
ที่มีอยู่ภายใน Rectangle
ที่ระบุ ระบบจะสตรีมผลการค้นหาแทนที่จะแสดงผลทั้งหมดในครั้งเดียว เนื่องจากสี่เหลี่ยมผืนผ้าอาจครอบคลุมพื้นที่ขนาดใหญ่และมีฟีเจอร์จำนวนมาก
สำหรับแอปพลิเคชันนี้ คุณจะใช้ RPC แบบสตรีมมิงฝั่งเซิร์ฟเวอร์ ซึ่งไคลเอ็นต์จะส่งคำขอไปยังเซิร์ฟเวอร์และรับสตรีมเพื่ออ่านลำดับข้อความกลับ ไคลเอ็นต์จะอ่านจากสตรีมที่ส่งคืนจนกว่าจะไม่มีข้อความเหลือ ดังที่เห็นในตัวอย่าง คุณระบุวิธีการสตรีมฝั่งเซิร์ฟเวอร์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทการตอบกลับ
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
ยอมรับสตรีมของจุดในเส้นทางที่กำลังเดินทาง และส่งคืน RouteSummary
เมื่อการเดินทางเสร็จสมบูรณ์
ในกรณีนี้ RPC การสตรีมฝั่งไคลเอ็นต์จะเหมาะสม โดยไคลเอ็นต์จะเขียนลำดับข้อความและส่งไปยังเซิร์ฟเวอร์อีกครั้งโดยใช้สตรีมที่ระบุ เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว ไคลเอ็นต์จะรอให้เซิร์ฟเวอร์อ่านข้อความทั้งหมดและส่งการตอบกลับ คุณระบุวิธีการสตรีมฝั่งไคลเอ็นต์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทคำขอ
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
ยอมรับสตรีมของ RouteNotes
ที่ส่งขณะที่กำลังเดินทางตามเส้นทาง ขณะเดียวกันก็รับ RouteNotes
อื่นๆ (เช่น จากผู้ใช้รายอื่น)
นี่คือกรณีการใช้งานที่เหมาะกับการสตรีมแบบ 2 ทาง RPC แบบสตรีมมิงแบบ 2 ทางซึ่งทั้ง 2 ฝ่ายจะส่งลำดับข้อความโดยใช้สตรีมแบบอ่าน-เขียน สตรีมทั้ง 2 รายการทำงานแยกกัน ดังนั้นไคลเอ็นต์และเซิร์ฟเวอร์จึงอ่านและเขียนได้ตามลำดับที่ต้องการ เช่น เซิร์ฟเวอร์อาจรอรับข้อความทั้งหมดจากไคลเอ็นต์ก่อนที่จะเขียนการตอบกลับ หรืออาจอ่านข้อความแล้วเขียนข้อความสลับกัน หรืออาจใช้การอ่านและการเขียนแบบอื่นๆ ระบบจะรักษลําดับของข้อความในแต่ละสตรีม คุณระบุประเภทเมธอดนี้ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนทั้งคำขอและการตอบกลับ
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. สร้างโค้ดไคลเอ็นต์และเซิร์ฟเวอร์
จากนั้นเราต้องสร้างอินเทอร์เฟซไคลเอ็นต์และเซิร์ฟเวอร์ gRPC จาก.proto
คำจำกัดความของบริการ เราดำเนินการนี้โดยใช้คอมไพเลอร์บัฟเฟอร์โปรโตคอล protoc
ร่วมกับปลั๊กอิน gRPC Java พิเศษ คุณต้องใช้คอมไพเลอร์ proto3 (ซึ่งรองรับทั้งไวยากรณ์ proto2 และ proto3) เพื่อสร้างบริการ gRPC
เมื่อใช้ Gradle หรือ Maven ปลั๊กอินการสร้าง protoc จะสร้างโค้ดที่จำเป็นเป็นส่วนหนึ่งของการสร้างได้ คุณดูวิธีสร้างโค้ดจากไฟล์ .proto
ของตัวเองได้ใน README ของ grpc-java
เราได้ให้การกำหนดค่า Gradle ไว้แล้ว
จากไดเรกทอรี streaming-grpc-java-getting-started
ให้ป้อน
$ chmod +x gradlew $ ./gradlew generateProto
ระบบจะสร้างคลาสต่อไปนี้จากคำจำกัดความของบริการ (ในส่วน build/generated/sources/proto/main/java
)
- โดยมี 1 รายการสำหรับข้อความแต่ละประเภท ได้แก่
Feature.java
,Rectangle.java, ...
ซึ่งมีโค้ด Protocol Buffer ทั้งหมดเพื่อสร้างข้อมูล ทำให้เป็นอนุกรม และเรียกประเภทข้อความคำขอและการตอบกลับของเรา RouteGuideGrpc.java
ซึ่งมี (พร้อมกับโค้ดอื่นๆ ที่มีประโยชน์) คลาสพื้นฐานสำหรับRouteGuide
เซิร์ฟเวอร์ที่จะใช้RouteGuideGrpc.RouteGuideImplBase
โดยมีเมธอดทั้งหมดที่กำหนดไว้ในRouteGuide
คลาสบริการและคลาส Stub สำหรับไคลเอ็นต์ที่จะใช้
5. ติดตั้งใช้งานบริการ
ก่อนอื่นมาดูวิธีสร้างRouteGuide
เซิร์ฟเวอร์กัน การทำให้RouteGuide
บริการของเราทำงานได้ตามที่ควรจะเป็นมี 2 ส่วน ดังนี้
- การติดตั้งใช้งานอินเทอร์เฟซบริการที่สร้างขึ้นจากคำจำกัดความของบริการ: การ "ทำงาน" จริงของบริการ
- เรียกใช้เซิร์ฟเวอร์ gRPC เพื่อรอรับคำขอจากไคลเอ็นต์และส่งคำขอไปยังการติดตั้งใช้งานบริการที่ถูกต้อง
ใช้ RouteGuide
เราจะใช้RouteGuideService
คลาสที่จะขยายคลาส RouteGuideGrpc.RouteGuideImplBase ที่สร้างขึ้น การติดตั้งใช้งานจะมีลักษณะดังนี้
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
มาดูรายละเอียดการติดตั้งใช้งาน RPC แต่ละรายการกัน
RPC การสตรีมฝั่งเซิร์ฟเวอร์
ต่อไป มาดู RPC แบบสตรีมมิงรายการหนึ่งของเรากัน ListFeatures
เป็น RPC การสตรีมฝั่งเซิร์ฟเวอร์ ดังนั้นเราจึงต้องส่ง Features
หลายรายการกลับไปยังไคลเอ็นต์
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
เช่นเดียวกับ RPC อย่างง่าย วิธีนี้จะรับออบเจ็กต์คำขอ (Rectangle
ที่ไคลเอ็นต์ต้องการค้นหา Features
) และStreamObserver
เครื่องสังเกตการตอบกลับ
คราวนี้เราจะรับออบเจ็กต์ Feature
ให้ได้มากที่สุดเท่าที่จำเป็นเพื่อส่งคืนไปยังไคลเอ็นต์ (ในกรณีนี้ เราจะเลือกออบเจ็กต์จากคอลเล็กชันฟีเจอร์ของบริการโดยพิจารณาว่าออบเจ็กต์นั้นอยู่ในคำขอของเราหรือไม่ Rectangle
) และเขียนออบเจ็กต์แต่ละรายการตามลำดับไปยังเครื่องมือสังเกตการตอบกลับโดยใช้เมธอด onNext()
ของเครื่องมือ สุดท้ายนี้ เราใช้เมธอด onCompleted()
ของเครื่องสังเกตการตอบกลับเพื่อบอก gRPC ว่าเราเขียนการตอบกลับเสร็จแล้ว เช่นเดียวกับใน RPC อย่างง่าย
RPC การสตรีมฝั่งไคลเอ็นต์
ตอนนี้เรามาดูสิ่งที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute()
ซึ่งเราจะรับสตรีมของ Points
จากไคลเอ็นต์และส่งคืน RouteSummary
รายการเดียวพร้อมข้อมูลเกี่ยวกับการเดินทาง
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
ดังที่เห็นได้ว่าเมธอดของเราจะรับพารามิเตอร์ StreamObserver
responseObserver
เช่นเดียวกับประเภทเมธอดก่อนหน้า แต่คราวนี้จะส่งคืน StreamObserver
เพื่อให้ไคลเอ็นต์เขียน Points
ในเนื้อหาของเมธอด เราจะสร้างอินสแตนซ์ของ StreamObserver
แบบไม่ระบุชื่อเพื่อส่งคืน ซึ่งเราจะทำดังนี้
- ลบล้างเมธอด
onNext()
เพื่อรับฟีเจอร์และข้อมูลอื่นๆ ทุกครั้งที่ไคลเอ็นต์เขียนPoint
ลงในสตรีมข้อความ - แทนที่เมธอด
onCompleted()
(เรียกใช้เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว) เพื่อป้อนและสร้างRouteSummary
จากนั้นเราจะเรียกใช้onNext()
ของเครื่องมือสังเกตการตอบกลับของเมธอดของเราเองด้วยRouteSummary
และเรียกใช้เมธอดonCompleted()
เพื่อสิ้นสุดการเรียกจากฝั่งเซิร์ฟเวอร์
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat()
กัน
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราทั้งรับและส่งคืน StreamObserver
แต่ครั้งนี้เราจะส่งคืนค่าผ่านเครื่องมือสังเกตการตอบกลับของเมธอดในขณะที่ไคลเอ็นต์ยังคงเขียนข้อความไปยังสตรีมข้อความของตน ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับไวยากรณ์สำหรับวิธีการสตรีมมิงฝั่งไคลเอ็นต์และสตรีมมิงฝั่งเซิร์ฟเวอร์ทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
เริ่มเซิร์ฟเวอร์
เมื่อเราได้ใช้ทุกวิธีแล้ว เราก็ต้องเริ่มเซิร์ฟเวอร์ gRPC เพื่อให้ไคลเอ็นต์ใช้บริการของเราได้จริง ข้อมูลโค้ดต่อไปนี้แสดงวิธีที่เราดำเนินการนี้สำหรับบริการ RouteGuide
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
ดังที่เห็น เราสร้างและเริ่มเซิร์ฟเวอร์โดยใช้ ServerBuilder
เราจะดำเนินการดังกล่าวโดยทำดังนี้
- ระบุที่อยู่และพอร์ตที่เราต้องการใช้เพื่อรอรับคำขอของไคลเอ็นต์โดยใช้เมธอด
forPort()
ของบิลเดอร์ - สร้างอินสแตนซ์ของคลาสการติดตั้งใช้งานบริการ
RouteGuideService
และส่งไปยังเมธอดaddService()
ของ Builder - เรียกใช้
build()
และstart()
ในบิลเดอร์เพื่อสร้างและเริ่มต้นเซิร์ฟเวอร์ RPC สำหรับบริการของเรา
เนื่องจาก ServerBuilder มีพอร์ตอยู่แล้ว เหตุผลเดียวที่เราส่งพอร์ตคือเพื่อใช้ในการบันทึก
6. สร้างไคลเอ็นต์
ในส่วนนี้ เราจะมาดูการสร้างไคลเอ็นต์สำหรับบริการ RouteGuide
คุณดูโค้ดไคลเอ็นต์ตัวอย่างที่สมบูรณ์ได้ใน ../complete/src/main/java/io/grpc/complete/routeguide/
RouteGuideClient.java
สร้างอินสแตนซ์ของ Stub
หากต้องการเรียกใช้เมธอดบริการ เราต้องสร้างสตับก่อน หรือจะสร้าง 2 สตับก็ได้
- สแต็บบล็อก/ซิงโครนัส: หมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือยกเว้น
- สตับที่ไม่บล็อก/แบบอะซิงโครนัสที่ทำการเรียกที่ไม่บล็อกไปยังเซิร์ฟเวอร์ ซึ่งจะส่งคืนการตอบกลับแบบอะซิงโครนัส คุณจะโทรแบบสตรีมมิงบางประเภทได้โดยใช้ Stub แบบไม่พร้อมกันเท่านั้น
ก่อนอื่นเราต้องสร้างแชแนล gRPC สำหรับสตับ โดยระบุที่อยู่เซิร์ฟเวอร์และพอร์ตที่เราต้องการเชื่อมต่อ
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
เราใช้ManagedChannelBuilder
เพื่อสร้างช่อง
ตอนนี้เราสามารถใช้แชแนลเพื่อสร้าง Stub โดยใช้เมธอด newStub
และ newBlockingStub
ที่ระบุไว้ในคลาส RouteGuideGrpc
ที่เราสร้างจาก .proto
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
โปรดทราบว่าหากไม่ได้บล็อก ก็จะเป็นแบบไม่พร้อมกัน
วิธีการเรียกใช้บริการ
ตอนนี้มาดูวิธีเรียกใช้เมธอดบริการกัน โปรดทราบว่า RPC ที่สร้างจาก Stub ที่บล็อกจะทํางานในโหมดบล็อก/ซิงโครนัส ซึ่งหมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือข้อผิดพลาด
RPC การสตรีมฝั่งเซิร์ฟเวอร์
ต่อไป เราจะมาดูการเรียกสตรีมฝั่งเซิร์ฟเวอร์ไปยัง ListFeatures
ซึ่งจะแสดงผลสตรีมของFeature
ทางภูมิศาสตร์
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
ดังที่เห็นได้ว่าฟังก์ชันนี้คล้ายกับ RPC แบบเอกภาคอย่างง่ายที่เราดูใน Codelab Getting_Started_With_gRPC_Java มาก เพียงแต่แทนที่จะส่งคืน Feature
รายการเดียว เมธอดจะส่งคืน Iterator
ที่ไคลเอ็นต์ใช้เพื่ออ่าน Features
ทั้งหมดที่ส่งคืนได้
RPC การสตรีมฝั่งไคลเอ็นต์
ตอนนี้มาดูวิธีที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute
ซึ่งเราจะส่งสตรีมของ Points
ไปยังเซิร์ฟเวอร์และรับ RouteSummary
กลับมา สำหรับวิธีนี้ เราต้องใช้ Stub Asynchronous หากคุณเคยอ่านการสร้างเซิร์ฟเวอร์แล้ว คุณอาจคุ้นเคยกับเนื้อหาบางส่วนนี้เป็นอย่างดี เนื่องจาก RPC แบบสตรีมมิงแบบไม่พร้อมกันจะได้รับการติดตั้งใช้งานในลักษณะที่คล้ายกันทั้ง 2 ฝั่ง
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
ดังที่เห็นได้ว่าในการเรียกใช้เมธอดนี้ เราต้องสร้าง StreamObserver
ซึ่งใช้การติดตั้งใช้งานอินเทอร์เฟซพิเศษสำหรับเซิร์ฟเวอร์เพื่อเรียกใช้ด้วยการตอบกลับ RouteSummary
ในStreamObserver
เราทำสิ่งต่อไปนี้
- ลบล้างเมธอด
onNext()
เพื่อพิมพ์ข้อมูลที่ส่งคืนเมื่อเซิร์ฟเวอร์เขียนRouteSummary
ลงในสตรีมข้อความ - แทนที่เมธอด
onCompleted()
(เรียกใช้เมื่อ server ดำเนินการเรียกใช้ในฝั่งของตนเองเสร็จแล้ว) เพื่อลดCountDownLatch
เพื่อให้เราตรวจสอบได้ว่าเซิร์ฟเวอร์เขียนเสร็จแล้วหรือไม่
จากนั้นเราจะส่ง StreamObserver
ไปยังเมธอด recordRoute()
ของสแต็บแบบอะซิงโครนัส และรับเครื่องสังเกตการณ์คำขอ StreamObserver
ของเราเองกลับมาเพื่อเขียน Points
ที่จะส่งไปยังเซิร์ฟเวอร์ เมื่อเขียนคะแนนเสร็จแล้ว เราจะใช้วิธี onCompleted()
ของเครื่องสังเกตคำขอเพื่อบอก gRPC ว่าเราเขียนฝั่งไคลเอ็นต์เสร็จแล้ว เมื่อเสร็จแล้ว เราจะตรวจสอบ CountDownLatch
เพื่อดูว่าเซิร์ฟเวอร์ดำเนินการเสร็จสมบูรณ์ในฝั่งของตนเองหรือไม่
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat()
กัน
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราจะได้รับและส่งคืน StreamObserver
ResponseObserver ยกเว้นในครั้งนี้ที่เราจะส่งค่าผ่าน ResponseObserver ของเมธอดในขณะที่เซิร์ฟเวอร์ยังคงเขียนข้อความไปยังสตรีมข้อความของเซิร์ฟเวอร์ ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับวิธีการสตรีมมิงแบบไคลเอ็นต์ของเราทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
7. ลองใช้เลย!
- จากไดเรกทอรี
start_here
ให้ทำดังนี้
$ ./gradlew installDist
ซึ่งจะคอมไพล์โค้ด แพ็กเกจในไฟล์ JAR และสร้างสคริปต์ที่เรียกใช้ตัวอย่าง ระบบจะสร้างไฟล์เหล่านี้ในไดเรกทอรี build/install/start_here/bin/
สคริปต์คือ route-guide-server
และ route-guide-client
เซิร์ฟเวอร์ต้องทำงานก่อนจึงจะเริ่มไคลเอ็นต์ได้
- เรียกใช้เซิร์ฟเวอร์
$ ./build/install/start_here/bin/route-guide-server
- เรียกใช้ไคลเอ็นต์
$ ./build/install/start_here/bin/route-guide-client
8. ขั้นตอนถัดไป
- ดูวิธีการทำงานของ gRPC ในข้อมูลเบื้องต้นเกี่ยวกับ gRPC และแนวคิดหลัก
- ดูบทแนะนำเกี่ยวกับพื้นฐาน
- สำรวจข้อมูลอ้างอิงของ API