เริ่มต้นใช้งาน gRPC-Rust - การสตรีม

1. บทนำ

ในโค้ดแล็บนี้ คุณจะได้ใช้ gRPC-Rust เพื่อสร้างไคลเอ็นต์และเซิร์ฟเวอร์ซึ่งเป็นรากฐานของแอปพลิเคชันการแมปเส้นทางที่เขียนด้วย Rust

เมื่อจบบทแนะนำนี้ คุณจะมีไคลเอ็นต์ที่เชื่อมต่อกับเซิร์ฟเวอร์ระยะไกลโดยใช้ gRPC เพื่อรับข้อมูลเกี่ยวกับฟีเจอร์ในเส้นทางของไคลเอ็นต์ สร้างข้อมูลสรุปของเส้นทางของไคลเอ็นต์ และแลกเปลี่ยนข้อมูลเส้นทาง เช่น ข้อมูลอัปเดตการจราจร กับเซิร์ฟเวอร์และไคลเอ็นต์อื่นๆ

บริการนี้กำหนดไว้ในไฟล์ Protocol Buffers ซึ่งจะใช้เพื่อสร้างโค้ดมาตรฐานสำหรับไคลเอ็นต์และเซิร์ฟเวอร์เพื่อให้สื่อสารกันได้ ซึ่งจะช่วยประหยัดเวลาและแรงในการติดตั้งใช้งานฟังก์ชันดังกล่าว

โค้ดที่สร้างขึ้นนี้ไม่เพียงแต่จัดการความซับซ้อนของการสื่อสารระหว่างเซิร์ฟเวอร์และไคลเอ็นต์เท่านั้น แต่ยังจัดการการซีเรียลไลซ์และการดีซีเรียลไลซ์ข้อมูลด้วย

สิ่งที่คุณจะได้เรียนรู้

  • วิธีใช้ Protocol Buffers เพื่อกำหนด API ของบริการ
  • วิธีสร้างไคลเอ็นต์และเซิร์ฟเวอร์ที่ใช้ gRPC จากคำจำกัดความของ Protocol Buffers โดยใช้การสร้างโค้ดอัตโนมัติ
  • ความเข้าใจเกี่ยวกับการสื่อสารแบบสตรีมมิงไคลเอ็นต์-เซิร์ฟเวอร์ด้วย gRPC

Codelab นี้มีไว้สำหรับนักพัฒนา Rust ที่เพิ่งเริ่มใช้ gRPC หรือต้องการทบทวน gRPC หรือใครก็ตามที่สนใจสร้างระบบแบบกระจาย ไม่จำเป็นต้องมีประสบการณ์เกี่ยวกับ gRPC มาก่อน

2. ก่อนเริ่มต้น

ข้อกำหนดเบื้องต้น

ตรวจสอบว่าคุณได้ติดตั้งสิ่งต่อไปนี้แล้ว

รับโค้ด

Codelab นี้มีโครงสร้างของซอร์สโค้ดของแอปพลิเคชันเพื่อให้คุณทำต่อได้ คุณจึงไม่ต้องเริ่มต้นจากศูนย์ ขั้นตอนต่อไปนี้จะแสดงวิธีส่งแอปพลิเคชันให้เสร็จสมบูรณ์ ซึ่งรวมถึงการใช้ปลั๊กอินคอมไพเลอร์ Protocol Buffer เพื่อสร้างโค้ด gRPC ที่ซ้ำกัน

ก่อนอื่น ให้สร้างไดเรกทอรีการทำงานของ Codelab แล้ว cd ไปยังไดเรกทอรีนั้น

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

ดาวน์โหลดและแตกไฟล์ Codelab โดยทำดังนี้

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

หรือคุณจะดาวน์โหลดไฟล์ .zip ที่มีเฉพาะไดเรกทอรี Codelab แล้วแตกไฟล์ด้วยตนเองก็ได้

ซอร์สโค้ดที่เสร็จสมบูรณ์แล้วพร้อมใช้งานใน GitHub หากคุณไม่ต้องการพิมพ์การติดตั้งใช้งาน

3. กำหนดข้อความและบริการ

ขั้นตอนแรกคือการกำหนดบริการ gRPC ของแอปพลิเคชัน เมธอด RPC และประเภทข้อความคำขอและการตอบกลับโดยใช้ Protocol Buffers บริการของคุณจะให้ข้อมูลต่อไปนี้

  • เมธอด RPC ที่เรียกว่า ListFeatures, RecordRoute และ RouteChat ซึ่งเซิร์ฟเวอร์ใช้และไคลเอ็นต์เรียก
  • ประเภทข้อความ Point, Feature, Rectangle, RouteNote และ RouteSummary ซึ่งเป็นโครงสร้างข้อมูลที่แลกเปลี่ยนระหว่างไคลเอ็นต์และเซิร์ฟเวอร์เมื่อเรียกใช้เมธอดข้างต้น

วิธีการ RPC และประเภทข้อความทั้งหมดจะกำหนดไว้ในproto/routeguide.protoของซอร์สโค้ดที่ให้ไว้

Protocol Buffers เรียกกันโดยทั่วไปว่า protobuf ดูข้อมูลเพิ่มเติมเกี่ยวกับคำศัพท์ gRPC ได้ที่แนวคิดหลัก สถาปัตยกรรม และวงจรของ gRPC

กำหนดประเภทข้อความ

ก่อนอื่นเรามากำหนดข้อความที่จะใช้โดย RPC กัน ในrouteguide/route_guide.protoไฟล์ของซอร์สโค้ด ให้กำหนดPointประเภทข้อความก่อน Point แสดงคู่พิกัดละติจูดและลองจิจูดบนแผนที่ สำหรับ Codelab นี้ ให้ใช้จำนวนเต็มสำหรับพิกัด

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

หมายเลข 1 และ 2 เป็นหมายเลขรหัสที่ไม่ซ้ำกันสำหรับแต่ละฟิลด์ในโครงสร้าง message

จากนั้นกำหนดFeatureประเภทข้อความ Feature ใช้ฟิลด์ string สำหรับชื่อหรือที่อยู่ไปรษณีย์ของสิ่งหนึ่งๆ ในสถานที่ที่ระบุโดย Point ดังนี้

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

ถัดมาคือRectangleข้อความซึ่งแสดงสี่เหลี่ยมผืนผ้าละติจูด-ลองจิจูด ซึ่งแสดงเป็นจุด 2 จุดที่อยู่ตรงข้ามกันในแนวทแยง "lo" และ "hi"

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

รวมถึงข้อความ RouteNote ซึ่งแสดงถึงข้อความที่ส่งขณะอยู่ที่จุดหนึ่งๆ

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

นอกจากนี้ เรายังต้องการRouteSummaryข้อความด้วย คุณจะได้รับข้อความนี้เป็นการตอบกลับ RPC ของ RecordRoute ซึ่งจะอธิบายในส่วนถัดไป โดยจะมีจำนวนจุดแต่ละจุดที่ได้รับ จำนวนฟีเจอร์ที่ตรวจพบ และระยะทางทั้งหมดที่ครอบคลุมเป็นผลรวมสะสมของระยะทางระหว่างแต่ละจุด

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

กำหนดวิธีการบริการ

ก่อนอื่นเรามากำหนดบริการของเราก่อน แล้วค่อยกำหนดข้อความในภายหลัง หากต้องการกำหนดบริการ ให้ระบุบริการที่มีชื่อในไฟล์ .proto ไฟล์ proto/routeguide.proto มีโครงสร้าง service ชื่อ RouteGuide ซึ่งกำหนดวิธีการอย่างน้อย 1 วิธีที่บริการของแอปพลิเคชันมีให้

กำหนดเมธอด RPC ภายในคำจำกัดความของบริการ โดยระบุประเภทคำขอและการตอบกลับ ในส่วนนี้ของโค้ดแล็บ เราจะกำหนดค่าต่อไปนี้

ListFeatures

รับ Feature ที่พร้อมใช้งานภายใน Rectangle ที่ระบุ ระบบจะสตรีมผลลัพธ์แทนที่จะแสดงผลพร้อมกัน (เช่น ในข้อความตอบกลับที่มีฟิลด์ที่ซ้ำกัน) เนื่องจากสี่เหลี่ยมผืนผ้าอาจครอบคลุมพื้นที่ขนาดใหญ่และมีฟีเจอร์จำนวนมาก

ประเภทที่เหมาะสมสำหรับ RPC นี้คือ RPC แบบสตรีมมิงฝั่งเซิร์ฟเวอร์ ซึ่งไคลเอ็นต์จะส่งคำขอไปยังเซิร์ฟเวอร์และรับสตรีมเพื่ออ่านลำดับข้อความกลับ ไคลเอ็นต์จะอ่านจากสตรีมที่ส่งคืนจนกว่าจะไม่มีข้อความเหลือ ดังที่เห็นในตัวอย่าง คุณระบุวิธีการสตรีมฝั่งเซิร์ฟเวอร์ได้โดยวางคีย์เวิร์ด stream ไว้ก่อนประเภทการตอบกลับ

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

ยอมรับสตรีมของ Point ในเส้นทางที่กำลังเดินทาง และส่งคืน RouteSummary เมื่อการเดินทางเสร็จสมบูรณ์

RPC การสตรีมฝั่งไคลเอ็นต์ดูเหมือนจะเหมาะสมในกรณีนี้ โดยไคลเอ็นต์จะเขียนลำดับข้อความและส่งไปยังเซิร์ฟเวอร์โดยใช้สตรีมที่ระบุอีกครั้ง เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว ไคลเอ็นต์จะรอให้เซิร์ฟเวอร์อ่านข้อความทั้งหมดและส่งการตอบกลับ คุณระบุวิธีการสตรีมมิงฝั่งไคลเอ็นต์ได้โดยวางคีย์เวิร์ด stream ไว้ก่อนประเภทคำขอ

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

ยอมรับสตรีมของ RouteNote ที่ส่งขณะที่กำลังเดินทางตามเส้นทาง ขณะเดียวกันก็รับ RouteNote อื่นๆ (เช่น จากผู้ใช้รายอื่น)

นี่เป็นกรณีการใช้งานที่เหมาะสมสำหรับการสตรีมแบบสองทิศทาง RPC การสตรีมแบบ 2 ทางจะให้ทั้ง 2 ฝ่ายส่งลำดับข้อความโดยใช้สตรีมแบบอ่าน-เขียน สตรีมทั้ง 2 ทำงานแยกกัน ดังนั้นไคลเอ็นต์และเซิร์ฟเวอร์จึงอ่านและเขียนได้ตามลำดับที่ต้องการ

เช่น เซิร์ฟเวอร์อาจรอรับข้อความไคลเอ็นต์ทั้งหมดก่อนที่จะเขียนการตอบกลับ หรืออาจอ่านข้อความแล้วเขียนข้อความสลับกัน หรืออาจใช้การอ่านและการเขียนแบบอื่นๆ

ระบบจะรักษลําดับของข้อความในแต่ละสตรีม คุณระบุเมธอดประเภทนี้ได้โดยวางคีย์เวิร์ด stream ไว้ก่อนทั้งคำขอและการตอบกลับ

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. สร้างโค้ดไคลเอ็นต์และเซิร์ฟเวอร์

เราได้ให้โค้ดที่สร้างจากไฟล์ .proto ในไดเรกทอรีที่สร้างแล้วแก่คุณแล้ว

หากต้องการดูวิธีสร้างโค้ดจากไฟล์ .proto ด้วยตนเอง หรือทำการเปลี่ยนแปลงไฟล์ .proto และทดสอบ ให้ดูวิธีการเหล่านี้

โค้ดที่สร้างขึ้นประกอบด้วย

  • คำจำกัดความของโครงสร้างสำหรับประเภทข้อความ Point, Feature, Rectangle, RouteNote และ RouteSummary
  • ลักษณะบริการที่เราต้องใช้คือ route_guide_server::RouteGuide
  • ประเภทไคลเอ็นต์ที่เราจะใช้เพื่อเรียกเซิร์ฟเวอร์: route_guide_client::RouteGuideClient<T>

จากนั้นเราจะใช้เมธอดในฝั่งเซิร์ฟเวอร์เพื่อให้เมื่อไคลเอ็นต์ส่งคำขอ เซิร์ฟเวอร์จะตอบกลับได้

5. ติดตั้งใช้งานบริการ

ก่อนอื่นมาดูวิธีสร้างRouteGuideเซิร์ฟเวอร์กัน การทำให้RouteGuideบริการของเราทำงานได้ตามที่ควรจะเป็นมี 2 ส่วน ดังนี้

  • การติดตั้งใช้งานอินเทอร์เฟซบริการที่สร้างขึ้นจากคำจำกัดความของบริการ: การ "ทำงาน" จริงของบริการ
  • การเรียกใช้เซิร์ฟเวอร์ gRPC เพื่อรอรับคำขอจากไคลเอ็นต์และส่งคำขอไปยังการติดตั้งใช้งานเมธอดที่ถูกต้อง

ใน src/server/server.rs เราสามารถนำโค้ดที่สร้างขึ้นมาไว้ในขอบเขตผ่านมาโคร include_generated_proto! ของ gRPC และนำเข้าลักษณะ RouteGuide และ Point

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

เราสามารถเริ่มต้นด้วยการกำหนดโครงสร้างเพื่อแสดงถึงบริการของเรา โดยเราสามารถดำเนินการนี้ได้ใน src/server/server.rs ในตอนนี้

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

ตอนนี้เราต้องใช้ลักษณะ route_guide_server::RouteGuide จากโค้ดที่สร้างขึ้น

ใช้ RouteGuide

เราต้องติดตั้งใช้งานอินเทอร์เฟซ RouteGuide ที่สร้างขึ้น การติดตั้งใช้งานจะมีลักษณะดังนี้ ซึ่งอยู่ในเทมเพลตอยู่แล้ว

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

มาดูรายละเอียดการติดตั้งใช้งาน RPC แต่ละรายการกัน

RPC การสตรีมฝั่งเซิร์ฟเวอร์

มาเริ่มที่ ListFeatures กันเลย นี่คือ RPC การสตรีมฝั่งเซิร์ฟเวอร์ ดังนั้นเราจึงต้องส่ง Feature หลายรายการกลับไปยังไคลเอ็นต์

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

ดังที่คุณเห็น เราได้รับออบเจ็กต์คำขอ (Rectangle ซึ่งไคลเอ็นต์ต้องการค้นหา Features) คราวนี้เราต้องส่งคืนสตรีมของค่า เราสร้างแชแนลและสร้างงานแบบอะซิงโครนัสใหม่ซึ่งเราจะทำการค้นหาและส่งฟีเจอร์ที่ตรงตามข้อจำกัดของเราไปยังแชแนล ระบบจะส่งคืนสตรีมครึ่งหนึ่งของช่องไปยังผู้เรียกใช้โดยห่อไว้ใน tonic::Response

RPC การสตรีมฝั่งไคลเอ็นต์

ตอนนี้เรามาดูสิ่งที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute ซึ่งเราจะรับสตรีมของ Points จากไคลเอ็นต์และส่งคืน RouteSummary รายการเดียวพร้อมข้อมูลเกี่ยวกับการเดินทาง โดยจะรับสตรีมเป็นอินพุต ซึ่งเซิร์ฟเวอร์สามารถใช้เพื่ออ่านและเขียนข้อความได้ โดยจะวนซ้ำข้อความไคลเอ็นต์โดยใช้วิธี next() และส่งคืนการตอบกลับเดียว

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

ในส่วนเนื้อหาของเมธอด เราใช้เมธอด next() ของสตรีมเพื่ออ่านคำขอของไคลเอ็นต์ซ้ำๆ ไปยังออบเจ็กต์คำขอ (ในกรณีนี้คือ Point) จนกว่าจะไม่มีข้อความอีก หากเป็น None แสดงว่าสตรีมยังคงใช้งานได้และอ่านต่อไปได้

RPC การสตรีมแบบ 2 ทาง

สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

คราวนี้เราจะได้รับสตรีมที่ใช้เพื่ออ่านและเขียนข้อความได้เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ แต่คราวนี้เราจะแสดงค่าผ่านสตรีมของเมธอดในขณะที่ไคลเอ็นต์ยังคงเขียนข้อความลงในสตรีมข้อความ ไวยากรณ์สำหรับการอ่านและเขียนที่นี่คล้ายกับวิธีการสตรีมมิงฝั่งไคลเอ็นต์ของเรามาก ยกเว้นว่าเซิร์ฟเวอร์จะแสดง RouteChatStream แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์

เราสร้างสตรีม output โดยใช้ try_stream! ซึ่งบ่งชี้ว่าสตรีมอาจแสดงข้อผิดพลาด

เริ่มเซิร์ฟเวอร์

เมื่อใช้วิธีนี้แล้ว เราก็ต้องเริ่มเซิร์ฟเวอร์ gRPC เพื่อให้ไคลเอ็นต์ใช้บริการของเราได้จริง กรอกข้อมูลใน main()

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

สิ่งที่เกิดขึ้นใน main() มีดังนี้

  1. ระบุพอร์ตที่เราต้องการใช้เพื่อรอรับคำขอของไคลเอ็นต์
  2. สร้าง RouteGuideService ที่โหลดฟีเจอร์ไว้
  3. สร้างอินสแตนซ์ของเซิร์ฟเวอร์ gRPC โดยใช้ RouteGuideServer::new() โดยใช้บริการที่เราสร้างขึ้น
  4. ลงทะเบียนการติดตั้งใช้งานบริการกับเซิร์ฟเวอร์ gRPC
  5. เรียกใช้ serve() ในเซิร์ฟเวอร์พร้อมรายละเอียดพอร์ตเพื่อรอการบล็อกจนกว่าจะมีการสิ้นสุดกระบวนการ

6. สร้างไคลเอ็นต์

ในส่วนนี้ เราจะมาดูการสร้างไคลเอ็นต์ Rust สำหรับบริการ RouteGuide ใน src/client/client.rs กัน

ก่อนอื่น ให้นำโค้ดที่สร้างขึ้นมาไว้ในขอบเขต

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

วิธีการเรียกใช้บริการ

ตอนนี้มาดูวิธีเรียกใช้เมธอดบริการกัน ใน gRPC-Rust, RPC จะทํางานในโหมดบล็อก/ซิงโครนัส ซึ่งหมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือข้อผิดพลาด

RPC การสตรีมฝั่งเซิร์ฟเวอร์

ในส่วนนี้ เราจะเรียกใช้เมธอดการสตรีมฝั่งเซิร์ฟเวอร์ ListFeatures ซึ่งจะแสดงผลสตรีมของออบเจ็กต์ Feature ทางภูมิศาสตร์

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

เราส่งคำขอไปยังเมธอดและรับอินสแตนซ์ของ ListFeaturesStream กลับมา ไคลเอ็นต์สามารถใช้ ListFeaturesStream สตรีมเพื่ออ่านการตอบกลับของเซิร์ฟเวอร์ได้ เราใช้วิธี ListFeaturesStreammessage() เพื่ออ่านการตอบกลับของเซิร์ฟเวอร์ไปยังออบเจ็กต์บัฟเฟอร์โปรโตคอลการตอบกลับ (ในกรณีนี้คือ Feature) ซ้ำๆ จนกว่าจะไม่มีข้อความอีก

RPC การสตรีมฝั่งไคลเอ็นต์

ในส่วน record_route เราจะเปลี่ยนเวกเตอร์ของจุดเป็นสตรีม จากนั้นเราจะส่งสตรีมนี้ไปยัง record_route() เป็นคำขอและรับการตอบกลับ RouteSummary รายการเดียวหลังจากที่เซิร์ฟเวอร์ประมวลผลสตรีมเสร็จสมบูรณ์แล้ว

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

RPC การสตรีมแบบ 2 ทาง

สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat() กัน เราส่งคำขอสตรีมไปยังเมธอดที่เราเขียน และรับสตรีมกลับมาซึ่งเราสามารถอ่านข้อความได้ คราวนี้เราจะแสดงค่าผ่านสตรีมของเมธอดในขณะที่เซิร์ฟเวอร์ยังคงเขียนข้อความลงในสตรีมข้อความ

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์

เมธอดตัวช่วยการโทร

หากต้องการเรียกใช้เมธอดบริการ เราต้องสร้างแชแนลเพื่อสื่อสารกับเซิร์ฟเวอร์ก่อน เราสร้างสิ่งนี้โดยการสร้างปลายทางก่อน จากนั้นเชื่อมต่อกับปลายทางนั้น และส่งช่องที่สร้างขึ้นเมื่อเชื่อมต่อกับ RouteGuideClient::new() ดังนี้

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

ใน main() ให้เรียกใช้เมธอดที่เราเพิ่งสร้าง

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. ลองเลย

ก่อนอื่น หากต้องการเรียกใช้ไคลเอ็นต์และเซิร์ฟเวอร์ ให้เพิ่มเป็นเป้าหมายไบนารีลงใน Crate เราต้องแก้ไข Cargo.toml ตามนั้น

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

เช่นเดียวกับโปรเจ็กต์อื่นๆ เรายังต้องคำนึงถึงการขึ้นต่อกันที่จำเป็นต่อการทำงานของโค้ดด้วย สำหรับโปรเจ็กต์ Rust การอ้างอิงจะอยู่ใน Cargo.toml เราได้แสดงรายการการอ้างอิงที่จำเป็นในไฟล์ Cargo.toml แล้ว

จากนั้นเรียกใช้คำสั่งต่อไปนี้จากไดเรกทอรีที่ทำงานอยู่

  1. เรียกใช้เซิร์ฟเวอร์ในเทอร์มินัลหนึ่ง
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. เรียกใช้ไคลเอ็นต์จากเทอร์มินัลอื่นโดยใช้คำสั่งต่อไปนี้
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

คุณจะเห็นเอาต์พุตดังนี้

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. ขั้นตอนถัดไป