gRPC-Rust 入门 - 流式传输

1. 简介

在此 Codelab 中,您将使用 gRPC-Rust 创建一个客户端和服务器,它们将构成用 Rust 编写的路线映射应用的基础。

在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。

该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们能够相互通信,从而节省您实现该功能的时间和精力。

生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。

学习内容

  • 如何使用 Protocol Buffers 定义服务 API。
  • 如何使用自动代码生成功能基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
  • 了解使用 gRPC 进行的客户端-服务器流式传输通信。

本 Codelab 适用于刚开始使用 gRPC 或希望复习 gRPC 的 Rust 开发者,也适用于对构建分布式系统感兴趣的任何人。无需具备 gRPC 经验。

2. 准备工作

前提条件

请确保您已安装以下各项:

  • GCC。请按照此处的说明操作
  • Rust,最新版本。请按照此处的安装说明操作。

获取代码

为了避免您完全从头开始,本 Codelab 提供了一个应用源代码框架供您完成。以下步骤将展示如何完成应用,包括使用 Protocol Buffer 编译器插件生成样板 gRPC 代码。

首先,创建 Codelab 工作目录并 cd 到该目录:

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

下载并解压缩 Codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。

如果您想跳过输入实现代码的步骤,可以在 GitHub 上找到完整的源代码

3. 定义消息和服务

第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:

  • 服务器实现并由客户端调用的 RPC 方法 ListFeaturesRecordRouteRouteChat
  • 消息类型 PointFeatureRectangleRouteNoteRouteSummary,它们是调用上述方法时在客户端和服务器之间交换的数据结构。

这些 RPC 方法及其消息类型都将在所提供源代码的 proto/routeguide.proto 文件中定义。

协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期

定义消息类型

我们先来定义 RPC 将使用的消息。在源代码的 routeguide/route_guide.proto 文件中,首先定义 Point 消息类型。Point 表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数字 12message 结构中每个字段的唯一 ID 编号。

接下来,定义 Feature 消息类型。Feature 使用 string 字段来表示由 Point 指定的位置处的某个事物的名称或邮政地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

接下来是 Rectangle 消息,表示一个经纬度矩形,以两个对角点“lo”和“hi”表示。

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

也是一个 RouteNote 消息,表示在给定时间点发送的消息。

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

我们还需要您提供 RouteSummary 消息。此消息是针对 RecordRoute RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定义服务方法

我们先定义服务,然后再定义消息。如需定义服务,请在 .proto 文件中指定命名服务。proto/routeguide.proto 文件具有名为 RouteGuideservice 结构,用于定义应用服务提供的一个或多个方法。

在服务定义中定义 RPC 方法,并指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:

ListFeatures

获取给定 Rectangle 内可用的 Feature。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回(例如,在包含重复字段的响应消息中)。

此 RPC 的合适类型是服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取返回的一系列消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式传输方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受正在遍历的路线上的 Point 流,并在遍历完成后返回 RouteSummary

在这种情况下,客户端流式传输 RPC 似乎很合适:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以在请求类型之前放置 stream 关键字,以指定客户端流式传输方法。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在遍历路线时发送的 RouteNote 流,同时接收其他 RouteNote(例如来自其他用户的 RouteNote)。

这正是双向流式传输的应用场景。双向流式传输 RPC 的双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序进行读取和写入。

例如,服务器可以等待接收到所有客户端消息后再写入响应,也可以交替读取消息和写入消息,或者以其他读取和写入组合方式进行操作。

每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream 关键字来指定此类方法。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 生成客户端和服务器代码

我们已为您提供生成目录中 .proto 文件中的生成代码。

如果您想了解如何自行从 .proto 文件生成代码,或者想对 .proto 文件进行任何更改并进行测试,请参阅这些说明

生成的代码包含:

  • 消息类型 PointFeatureRectangleRouteNoteRouteSummary 的结构体定义。
  • 我们需要实现的服务特征:route_guide_server::RouteGuide
  • 我们将用于调用服务器的客户端类型:route_guide_client::RouteGuideClient<T>

接下来,我们将在服务器端实现这些方法,以便当客户端发送请求时,服务器可以回复答案。

5. 实现服务

首先,我们来看看如何创建 RouteGuide 服务器。要让 RouteGuide 服务正常运行,需要完成以下两个部分:

  • 实现根据服务定义生成的服务接口:执行服务的实际“工作”。
  • 运行 gRPC 服务器以监听来自客户端的请求,并将这些请求分派给正确的方法实现。

src/server/server.rs 中,我们可以通过 gRPC 的 include_generated_proto! 宏将生成的代码纳入作用域,并导入 RouteGuide 特征和 Point

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

我们可以先定义一个结构来表示我们的服务。目前,我们可以在 src/server/server.rs 上执行此操作:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

现在,我们需要实现生成的代码中的 route_guide_server::RouteGuide 特征。

实现 RouteGuide

我们需要实现生成的 RouteGuide 接口。实现方式如下所示。此内容已包含在模板中。

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

下面我们来详细了解每种 RPC 实现。

服务器端流式处理 RPC

我们先从 ListFeatures 开始。这是一个服务器端流式传输 RPC,因此我们需要向客户端发送多个 Feature

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

如您所见,我们获得了一个请求对象(客户端希望在其中找到 FeaturesRectangle)。这次,我们需要返回一个值流。我们创建一个渠道并生成一个新的异步任务,在该任务中执行查找,并将满足我们约束条件的特征发送到该渠道。通道的 Stream 部分会返回给调用方,并封装在 tonic::Response 中。

客户端流式 RPC

现在,我们来看一个稍微复杂一点的示例:客户端流式传输方法 RecordRoute,其中我们从客户端获取 Points 的流,并返回包含客户行程信息的单个 RouteSummary。它会获取一个数据流作为输入,服务器可以使用该数据流来读取和写入消息。它可以利用其 next() 方法遍历客户端消息,并返回单个响应。

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

在该方法正文中,我们使用流的 next() 方法反复读取客户端的请求到请求对象(在本例中为 Point),直到没有更多消息为止。如果此值为 None,则表示数据流仍正常,可以继续读取。

双向流式传输 RPC

最后,我们来看看双向流式 RPC RouteChat()

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

这次,我们获得了一个流,与客户端流式传输示例中一样,该流可用于读取和写入消息。不过,这次我们在客户端仍在向其消息流写入消息时,通过方法的数据流返回值。此处的读取和写入语法与客户端流式传输方法非常相似,不同之处在于服务器会返回 RouteChatStream。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。

我们使用 try_stream! 创建 output 流,这表示该流可能会返回错误。

启动服务器

实现此方法后,我们还需要启动 gRPC 服务器,以便客户端能够实际使用我们的服务。填写 main()

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

以下是 main() 中发生的情况,分步说明如下:

  1. 指定我们想要用于侦听客户端请求的端口
  2. 创建加载了功能的 RouteGuideService
  3. 使用我们创建的服务,通过 RouteGuideServer::new() 创建 gRPC 服务器的实例。
  4. 向 gRPC 服务器注册服务实现。
  5. 在服务器上使用我们的端口详细信息调用 serve(),以进行阻塞等待,直到进程被终止。

6. 创建客户端

在本部分中,我们将了解如何为 src/client/client.rs 中的 RouteGuide 服务创建 Rust 客户端。

首先,将生成的代码纳入范围。

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

调用服务方法

现在,我们来看看如何调用服务方法。在 gRPC-Rust 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。

服务器端流式处理 RPC

我们在此处调用服务器端流式传输方法 ListFeatures,该方法会返回地理位置 Feature 对象的流。

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

我们向该方法传递一个请求,并获得一个 ListFeaturesStream 实例。客户端可以使用 ListFeaturesStream 流来读取服务器的响应。我们使用 ListFeaturesStreammessage() 方法反复读取服务器对响应协议缓冲区对象(在本例中为 Feature)的响应,直到没有更多消息为止。

客户端流式 RPC

对于 record_route,我们将点向量转换为流。然后,我们将此流作为请求传递到 record_route() 中,并在服务器完全处理该流后获得单个 RouteSummary 响应。

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

双向流式传输 RPC

最后,我们来看看双向流式 RPC RouteChat()。我们向该方法传递一个写入到的流请求,并返回一个可以从中读取消息的流。这次,我们在服务器仍在向其消息流写入消息时,通过我们方法的流返回值。

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。

调用辅助方法

如需调用服务方法,我们首先需要创建一个与服务器通信的渠道。我们通过以下方式创建此对象:先创建一个端点,连接到该端点,然后传递连接到 RouteGuideClient::new() 时创建的渠道:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

main() 中,执行我们刚刚创建的方法。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. 试试看

首先,为了运行客户端和服务器,我们将它们作为二进制目标添加到 crate 中。我们需要相应地修改 Cargo.toml:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

与任何项目一样,我们还需要考虑代码正常运行所需的依赖项。对于 Rust 项目,依赖项将位于 Cargo.toml 中。我们已在 Cargo.toml 文件中列出了必要的依赖项。

然后,从我们的工作目录执行以下命令:

  1. 在一个终端中运行服务器:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. 从另一个终端运行客户端:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

您将会看到如下输出:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. 后续步骤