เริ่มต้นใช้งาน gRPC-Java - การสตรีม

1. บทนำ

ในโค้ดแล็บนี้ คุณจะได้ใช้ gRPC-Java เพื่อสร้างไคลเอ็นต์และเซิร์ฟเวอร์ซึ่งเป็นรากฐานของแอปพลิเคชันการแมปเส้นทางที่เขียนด้วย Java

เมื่อจบบทแนะนำนี้ คุณจะมีไคลเอ็นต์ที่เชื่อมต่อกับเซิร์ฟเวอร์ระยะไกลโดยใช้ gRPC เพื่อรับข้อมูลเกี่ยวกับฟีเจอร์ในเส้นทางของไคลเอ็นต์ สร้างข้อมูลสรุปของเส้นทางของไคลเอ็นต์ และแลกเปลี่ยนข้อมูลเส้นทาง เช่น ข้อมูลอัปเดตการจราจร กับเซิร์ฟเวอร์และไคลเอ็นต์อื่นๆ

บริการนี้กำหนดไว้ในไฟล์ Protocol Buffers ซึ่งจะใช้เพื่อสร้างโค้ดมาตรฐานสำหรับไคลเอ็นต์และเซิร์ฟเวอร์เพื่อให้สื่อสารกันได้ ซึ่งจะช่วยประหยัดเวลาและแรงในการติดตั้งใช้งานฟังก์ชันดังกล่าว

โค้ดที่สร้างขึ้นนี้ไม่เพียงแต่จัดการความซับซ้อนของการสื่อสารระหว่างเซิร์ฟเวอร์และไคลเอ็นต์เท่านั้น แต่ยังจัดการการซีเรียลไลซ์และการดีซีเรียลไลซ์ข้อมูลด้วย

สิ่งที่คุณจะได้เรียนรู้

  • วิธีใช้ Protocol Buffers เพื่อกำหนด API ของบริการ
  • วิธีสร้างไคลเอ็นต์และเซิร์ฟเวอร์ที่ใช้ gRPC จากคำจำกัดความของ Protocol Buffers โดยใช้การสร้างโค้ดอัตโนมัติ
  • ความเข้าใจเกี่ยวกับการสื่อสารแบบสตรีมมิงไคลเอ็นต์-เซิร์ฟเวอร์ด้วย gRPC

Codelab นี้มีไว้สำหรับนักพัฒนาแอป Java ที่เพิ่งเริ่มใช้ gRPC หรือต้องการทบทวน gRPC หรือใครก็ตามที่สนใจสร้างระบบแบบกระจาย ไม่จำเป็นต้องมีประสบการณ์เกี่ยวกับ gRPC มาก่อน

2. ก่อนเริ่มต้น

ข้อกำหนดเบื้องต้น

  • JDK เวอร์ชัน 24

รับโค้ด

Codelab นี้มีโครงสร้างของซอร์สโค้ดของแอปพลิเคชันเพื่อให้คุณทำต่อได้ คุณจึงไม่ต้องเริ่มต้นจากศูนย์ ขั้นตอนต่อไปนี้จะแสดงวิธีส่งแอปพลิเคชันให้เสร็จสมบูรณ์ ซึ่งรวมถึงการใช้ปลั๊กอินคอมไพเลอร์ Protocol Buffer เพื่อสร้างโค้ด gRPC ที่ซ้ำกัน

ก่อนอื่น ให้สร้างไดเรกทอรีการทำงานของ Codelab แล้วใช้คำสั่ง cd เพื่อเข้าไปในไดเรกทอรีดังกล่าว

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

ดาวน์โหลดและแตกไฟล์ Codelab โดยทำดังนี้

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

หรือคุณจะดาวน์โหลดไฟล์ .zip ที่มีเฉพาะไดเรกทอรี Codelab แล้วแตกไฟล์ด้วยตนเองก็ได้

ซอร์สโค้ดที่เสร็จสมบูรณ์แล้วพร้อมใช้งานใน GitHub หากคุณไม่ต้องการพิมพ์การติดตั้งใช้งาน

3. กำหนดข้อความและบริการ

ขั้นตอนแรกคือการกำหนดบริการ gRPC ของแอปพลิเคชัน เมธอด RPC รวมถึงประเภทข้อความคำขอและการตอบกลับโดยใช้ Protocol Buffers บริการของคุณจะให้ข้อมูลต่อไปนี้

  • เมธอด RPC ที่เรียกว่า ListFeatures, RecordRoute และ RouteChat ซึ่งเซิร์ฟเวอร์ใช้และไคลเอ็นต์เรียก
  • ประเภทข้อความ Point, Feature, Rectangle, RouteNote และ RouteSummary ซึ่งเป็นโครงสร้างข้อมูลที่แลกเปลี่ยนระหว่างไคลเอ็นต์และเซิร์ฟเวอร์เมื่อเรียกใช้เมธอดข้างต้น

Protocol Buffers เรียกกันโดยทั่วไปว่า protobuf ดูข้อมูลเพิ่มเติมเกี่ยวกับคำศัพท์ gRPC ได้ที่แนวคิดหลัก สถาปัตยกรรม และวงจรของ gRPC

วิธีการ RPC นี้และประเภทข้อความของวิธีการนี้จะกำหนดไว้ในไฟล์ proto/routeguide/route_guide.proto ของซอร์สโค้ดที่ระบุ

มาสร้างไฟล์ route_guide.proto กัน

เนื่องจากเราจะสร้างโค้ด Java ในตัวอย่างนี้ เราจึงระบุตัวเลือกไฟล์ java_package ใน .proto ดังนี้

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

กำหนดประเภทข้อความ

ในproto/routeguide/route_guide.protoไฟล์ของซอร์สโค้ด ให้กำหนดPointประเภทข้อความก่อน Point แสดงคู่พิกัดละติจูดและลองจิจูดบนแผนที่ สำหรับ Codelab นี้ ให้ใช้จำนวนเต็มสำหรับพิกัด

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

หมายเลข 1 และ 2 เป็นหมายเลขรหัสที่ไม่ซ้ำกันสำหรับแต่ละฟิลด์ในโครงสร้าง message

จากนั้นกำหนดFeatureประเภทข้อความ Feature ใช้ฟิลด์ string สำหรับชื่อหรือที่อยู่ไปรษณีย์ของสิ่งหนึ่งๆ ในสถานที่ที่ระบุโดย Point ดังนี้

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

หากต้องการสตรีมจุดหลายจุดภายในพื้นที่ไปยังไคลเอ็นต์ คุณจะต้องมีRectangleข้อความที่แสดงสี่เหลี่ยมผืนผ้าละติจูด-ลองจิจูด ซึ่งแสดงเป็น 2 จุดที่อยู่ตรงข้ามกันในแนวทแยง lo และ hi ดังนี้

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

นอกจากนี้ ยังมีข้อความ RouteNote ที่แสดงถึงข้อความที่ส่งขณะอยู่ที่จุดหนึ่งๆ ด้วย

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

สุดท้าย คุณจะต้องมีRouteSummaryข้อความ คุณจะได้รับข้อความนี้เป็นการตอบกลับ RPC ของ RecordRoute ซึ่งจะอธิบายในส่วนถัดไป โดยจะมีจำนวนจุดแต่ละจุดที่ได้รับ จำนวนฟีเจอร์ที่ตรวจพบ และระยะทางทั้งหมดที่ครอบคลุมเป็นผลรวมสะสมของระยะทางระหว่างแต่ละจุด

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

กำหนดวิธีการบริการ

หากต้องการกำหนดบริการ ให้ระบุบริการที่มีชื่อในไฟล์ .proto ไฟล์ route_guide.proto มีโครงสร้าง service ชื่อ RouteGuide ซึ่งกำหนดวิธีการอย่างน้อย 1 วิธีที่บริการของแอปพลิเคชันมีให้

เมื่อกำหนดRPCเมธอดภายในคำจำกัดความของบริการ คุณจะระบุประเภทคำขอและการตอบกลับของเมธอดเหล่านั้น ในส่วนนี้ของโค้ดแล็บ เราจะกำหนดค่าต่อไปนี้

ListFeatures

รับออบเจ็กต์ Feature ที่มีอยู่ภายใน Rectangle ที่ระบุ ระบบจะสตรีมผลการค้นหาแทนที่จะแสดงผลทั้งหมดในครั้งเดียว เนื่องจากสี่เหลี่ยมผืนผ้าอาจครอบคลุมพื้นที่ขนาดใหญ่และมีฟีเจอร์จำนวนมาก

สำหรับแอปพลิเคชันนี้ คุณจะใช้ RPC แบบสตรีมมิงฝั่งเซิร์ฟเวอร์ ซึ่งไคลเอ็นต์จะส่งคำขอไปยังเซิร์ฟเวอร์และรับสตรีมเพื่ออ่านลำดับข้อความกลับ ไคลเอ็นต์จะอ่านจากสตรีมที่ส่งคืนจนกว่าจะไม่มีข้อความเหลือ ดังที่เห็นในตัวอย่าง คุณระบุวิธีการสตรีมฝั่งเซิร์ฟเวอร์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทการตอบกลับ

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

ยอมรับสตรีมของจุดในเส้นทางที่กำลังเดินทาง และส่งคืน RouteSummary เมื่อการเดินทางเสร็จสมบูรณ์

ในกรณีนี้ RPC การสตรีมฝั่งไคลเอ็นต์จะเหมาะสม โดยไคลเอ็นต์จะเขียนลำดับข้อความและส่งไปยังเซิร์ฟเวอร์อีกครั้งโดยใช้สตรีมที่ระบุ เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว ไคลเอ็นต์จะรอให้เซิร์ฟเวอร์อ่านข้อความทั้งหมดและส่งการตอบกลับ คุณระบุวิธีการสตรีมฝั่งไคลเอ็นต์ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนประเภทคำขอ

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

ยอมรับสตรีมของ RouteNotes ที่ส่งขณะที่กำลังเดินทางตามเส้นทาง ขณะเดียวกันก็รับ RouteNotes อื่นๆ (เช่น จากผู้ใช้รายอื่น)

นี่คือกรณีการใช้งานที่เหมาะกับการสตรีมแบบ 2 ทาง RPC แบบสตรีมมิงแบบ 2 ทางซึ่งทั้ง 2 ฝ่ายจะส่งลำดับข้อความโดยใช้สตรีมแบบอ่าน-เขียน สตรีมทั้ง 2 รายการทำงานแยกกัน ดังนั้นไคลเอ็นต์และเซิร์ฟเวอร์จึงอ่านและเขียนได้ตามลำดับที่ต้องการ เช่น เซิร์ฟเวอร์อาจรอรับข้อความทั้งหมดจากไคลเอ็นต์ก่อนที่จะเขียนการตอบกลับ หรืออาจอ่านข้อความแล้วเขียนข้อความสลับกัน หรืออาจใช้การอ่านและการเขียนแบบอื่นๆ ระบบจะรักษลําดับของข้อความในแต่ละสตรีม คุณระบุประเภทเมธอดนี้ได้โดยวางคีย์เวิร์ดสตรีมไว้ก่อนทั้งคำขอและการตอบกลับ

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. สร้างโค้ดไคลเอ็นต์และเซิร์ฟเวอร์

จากนั้นเราต้องสร้างอินเทอร์เฟซไคลเอ็นต์และเซิร์ฟเวอร์ gRPC จาก.protoคำจำกัดความของบริการ เราดำเนินการนี้โดยใช้คอมไพเลอร์บัฟเฟอร์โปรโตคอล protoc ร่วมกับปลั๊กอิน gRPC Java พิเศษ คุณต้องใช้คอมไพเลอร์ proto3 (ซึ่งรองรับทั้งไวยากรณ์ proto2 และ proto3) เพื่อสร้างบริการ gRPC

เมื่อใช้ Gradle หรือ Maven ปลั๊กอินการสร้าง protoc จะสร้างโค้ดที่จำเป็นเป็นส่วนหนึ่งของการสร้างได้ คุณดูวิธีสร้างโค้ดจากไฟล์ .proto ของตัวเองได้ใน README ของ grpc-java

เราได้ให้การกำหนดค่า Gradle ไว้แล้ว

จากไดเรกทอรี streaming-grpc-java-getting-started ให้ป้อน

$ chmod +x gradlew
$ ./gradlew generateProto

ระบบจะสร้างคลาสต่อไปนี้จากคำจำกัดความของบริการ (ในส่วน build/generated/sources/proto/main/java)

  • โดยมี 1 รายการสำหรับข้อความแต่ละประเภท ได้แก่ Feature.java, Rectangle.java, ... ซึ่งมีโค้ด Protocol Buffer ทั้งหมดเพื่อสร้างข้อมูล ทำให้เป็นอนุกรม และเรียกประเภทข้อความคำขอและการตอบกลับของเรา
  • RouteGuideGrpc.java ซึ่งมี (พร้อมกับโค้ดอื่นๆ ที่มีประโยชน์) คลาสพื้นฐานสำหรับRouteGuideเซิร์ฟเวอร์ที่จะใช้ RouteGuideGrpc.RouteGuideImplBase โดยมีเมธอดทั้งหมดที่กำหนดไว้ในRouteGuideคลาสบริการและคลาส Stub สำหรับไคลเอ็นต์ที่จะใช้

5. ติดตั้งใช้งานบริการ

ก่อนอื่นมาดูวิธีสร้างRouteGuideเซิร์ฟเวอร์กัน การทำให้RouteGuideบริการของเราทำงานได้ตามที่ควรจะเป็นมี 2 ส่วน ดังนี้

  • การติดตั้งใช้งานอินเทอร์เฟซบริการที่สร้างขึ้นจากคำจำกัดความของบริการ: การ "ทำงาน" จริงของบริการ
  • เรียกใช้เซิร์ฟเวอร์ gRPC เพื่อรอรับคำขอจากไคลเอ็นต์และส่งคำขอไปยังการติดตั้งใช้งานบริการที่ถูกต้อง

ใช้ RouteGuide

เราจะใช้RouteGuideServiceคลาสที่จะขยายคลาส RouteGuideGrpc.RouteGuideImplBase ที่สร้างขึ้น การติดตั้งใช้งานจะมีลักษณะดังนี้

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

มาดูรายละเอียดการติดตั้งใช้งาน RPC แต่ละรายการกัน

RPC การสตรีมฝั่งเซิร์ฟเวอร์

ต่อไป มาดู RPC แบบสตรีมมิงรายการหนึ่งของเรากัน ListFeatures เป็น RPC การสตรีมฝั่งเซิร์ฟเวอร์ ดังนั้นเราจึงต้องส่ง Features หลายรายการกลับไปยังไคลเอ็นต์

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

เช่นเดียวกับ RPC อย่างง่าย วิธีนี้จะรับออบเจ็กต์คำขอ (Rectangle ที่ไคลเอ็นต์ต้องการค้นหา Features) และStreamObserverเครื่องสังเกตการตอบกลับ

คราวนี้เราจะรับออบเจ็กต์ Feature ให้ได้มากที่สุดเท่าที่จำเป็นเพื่อส่งคืนไปยังไคลเอ็นต์ (ในกรณีนี้ เราจะเลือกออบเจ็กต์จากคอลเล็กชันฟีเจอร์ของบริการโดยพิจารณาว่าออบเจ็กต์นั้นอยู่ในคำขอของเราหรือไม่ Rectangle) และเขียนออบเจ็กต์แต่ละรายการตามลำดับไปยังเครื่องมือสังเกตการตอบกลับโดยใช้เมธอด onNext() ของเครื่องมือ สุดท้ายนี้ เราใช้เมธอด onCompleted() ของเครื่องสังเกตการตอบกลับเพื่อบอก gRPC ว่าเราเขียนการตอบกลับเสร็จแล้ว เช่นเดียวกับใน RPC อย่างง่าย

RPC การสตรีมฝั่งไคลเอ็นต์

ตอนนี้เรามาดูสิ่งที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute() ซึ่งเราจะรับสตรีมของ Points จากไคลเอ็นต์และส่งคืน RouteSummary รายการเดียวพร้อมข้อมูลเกี่ยวกับการเดินทาง

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

ดังที่เห็นได้ว่าเมธอดของเราจะรับพารามิเตอร์ StreamObserver responseObserver เช่นเดียวกับประเภทเมธอดก่อนหน้า แต่คราวนี้จะส่งคืน StreamObserver เพื่อให้ไคลเอ็นต์เขียน Points

ในเนื้อหาของเมธอด เราจะสร้างอินสแตนซ์ของ StreamObserver แบบไม่ระบุชื่อเพื่อส่งคืน ซึ่งเราจะทำดังนี้

  • ลบล้างเมธอด onNext() เพื่อรับฟีเจอร์และข้อมูลอื่นๆ ทุกครั้งที่ไคลเอ็นต์เขียน Point ลงในสตรีมข้อความ
  • แทนที่เมธอด onCompleted() (เรียกใช้เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว) เพื่อป้อนและสร้าง RouteSummary จากนั้นเราจะเรียกใช้ onNext() ของเครื่องมือสังเกตการตอบกลับของเมธอดของเราเองด้วย RouteSummary และเรียกใช้เมธอด onCompleted() เพื่อสิ้นสุดการเรียกจากฝั่งเซิร์ฟเวอร์

RPC การสตรีมแบบ 2 ทาง

สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราทั้งรับและส่งคืน StreamObserver แต่ครั้งนี้เราจะส่งคืนค่าผ่านเครื่องมือสังเกตการตอบกลับของเมธอดในขณะที่ไคลเอ็นต์ยังคงเขียนข้อความไปยังสตรีมข้อความของตน ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับไวยากรณ์สำหรับวิธีการสตรีมมิงฝั่งไคลเอ็นต์และสตรีมมิงฝั่งเซิร์ฟเวอร์ทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์

เริ่มเซิร์ฟเวอร์

เมื่อเราได้ใช้ทุกวิธีแล้ว เราก็ต้องเริ่มเซิร์ฟเวอร์ gRPC เพื่อให้ไคลเอ็นต์ใช้บริการของเราได้จริง ข้อมูลโค้ดต่อไปนี้แสดงวิธีที่เราดำเนินการนี้สำหรับบริการ RouteGuide

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

ดังที่เห็น เราสร้างและเริ่มเซิร์ฟเวอร์โดยใช้ ServerBuilder

เราจะดำเนินการดังกล่าวโดยทำดังนี้

  1. ระบุที่อยู่และพอร์ตที่เราต้องการใช้เพื่อรอรับคำขอของไคลเอ็นต์โดยใช้เมธอด forPort() ของบิลเดอร์
  2. สร้างอินสแตนซ์ของคลาสการติดตั้งใช้งานบริการ RouteGuideService และส่งไปยังเมธอด addService() ของ Builder
  3. เรียกใช้ build() และ start() ในบิลเดอร์เพื่อสร้างและเริ่มต้นเซิร์ฟเวอร์ RPC สำหรับบริการของเรา

เนื่องจาก ServerBuilder มีพอร์ตอยู่แล้ว เหตุผลเดียวที่เราส่งพอร์ตคือเพื่อใช้ในการบันทึก

6. สร้างไคลเอ็นต์

ในส่วนนี้ เราจะมาดูการสร้างไคลเอ็นต์สำหรับบริการ RouteGuide คุณดูโค้ดไคลเอ็นต์ตัวอย่างที่สมบูรณ์ได้ใน ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java

สร้างอินสแตนซ์ของ Stub

หากต้องการเรียกใช้เมธอดบริการ เราต้องสร้างสตับก่อน หรือจะสร้าง 2 สตับก็ได้

  • สแต็บบล็อก/ซิงโครนัส: หมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือยกเว้น
  • สตับที่ไม่บล็อก/แบบอะซิงโครนัสที่ทำการเรียกที่ไม่บล็อกไปยังเซิร์ฟเวอร์ ซึ่งจะส่งคืนการตอบกลับแบบอะซิงโครนัส คุณจะโทรแบบสตรีมมิงบางประเภทได้โดยใช้ Stub แบบไม่พร้อมกันเท่านั้น

ก่อนอื่นเราต้องสร้างแชแนล gRPC สำหรับสตับ โดยระบุที่อยู่เซิร์ฟเวอร์และพอร์ตที่เราต้องการเชื่อมต่อ

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

เราใช้ManagedChannelBuilderเพื่อสร้างช่อง

ตอนนี้เราสามารถใช้แชแนลเพื่อสร้าง Stub โดยใช้เมธอด newStub และ newBlockingStub ที่ระบุไว้ในคลาส RouteGuideGrpc ที่เราสร้างจาก .proto

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

โปรดทราบว่าหากไม่ได้บล็อก ก็จะเป็นแบบไม่พร้อมกัน

วิธีการเรียกใช้บริการ

ตอนนี้มาดูวิธีเรียกใช้เมธอดบริการกัน โปรดทราบว่า RPC ที่สร้างจาก Stub ที่บล็อกจะทํางานในโหมดบล็อก/ซิงโครนัส ซึ่งหมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือข้อผิดพลาด

RPC การสตรีมฝั่งเซิร์ฟเวอร์

ต่อไป เราจะมาดูการเรียกสตรีมฝั่งเซิร์ฟเวอร์ไปยัง ListFeatures ซึ่งจะแสดงผลสตรีมของFeatureทางภูมิศาสตร์

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

ดังที่เห็นได้ว่าฟังก์ชันนี้คล้ายกับ RPC แบบเอกภาคอย่างง่ายที่เราดูใน Codelab Getting_Started_With_gRPC_Java มาก เพียงแต่แทนที่จะส่งคืน Feature รายการเดียว เมธอดจะส่งคืน Iterator ที่ไคลเอ็นต์ใช้เพื่ออ่าน Features ทั้งหมดที่ส่งคืนได้

RPC การสตรีมฝั่งไคลเอ็นต์

ตอนนี้มาดูวิธีที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute ซึ่งเราจะส่งสตรีมของ Points ไปยังเซิร์ฟเวอร์และรับ RouteSummary กลับมา สำหรับวิธีนี้ เราต้องใช้ Stub Asynchronous หากคุณเคยอ่านการสร้างเซิร์ฟเวอร์แล้ว คุณอาจคุ้นเคยกับเนื้อหาบางส่วนนี้เป็นอย่างดี เนื่องจาก RPC แบบสตรีมมิงแบบไม่พร้อมกันจะได้รับการติดตั้งใช้งานในลักษณะที่คล้ายกันทั้ง 2 ฝั่ง

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

ดังที่เห็นได้ว่าในการเรียกใช้เมธอดนี้ เราต้องสร้าง StreamObserver ซึ่งใช้การติดตั้งใช้งานอินเทอร์เฟซพิเศษสำหรับเซิร์ฟเวอร์เพื่อเรียกใช้ด้วยการตอบกลับ RouteSummary ในStreamObserver เราทำสิ่งต่อไปนี้

  • ลบล้างเมธอด onNext() เพื่อพิมพ์ข้อมูลที่ส่งคืนเมื่อเซิร์ฟเวอร์เขียน RouteSummary ลงในสตรีมข้อความ
  • แทนที่เมธอด onCompleted() (เรียกใช้เมื่อ server ดำเนินการเรียกใช้ในฝั่งของตนเองเสร็จแล้ว) เพื่อลด CountDownLatch เพื่อให้เราตรวจสอบได้ว่าเซิร์ฟเวอร์เขียนเสร็จแล้วหรือไม่

จากนั้นเราจะส่ง StreamObserver ไปยังเมธอด recordRoute() ของสแต็บแบบอะซิงโครนัส และรับเครื่องสังเกตการณ์คำขอ StreamObserver ของเราเองกลับมาเพื่อเขียน Points ที่จะส่งไปยังเซิร์ฟเวอร์ เมื่อเขียนคะแนนเสร็จแล้ว เราจะใช้วิธี onCompleted() ของเครื่องสังเกตคำขอเพื่อบอก gRPC ว่าเราเขียนฝั่งไคลเอ็นต์เสร็จแล้ว เมื่อเสร็จแล้ว เราจะตรวจสอบ CountDownLatch เพื่อดูว่าเซิร์ฟเวอร์ดำเนินการเสร็จสมบูรณ์ในฝั่งของตนเองหรือไม่

RPC การสตรีมแบบ 2 ทาง

สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ เราจะได้รับและส่งคืน StreamObserver ResponseObserver ยกเว้นในครั้งนี้ที่เราจะส่งค่าผ่าน ResponseObserver ของเมธอดในขณะที่เซิร์ฟเวอร์ยังคงเขียนข้อความไปยังสตรีมข้อความของเซิร์ฟเวอร์ ไวยากรณ์สำหรับการอ่านและเขียนที่นี่จะเหมือนกับวิธีการสตรีมมิงแบบไคลเอ็นต์ของเราทุกประการ แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์

7. ลองใช้เลย!

  1. จากไดเรกทอรี start_here ให้ทำดังนี้
$ ./gradlew installDist

ซึ่งจะคอมไพล์โค้ด แพ็กเกจในไฟล์ JAR และสร้างสคริปต์ที่เรียกใช้ตัวอย่าง ระบบจะสร้างไฟล์เหล่านี้ในไดเรกทอรี build/install/start_here/bin/ สคริปต์คือ route-guide-server และ route-guide-client

เซิร์ฟเวอร์ต้องทำงานก่อนจึงจะเริ่มไคลเอ็นต์ได้

  1. เรียกใช้เซิร์ฟเวอร์
$ ./build/install/start_here/bin/route-guide-server
  1. เรียกใช้ไคลเอ็นต์
$ ./build/install/start_here/bin/route-guide-client

8. ขั้นตอนถัดไป