1. บทนำ
ในโค้ดแล็บนี้ คุณจะได้ใช้ gRPC-Rust เพื่อสร้างไคลเอ็นต์และเซิร์ฟเวอร์ซึ่งเป็นรากฐานของแอปพลิเคชันการแมปเส้นทางที่เขียนด้วย Rust
เมื่อจบบทแนะนำนี้ คุณจะมีไคลเอ็นต์ที่เชื่อมต่อกับเซิร์ฟเวอร์ระยะไกลโดยใช้ gRPC เพื่อรับข้อมูลเกี่ยวกับฟีเจอร์ในเส้นทางของไคลเอ็นต์ สร้างข้อมูลสรุปของเส้นทางของไคลเอ็นต์ และแลกเปลี่ยนข้อมูลเส้นทาง เช่น ข้อมูลอัปเดตการจราจร กับเซิร์ฟเวอร์และไคลเอ็นต์อื่นๆ
บริการนี้กำหนดไว้ในไฟล์ Protocol Buffers ซึ่งจะใช้เพื่อสร้างโค้ดมาตรฐานสำหรับไคลเอ็นต์และเซิร์ฟเวอร์เพื่อให้สื่อสารกันได้ ซึ่งจะช่วยประหยัดเวลาและแรงในการติดตั้งใช้งานฟังก์ชันดังกล่าว
โค้ดที่สร้างขึ้นนี้ไม่เพียงแต่จัดการความซับซ้อนของการสื่อสารระหว่างเซิร์ฟเวอร์และไคลเอ็นต์เท่านั้น แต่ยังจัดการการซีเรียลไลซ์และการดีซีเรียลไลซ์ข้อมูลด้วย
สิ่งที่คุณจะได้เรียนรู้
- วิธีใช้ Protocol Buffers เพื่อกำหนด API ของบริการ
- วิธีสร้างไคลเอ็นต์และเซิร์ฟเวอร์ที่ใช้ gRPC จากคำจำกัดความของ Protocol Buffers โดยใช้การสร้างโค้ดอัตโนมัติ
- ความเข้าใจเกี่ยวกับการสื่อสารแบบสตรีมมิงไคลเอ็นต์-เซิร์ฟเวอร์ด้วย gRPC
Codelab นี้มีไว้สำหรับนักพัฒนา Rust ที่เพิ่งเริ่มใช้ gRPC หรือต้องการทบทวน gRPC หรือใครก็ตามที่สนใจสร้างระบบแบบกระจาย ไม่จำเป็นต้องมีประสบการณ์เกี่ยวกับ gRPC มาก่อน
2. ก่อนเริ่มต้น
ข้อกำหนดเบื้องต้น
ตรวจสอบว่าคุณได้ติดตั้งสิ่งต่อไปนี้แล้ว
รับโค้ด
Codelab นี้มีโครงสร้างของซอร์สโค้ดของแอปพลิเคชันเพื่อให้คุณทำต่อได้ คุณจึงไม่ต้องเริ่มต้นจากศูนย์ ขั้นตอนต่อไปนี้จะแสดงวิธีส่งแอปพลิเคชันให้เสร็จสมบูรณ์ ซึ่งรวมถึงการใช้ปลั๊กอินคอมไพเลอร์ Protocol Buffer เพื่อสร้างโค้ด gRPC ที่ซ้ำกัน
ก่อนอื่น ให้สร้างไดเรกทอรีการทำงานของ Codelab แล้ว cd
ไปยังไดเรกทอรีนั้น
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
ดาวน์โหลดและแตกไฟล์ Codelab โดยทำดังนี้
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
หรือคุณจะดาวน์โหลดไฟล์ .zip ที่มีเฉพาะไดเรกทอรี Codelab แล้วแตกไฟล์ด้วยตนเองก็ได้
ซอร์สโค้ดที่เสร็จสมบูรณ์แล้วพร้อมใช้งานใน GitHub หากคุณไม่ต้องการพิมพ์การติดตั้งใช้งาน
3. กำหนดข้อความและบริการ
ขั้นตอนแรกคือการกำหนดบริการ gRPC ของแอปพลิเคชัน เมธอด RPC และประเภทข้อความคำขอและการตอบกลับโดยใช้ Protocol Buffers บริการของคุณจะให้ข้อมูลต่อไปนี้
- เมธอด RPC ที่เรียกว่า
ListFeatures
,RecordRoute
และRouteChat
ซึ่งเซิร์ฟเวอร์ใช้และไคลเอ็นต์เรียก - ประเภทข้อความ
Point
,Feature
,Rectangle
,RouteNote
และRouteSummary
ซึ่งเป็นโครงสร้างข้อมูลที่แลกเปลี่ยนระหว่างไคลเอ็นต์และเซิร์ฟเวอร์เมื่อเรียกใช้เมธอดข้างต้น
วิธีการ RPC และประเภทข้อความทั้งหมดจะกำหนดไว้ในproto/routeguide.proto
ของซอร์สโค้ดที่ให้ไว้
Protocol Buffers เรียกกันโดยทั่วไปว่า protobuf ดูข้อมูลเพิ่มเติมเกี่ยวกับคำศัพท์ gRPC ได้ที่แนวคิดหลัก สถาปัตยกรรม และวงจรของ gRPC
กำหนดประเภทข้อความ
ก่อนอื่นเรามากำหนดข้อความที่จะใช้โดย RPC กัน ในrouteguide/route_guide.proto
ไฟล์ของซอร์สโค้ด ให้กำหนดPoint
ประเภทข้อความก่อน Point
แสดงคู่พิกัดละติจูดและลองจิจูดบนแผนที่ สำหรับ Codelab นี้ ให้ใช้จำนวนเต็มสำหรับพิกัด
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
หมายเลข 1
และ 2
เป็นหมายเลขรหัสที่ไม่ซ้ำกันสำหรับแต่ละฟิลด์ในโครงสร้าง message
จากนั้นกำหนดFeature
ประเภทข้อความ Feature
ใช้ฟิลด์ string
สำหรับชื่อหรือที่อยู่ไปรษณีย์ของสิ่งหนึ่งๆ ในสถานที่ที่ระบุโดย Point
ดังนี้
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
ถัดมาคือRectangle
ข้อความซึ่งแสดงสี่เหลี่ยมผืนผ้าละติจูด-ลองจิจูด ซึ่งแสดงเป็นจุด 2 จุดที่อยู่ตรงข้ามกันในแนวทแยง "lo" และ "hi"
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
รวมถึงข้อความ RouteNote
ซึ่งแสดงถึงข้อความที่ส่งขณะอยู่ที่จุดหนึ่งๆ
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
นอกจากนี้ เรายังต้องการRouteSummary
ข้อความด้วย คุณจะได้รับข้อความนี้เป็นการตอบกลับ RPC ของ RecordRoute
ซึ่งจะอธิบายในส่วนถัดไป โดยจะมีจำนวนจุดแต่ละจุดที่ได้รับ จำนวนฟีเจอร์ที่ตรวจพบ และระยะทางทั้งหมดที่ครอบคลุมเป็นผลรวมสะสมของระยะทางระหว่างแต่ละจุด
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
กำหนดวิธีการบริการ
ก่อนอื่นเรามากำหนดบริการของเราก่อน แล้วค่อยกำหนดข้อความในภายหลัง หากต้องการกำหนดบริการ ให้ระบุบริการที่มีชื่อในไฟล์ .proto
ไฟล์ proto/routeguide.proto
มีโครงสร้าง service
ชื่อ RouteGuide
ซึ่งกำหนดวิธีการอย่างน้อย 1 วิธีที่บริการของแอปพลิเคชันมีให้
กำหนดเมธอด RPC ภายในคำจำกัดความของบริการ โดยระบุประเภทคำขอและการตอบกลับ ในส่วนนี้ของโค้ดแล็บ เราจะกำหนดค่าต่อไปนี้
ListFeatures
รับ Feature
ที่พร้อมใช้งานภายใน Rectangle
ที่ระบุ ระบบจะสตรีมผลลัพธ์แทนที่จะแสดงผลพร้อมกัน (เช่น ในข้อความตอบกลับที่มีฟิลด์ที่ซ้ำกัน) เนื่องจากสี่เหลี่ยมผืนผ้าอาจครอบคลุมพื้นที่ขนาดใหญ่และมีฟีเจอร์จำนวนมาก
ประเภทที่เหมาะสมสำหรับ RPC นี้คือ RPC แบบสตรีมมิงฝั่งเซิร์ฟเวอร์ ซึ่งไคลเอ็นต์จะส่งคำขอไปยังเซิร์ฟเวอร์และรับสตรีมเพื่ออ่านลำดับข้อความกลับ ไคลเอ็นต์จะอ่านจากสตรีมที่ส่งคืนจนกว่าจะไม่มีข้อความเหลือ ดังที่เห็นในตัวอย่าง คุณระบุวิธีการสตรีมฝั่งเซิร์ฟเวอร์ได้โดยวางคีย์เวิร์ด stream
ไว้ก่อนประเภทการตอบกลับ
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
ยอมรับสตรีมของ Point
ในเส้นทางที่กำลังเดินทาง และส่งคืน RouteSummary
เมื่อการเดินทางเสร็จสมบูรณ์
RPC การสตรีมฝั่งไคลเอ็นต์ดูเหมือนจะเหมาะสมในกรณีนี้ โดยไคลเอ็นต์จะเขียนลำดับข้อความและส่งไปยังเซิร์ฟเวอร์โดยใช้สตรีมที่ระบุอีกครั้ง เมื่อไคลเอ็นต์เขียนข้อความเสร็จแล้ว ไคลเอ็นต์จะรอให้เซิร์ฟเวอร์อ่านข้อความทั้งหมดและส่งการตอบกลับ คุณระบุวิธีการสตรีมมิงฝั่งไคลเอ็นต์ได้โดยวางคีย์เวิร์ด stream
ไว้ก่อนประเภทคำขอ
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
ยอมรับสตรีมของ RouteNote
ที่ส่งขณะที่กำลังเดินทางตามเส้นทาง ขณะเดียวกันก็รับ RouteNote
อื่นๆ (เช่น จากผู้ใช้รายอื่น)
นี่เป็นกรณีการใช้งานที่เหมาะสมสำหรับการสตรีมแบบสองทิศทาง RPC การสตรีมแบบ 2 ทางจะให้ทั้ง 2 ฝ่ายส่งลำดับข้อความโดยใช้สตรีมแบบอ่าน-เขียน สตรีมทั้ง 2 ทำงานแยกกัน ดังนั้นไคลเอ็นต์และเซิร์ฟเวอร์จึงอ่านและเขียนได้ตามลำดับที่ต้องการ
เช่น เซิร์ฟเวอร์อาจรอรับข้อความไคลเอ็นต์ทั้งหมดก่อนที่จะเขียนการตอบกลับ หรืออาจอ่านข้อความแล้วเขียนข้อความสลับกัน หรืออาจใช้การอ่านและการเขียนแบบอื่นๆ
ระบบจะรักษลําดับของข้อความในแต่ละสตรีม คุณระบุเมธอดประเภทนี้ได้โดยวางคีย์เวิร์ด stream
ไว้ก่อนทั้งคำขอและการตอบกลับ
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. สร้างโค้ดไคลเอ็นต์และเซิร์ฟเวอร์
เราได้ให้โค้ดที่สร้างจากไฟล์ .proto
ในไดเรกทอรีที่สร้างแล้วแก่คุณแล้ว
หากต้องการดูวิธีสร้างโค้ดจากไฟล์ .proto
ด้วยตนเอง หรือทำการเปลี่ยนแปลงไฟล์ .proto
และทดสอบ ให้ดูวิธีการเหล่านี้
โค้ดที่สร้างขึ้นประกอบด้วย
- คำจำกัดความของโครงสร้างสำหรับประเภทข้อความ
Point
,Feature
,Rectangle
,RouteNote
และRouteSummary
- ลักษณะบริการที่เราต้องใช้คือ
route_guide_server::RouteGuide
- ประเภทไคลเอ็นต์ที่เราจะใช้เพื่อเรียกเซิร์ฟเวอร์:
route_guide_client::RouteGuideClient<T>
จากนั้นเราจะใช้เมธอดในฝั่งเซิร์ฟเวอร์เพื่อให้เมื่อไคลเอ็นต์ส่งคำขอ เซิร์ฟเวอร์จะตอบกลับได้
5. ติดตั้งใช้งานบริการ
ก่อนอื่นมาดูวิธีสร้างRouteGuide
เซิร์ฟเวอร์กัน การทำให้RouteGuide
บริการของเราทำงานได้ตามที่ควรจะเป็นมี 2 ส่วน ดังนี้
- การติดตั้งใช้งานอินเทอร์เฟซบริการที่สร้างขึ้นจากคำจำกัดความของบริการ: การ "ทำงาน" จริงของบริการ
- การเรียกใช้เซิร์ฟเวอร์ gRPC เพื่อรอรับคำขอจากไคลเอ็นต์และส่งคำขอไปยังการติดตั้งใช้งานเมธอดที่ถูกต้อง
ใน src/server/server.rs
เราสามารถนำโค้ดที่สร้างขึ้นมาไว้ในขอบเขตผ่านมาโคร include_generated_proto!
ของ gRPC และนำเข้าลักษณะ RouteGuide
และ Point
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
เราสามารถเริ่มต้นด้วยการกำหนดโครงสร้างเพื่อแสดงถึงบริการของเรา โดยเราสามารถดำเนินการนี้ได้ใน src/server/server.rs
ในตอนนี้
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
ตอนนี้เราต้องใช้ลักษณะ route_guide_server::RouteGuide
จากโค้ดที่สร้างขึ้น
ใช้ RouteGuide
เราต้องติดตั้งใช้งานอินเทอร์เฟซ RouteGuide
ที่สร้างขึ้น การติดตั้งใช้งานจะมีลักษณะดังนี้ ซึ่งอยู่ในเทมเพลตอยู่แล้ว
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
มาดูรายละเอียดการติดตั้งใช้งาน RPC แต่ละรายการกัน
RPC การสตรีมฝั่งเซิร์ฟเวอร์
มาเริ่มที่ ListFeatures
กันเลย นี่คือ RPC การสตรีมฝั่งเซิร์ฟเวอร์ ดังนั้นเราจึงต้องส่ง Feature
หลายรายการกลับไปยังไคลเอ็นต์
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
ดังที่คุณเห็น เราได้รับออบเจ็กต์คำขอ (Rectangle
ซึ่งไคลเอ็นต์ต้องการค้นหา Features
) คราวนี้เราต้องส่งคืนสตรีมของค่า เราสร้างแชแนลและสร้างงานแบบอะซิงโครนัสใหม่ซึ่งเราจะทำการค้นหาและส่งฟีเจอร์ที่ตรงตามข้อจำกัดของเราไปยังแชแนล ระบบจะส่งคืนสตรีมครึ่งหนึ่งของช่องไปยังผู้เรียกใช้โดยห่อไว้ใน tonic::Response
RPC การสตรีมฝั่งไคลเอ็นต์
ตอนนี้เรามาดูสิ่งที่ซับซ้อนขึ้นเล็กน้อยกัน นั่นคือวิธีการสตรีมฝั่งไคลเอ็นต์ RecordRoute
ซึ่งเราจะรับสตรีมของ Points
จากไคลเอ็นต์และส่งคืน RouteSummary
รายการเดียวพร้อมข้อมูลเกี่ยวกับการเดินทาง โดยจะรับสตรีมเป็นอินพุต ซึ่งเซิร์ฟเวอร์สามารถใช้เพื่ออ่านและเขียนข้อความได้ โดยจะวนซ้ำข้อความไคลเอ็นต์โดยใช้วิธี next()
และส่งคืนการตอบกลับเดียว
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
ในส่วนเนื้อหาของเมธอด เราใช้เมธอด next()
ของสตรีมเพื่ออ่านคำขอของไคลเอ็นต์ซ้ำๆ ไปยังออบเจ็กต์คำขอ (ในกรณีนี้คือ Point
) จนกว่าจะไม่มีข้อความอีก หากเป็น None แสดงว่าสตรีมยังคงใช้งานได้และอ่านต่อไปได้
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat()
กัน
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
คราวนี้เราจะได้รับสตรีมที่ใช้เพื่ออ่านและเขียนข้อความได้เช่นเดียวกับตัวอย่างการสตรีมฝั่งไคลเอ็นต์ แต่คราวนี้เราจะแสดงค่าผ่านสตรีมของเมธอดในขณะที่ไคลเอ็นต์ยังคงเขียนข้อความลงในสตรีมข้อความ ไวยากรณ์สำหรับการอ่านและเขียนที่นี่คล้ายกับวิธีการสตรีมมิงฝั่งไคลเอ็นต์ของเรามาก ยกเว้นว่าเซิร์ฟเวอร์จะแสดง RouteChatStream
แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
เราสร้างสตรีม output
โดยใช้ try_stream!
ซึ่งบ่งชี้ว่าสตรีมอาจแสดงข้อผิดพลาด
เริ่มเซิร์ฟเวอร์
เมื่อใช้วิธีนี้แล้ว เราก็ต้องเริ่มเซิร์ฟเวอร์ gRPC เพื่อให้ไคลเอ็นต์ใช้บริการของเราได้จริง กรอกข้อมูลใน main()
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
สิ่งที่เกิดขึ้นใน main()
มีดังนี้
- ระบุพอร์ตที่เราต้องการใช้เพื่อรอรับคำขอของไคลเอ็นต์
- สร้าง
RouteGuideService
ที่โหลดฟีเจอร์ไว้ - สร้างอินสแตนซ์ของเซิร์ฟเวอร์ gRPC โดยใช้
RouteGuideServer::new()
โดยใช้บริการที่เราสร้างขึ้น - ลงทะเบียนการติดตั้งใช้งานบริการกับเซิร์ฟเวอร์ gRPC
- เรียกใช้
serve()
ในเซิร์ฟเวอร์พร้อมรายละเอียดพอร์ตเพื่อรอการบล็อกจนกว่าจะมีการสิ้นสุดกระบวนการ
6. สร้างไคลเอ็นต์
ในส่วนนี้ เราจะมาดูการสร้างไคลเอ็นต์ Rust สำหรับบริการ RouteGuide ใน src/client/client.rs
กัน
ก่อนอื่น ให้นำโค้ดที่สร้างขึ้นมาไว้ในขอบเขต
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
วิธีการเรียกใช้บริการ
ตอนนี้มาดูวิธีเรียกใช้เมธอดบริการกัน ใน gRPC-Rust, RPC จะทํางานในโหมดบล็อก/ซิงโครนัส ซึ่งหมายความว่าการเรียก RPC จะรอให้เซิร์ฟเวอร์ตอบกลับ และจะส่งคืนการตอบกลับหรือข้อผิดพลาด
RPC การสตรีมฝั่งเซิร์ฟเวอร์
ในส่วนนี้ เราจะเรียกใช้เมธอดการสตรีมฝั่งเซิร์ฟเวอร์ ListFeatures
ซึ่งจะแสดงผลสตรีมของออบเจ็กต์ Feature
ทางภูมิศาสตร์
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
เราส่งคำขอไปยังเมธอดและรับอินสแตนซ์ของ ListFeaturesStream
กลับมา ไคลเอ็นต์สามารถใช้ ListFeaturesStream
สตรีมเพื่ออ่านการตอบกลับของเซิร์ฟเวอร์ได้ เราใช้วิธี ListFeaturesStream
message()
เพื่ออ่านการตอบกลับของเซิร์ฟเวอร์ไปยังออบเจ็กต์บัฟเฟอร์โปรโตคอลการตอบกลับ (ในกรณีนี้คือ Feature
) ซ้ำๆ จนกว่าจะไม่มีข้อความอีก
RPC การสตรีมฝั่งไคลเอ็นต์
ในส่วน record_route
เราจะเปลี่ยนเวกเตอร์ของจุดเป็นสตรีม จากนั้นเราจะส่งสตรีมนี้ไปยัง record_route()
เป็นคำขอและรับการตอบกลับ RouteSummary
รายการเดียวหลังจากที่เซิร์ฟเวอร์ประมวลผลสตรีมเสร็จสมบูรณ์แล้ว
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
RPC การสตรีมแบบ 2 ทาง
สุดท้าย มาดู RPC การสตรีมแบบสองทาง RouteChat()
กัน เราส่งคำขอสตรีมไปยังเมธอดที่เราเขียน และรับสตรีมกลับมาซึ่งเราสามารถอ่านข้อความได้ คราวนี้เราจะแสดงค่าผ่านสตรีมของเมธอดในขณะที่เซิร์ฟเวอร์ยังคงเขียนข้อความลงในสตรีมข้อความ
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
แม้ว่าแต่ละฝ่ายจะได้รับข้อความของอีกฝ่ายตามลำดับที่เขียนเสมอ แต่ทั้งไคลเอ็นต์และเซิร์ฟเวอร์สามารถอ่านและเขียนได้ตามลำดับใดก็ได้ โดยสตรีมจะทำงานอย่างอิสระโดยสมบูรณ์
เมธอดตัวช่วยการโทร
หากต้องการเรียกใช้เมธอดบริการ เราต้องสร้างแชแนลเพื่อสื่อสารกับเซิร์ฟเวอร์ก่อน เราสร้างสิ่งนี้โดยการสร้างปลายทางก่อน จากนั้นเชื่อมต่อกับปลายทางนั้น และส่งช่องที่สร้างขึ้นเมื่อเชื่อมต่อกับ RouteGuideClient::new()
ดังนี้
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
ใน main()
ให้เรียกใช้เมธอดที่เราเพิ่งสร้าง
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. ลองเลย
ก่อนอื่น หากต้องการเรียกใช้ไคลเอ็นต์และเซิร์ฟเวอร์ ให้เพิ่มเป็นเป้าหมายไบนารีลงใน Crate เราต้องแก้ไข Cargo.toml ตามนั้น
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
เช่นเดียวกับโปรเจ็กต์อื่นๆ เรายังต้องคำนึงถึงการขึ้นต่อกันที่จำเป็นต่อการทำงานของโค้ดด้วย สำหรับโปรเจ็กต์ Rust การอ้างอิงจะอยู่ใน Cargo.toml
เราได้แสดงรายการการอ้างอิงที่จำเป็นในไฟล์ Cargo.toml
แล้ว
จากนั้นเรียกใช้คำสั่งต่อไปนี้จากไดเรกทอรีที่ทำงานอยู่
- เรียกใช้เซิร์ฟเวอร์ในเทอร์มินัลหนึ่ง
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- เรียกใช้ไคลเอ็นต์จากเทอร์มินัลอื่นโดยใช้คำสั่งต่อไปนี้
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
คุณจะเห็นเอาต์พุตดังนี้
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"
8. ขั้นตอนถัดไป
- ดูวิธีการทำงานของ gRPC ในข้อมูลเบื้องต้นเกี่ยวกับ gRPC และแนวคิดหลัก
- ดูบทแนะนำเกี่ยวกับพื้นฐาน