1. 简介
在此 Codelab 中,您将使用 gRPC-Rust 创建一个客户端和服务器,它们将构成用 Rust 编写的路线映射应用的基础。
在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。
该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们能够相互通信,从而节省您实现该功能的时间和精力。
生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。
学习内容
- 如何使用 Protocol Buffers 定义服务 API。
- 如何使用自动代码生成功能基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
- 了解使用 gRPC 进行的客户端-服务器流式传输通信。
本 Codelab 适用于刚开始使用 gRPC 或希望复习 gRPC 的 Rust 开发者,也适用于对构建分布式系统感兴趣的任何人。无需具备 gRPC 经验。
2. 准备工作
前提条件
请确保您已安装以下各项:
获取代码
为了避免您完全从头开始,本 Codelab 提供了一个应用源代码框架供您完成。以下步骤将展示如何完成应用,包括使用 Protocol Buffer 编译器插件生成样板 gRPC 代码。
首先,创建 Codelab 工作目录并 cd
到该目录:
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
下载并解压缩 Codelab:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。
如果您想跳过输入实现代码的步骤,可以在 GitHub 上找到完整的源代码。
3. 定义消息和服务
第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:
- 服务器实现并由客户端调用的 RPC 方法
ListFeatures
、RecordRoute
和RouteChat
。 - 消息类型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,它们是调用上述方法时在客户端和服务器之间交换的数据结构。
这些 RPC 方法及其消息类型都将在所提供源代码的 proto/routeguide.proto
文件中定义。
协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期。
定义消息类型
我们先来定义 RPC 将使用的消息。在源代码的 routeguide/route_guide.proto
文件中,首先定义 Point
消息类型。Point
表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
数字 1
和 2
是 message
结构中每个字段的唯一 ID 编号。
接下来,定义 Feature
消息类型。Feature
使用 string
字段来表示由 Point
指定的位置处的某个事物的名称或邮政地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
接下来是 Rectangle
消息,表示一个经纬度矩形,以两个对角点“lo”和“hi”表示。
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
也是一个 RouteNote
消息,表示在给定时间点发送的消息。
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
我们还需要您提供 RouteSummary
消息。此消息是针对 RecordRoute
RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定义服务方法
我们先定义服务,然后再定义消息。如需定义服务,请在 .proto
文件中指定命名服务。proto/routeguide.proto
文件具有名为 RouteGuide
的 service
结构,用于定义应用服务提供的一个或多个方法。
在服务定义中定义 RPC 方法,并指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:
ListFeatures
获取给定 Rectangle
内可用的 Feature
。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回(例如,在包含重复字段的响应消息中)。
此 RPC 的合适类型是服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取返回的一系列消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream
关键字来指定服务器端流式传输方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受正在遍历的路线上的 Point
流,并在遍历完成后返回 RouteSummary
。
在这种情况下,客户端流式传输 RPC 似乎很合适:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以在请求类型之前放置 stream
关键字,以指定客户端流式传输方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在遍历路线时发送的 RouteNote
流,同时接收其他 RouteNote
(例如来自其他用户的 RouteNote
)。
这正是双向流式传输的应用场景。双向流式传输 RPC 的双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序进行读取和写入。
例如,服务器可以等待接收到所有客户端消息后再写入响应,也可以交替读取消息和写入消息,或者以其他读取和写入组合方式进行操作。
每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream
关键字来指定此类方法。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 生成客户端和服务器代码
我们已为您提供生成目录中 .proto
文件中的生成代码。
如果您想了解如何自行从 .proto
文件生成代码,或者想对 .proto
文件进行任何更改并进行测试,请参阅这些说明。
生成的代码包含:
- 消息类型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
的结构体定义。 - 我们需要实现的服务特征:
route_guide_server::RouteGuide
。 - 我们将用于调用服务器的客户端类型:
route_guide_client::RouteGuideClient<T>
。
接下来,我们将在服务器端实现这些方法,以便当客户端发送请求时,服务器可以回复答案。
5. 实现服务
首先,我们来看看如何创建 RouteGuide
服务器。要让 RouteGuide
服务正常运行,需要完成以下两个部分:
- 实现根据服务定义生成的服务接口:执行服务的实际“工作”。
- 运行 gRPC 服务器以监听来自客户端的请求,并将这些请求分派给正确的方法实现。
在 src/server/server.rs
中,我们可以通过 gRPC 的 include_generated_proto!
宏将生成的代码纳入作用域,并导入 RouteGuide
特征和 Point
。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
我们可以先定义一个结构来表示我们的服务。目前,我们可以在 src/server/server.rs
上执行此操作:
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
现在,我们需要实现生成的代码中的 route_guide_server::RouteGuide
特征。
实现 RouteGuide
我们需要实现生成的 RouteGuide
接口。实现方式如下所示。此内容已包含在模板中。
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
下面我们来详细了解每种 RPC 实现。
服务器端流式处理 RPC
我们先从 ListFeatures
开始。这是一个服务器端流式传输 RPC,因此我们需要向客户端发送多个 Feature
。
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
如您所见,我们获得了一个请求对象(客户端希望在其中找到 Features
的 Rectangle
)。这次,我们需要返回一个值流。我们创建一个渠道并生成一个新的异步任务,在该任务中执行查找,并将满足我们约束条件的特征发送到该渠道。通道的 Stream 部分会返回给调用方,并封装在 tonic::Response
中。
客户端流式 RPC
现在,我们来看一个稍微复杂一点的示例:客户端流式传输方法 RecordRoute
,其中我们从客户端获取 Points
的流,并返回包含客户行程信息的单个 RouteSummary
。它会获取一个数据流作为输入,服务器可以使用该数据流来读取和写入消息。它可以利用其 next()
方法遍历客户端消息,并返回单个响应。
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
在该方法正文中,我们使用流的 next()
方法反复读取客户端的请求到请求对象(在本例中为 Point
),直到没有更多消息为止。如果此值为 None,则表示数据流仍正常,可以继续读取。
双向流式传输 RPC
最后,我们来看看双向流式 RPC RouteChat()
。
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
这次,我们获得了一个流,与客户端流式传输示例中一样,该流可用于读取和写入消息。不过,这次我们在客户端仍在向其消息流写入消息时,通过方法的数据流返回值。此处的读取和写入语法与客户端流式传输方法非常相似,不同之处在于服务器会返回 RouteChatStream
。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。
我们使用 try_stream!
创建 output
流,这表示该流可能会返回错误。
启动服务器
实现此方法后,我们还需要启动 gRPC 服务器,以便客户端能够实际使用我们的服务。填写 main()
。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
以下是 main()
中发生的情况,分步说明如下:
- 指定我们想要用于侦听客户端请求的端口
- 创建加载了功能的
RouteGuideService
- 使用我们创建的服务,通过
RouteGuideServer::new()
创建 gRPC 服务器的实例。 - 向 gRPC 服务器注册服务实现。
- 在服务器上使用我们的端口详细信息调用
serve()
,以进行阻塞等待,直到进程被终止。
6. 创建客户端
在本部分中,我们将了解如何为 src/client/client.rs
中的 RouteGuide 服务创建 Rust 客户端。
首先,将生成的代码纳入范围。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
调用服务方法
现在,我们来看看如何调用服务方法。在 gRPC-Rust 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。
服务器端流式处理 RPC
我们在此处调用服务器端流式传输方法 ListFeatures
,该方法会返回地理位置 Feature
对象的流。
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
我们向该方法传递一个请求,并获得一个 ListFeaturesStream
实例。客户端可以使用 ListFeaturesStream
流来读取服务器的响应。我们使用 ListFeaturesStream
的 message()
方法反复读取服务器对响应协议缓冲区对象(在本例中为 Feature
)的响应,直到没有更多消息为止。
客户端流式 RPC
对于 record_route
,我们将点向量转换为流。然后,我们将此流作为请求传递到 record_route()
中,并在服务器完全处理该流后获得单个 RouteSummary
响应。
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
双向流式传输 RPC
最后,我们来看看双向流式 RPC RouteChat()
。我们向该方法传递一个写入到的流请求,并返回一个可以从中读取消息的流。这次,我们在服务器仍在向其消息流写入消息时,通过我们方法的流返回值。
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。
调用辅助方法
如需调用服务方法,我们首先需要创建一个与服务器通信的渠道。我们通过以下方式创建此对象:先创建一个端点,连接到该端点,然后传递连接到 RouteGuideClient::new()
时创建的渠道:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
在 main()
中,执行我们刚刚创建的方法。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. 试试看
首先,为了运行客户端和服务器,我们将它们作为二进制目标添加到 crate 中。我们需要相应地修改 Cargo.toml:
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
与任何项目一样,我们还需要考虑代码正常运行所需的依赖项。对于 Rust 项目,依赖项将位于 Cargo.toml
中。我们已在 Cargo.toml
文件中列出了必要的依赖项。
然后,从我们的工作目录执行以下命令:
- 在一个终端中运行服务器:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- 从另一个终端运行客户端:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
您将会看到如下输出:
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"