gRPC-Python 使用入门 - 流式传输

1. 简介

在此 Codelab 中,您将使用 gRPC-Python 创建一个客户端和服务器,它们将构成以 Python 编写的路线映射应用的基础。

在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。

该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们可以相互通信,从而节省您实现该功能的时间和精力。

生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。

学习内容

  • 如何使用 Protocol Buffers 定义服务 API。
  • 如何使用自动代码生成功能,基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
  • 了解使用 gRPC 进行的客户端-服务器流式传输通信。

此 Codelab 适合刚开始使用 gRPC 或希望复习 gRPC 的 Python 开发者,也适合任何对构建分布式系统感兴趣的人。无需具备 gRPC 经验。

2. 准备工作

所需条件

  • Python 3.9 或更高版本。我们建议使用 Python 3.13。如需查看针对具体平台的安装说明,请参阅 Python 设置和使用。或者,使用 uv pyenv 等工具安装非系统 Python。
  • 使用 pip 安装 Python 软件包。
  • venv 来创建 Python 虚拟环境。

ensurepipvenv 软件包是 Python 标准库的一部分,通常默认可用。

不过,一些基于 Debian 的发行版(包括 Ubuntu)在重新分发 Python 时选择排除这些文件。如需安装软件包,请运行以下命令:

sudo apt install python3-pip python3-venv

获取代码

为了简化学习过程,此 Codelab 提供了预构建的源代码框架,可帮助您快速入门。以下步骤将引导您完成应用,包括使用 grpc_tools.protoc Protocol Buffer 编译器插件生成 gRPC 代码。

grpc-codelabs

此 Codelab 的框架源代码位于 codelabs/grpc-python-streaming/start_here 目录中。如果您不想自己实现代码,可以在 completed 目录中找到已完成的源代码。

首先,创建 Codelab 工作目录并进入该目录:

mkdir grpc-python-streaming && cd grpc-python-streaming

下载并解压缩 Codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。

3. 定义消息和服务

第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:

  • 服务器实现并由客户端调用的 RPC 方法 ListFeaturesRecordRouteRouteChat
  • 消息类型 PointFeatureRectangleRouteNoteRouteSummary,它们是在调用 RPC 方法时在客户端和服务器之间交换的数据结构。

这些 RPC 方法及其消息类型都将在所提供源代码的 protos/route_guide.proto 文件中定义。

协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期

定义消息类型

在源代码的 protos/route_guide.proto 文件中,首先定义 Point 消息类型。Point 表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数字 12message 结构中每个字段的唯一 ID 编号。

接下来,定义 Feature 消息类型。Feature 使用 string 字段来表示由 Point 指定的位置处的某个事物的名称或邮政地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

为了将某个区域内的多个点流式传输到客户端,您需要一条 Rectangle 消息,该消息表示一个经纬度矩形,以两个对角点 lohi 表示:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

此外,还包括表示在给定时间点发送的消息的 RouteNote 消息:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

最后,您需要收到 RouteSummary 消息。此消息是针对 RecordRoute RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定义服务方法

如需定义服务,请在 .proto 文件中指定命名服务。route_guide.proto 文件具有名为 RouteGuideservice 结构,用于定义应用服务提供的一个或多个方法。

在服务定义中定义 RPC 方法时,您需要指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:

ListFeatures

获取给定 Rectangle 内可用的 Feature 对象。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回。

对于此应用,您将使用服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取一系列返回的消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式传输方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受正在遍历的路线上的点流,并在遍历完成后返回 RouteSummary

在这种情况下,客户端流式传输 RPC 非常适合:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以通过在请求类型之前放置 stream 关键字来指定客户端流式传输方法。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在遍历路线时发送的 RouteNotes 流,同时接收其他 RouteNotes(例如来自其他用户的 RouteNotes)。

这正是双向流式传输的用例。一种双向流式传输 RPC,其中双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序读取和写入数据:例如,服务器可以等待接收所有客户端消息,然后再写入响应;也可以交替读取消息和写入消息;或者以其他方式组合读取和写入操作。每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream 关键字来指定此类方法。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 生成客户端和服务器代码

接下来,使用协议缓冲区编译器从 .proto 文件中为客户端和服务器生成样板 gRPC 代码。

对于 gRPC Python 代码生成,我们创建了 grpcio-tools。其中包括:

  1. 常规的 protoc 编译器,用于根据 message 定义生成 Python 代码。
  2. gRPC protobuf 插件,可根据 service 定义生成 Python 代码(客户端和服务器桩)。

我们将使用 pip 安装 grpcio-tools Python 软件包。让我们创建一个新的 Python 虚拟环境 (venv),以将项目的依赖项与系统软件包隔离开来:

python3 -m venv --upgrade-deps .venv

在 bash/zsh shell 中激活虚拟环境:

source .venv/bin/activate

对于 Windows 和非标准 shell,请参阅 https://docs.python.org/3/library/venv.html#how-venvs-work 中的表格。

接下来,安装 grpcio-tools(这也会安装 grpcio 软件包):

pip install grpcio-tools

使用以下命令生成 Python 样板代码:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

这将为我们在 route_guide.proto 中定义的接口生成以下文件:

  1. route_guide_pb2.py 包含根据 message 定义动态创建类的代码。
  2. route_guide_pb2.pyi 是根据 message 定义生成的 “桩文件”或“类型提示文件”。它仅包含签名,不包含实现。IDE 可以使用桩文件来提供更好的自动补全和错误检测功能。
  3. route_guide_pb2_grpc.py 是根据 service 定义生成的,包含特定于 gRPC 的类和函数。

gRPC 专用代码包含:

  1. RouteGuideStubgRPC 客户端可以使用它来调用 RouteGuide RPC。
  2. RouteGuideServicer,用于定义 RouteGuide 服务实现的接口。
  3. 用于向 gRPC 服务器注册 RouteGuideServiceradd_RouteGuideServicer_to_server 函数。

5. 创建服务器

首先,我们来看看如何创建 RouteGuide 服务器。创建和运行 RouteGuide 服务器分为两个工作项:

  • 实现从服务定义生成的服务接口,其中包含执行服务实际“工作”的函数。
  • 运行 gRPC 服务器以监听来自客户端的请求并传输响应。

我们来看看 route_guide_server.py

实现 RouteGuide

route_guide_server.py 具有一个 RouteGuideServicer 类,该类是生成的类 route_guide_pb2_grpc.RouteGuideServicer 的子类:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer 实现所有 RouteGuide 服务方法。

服务器端流式 RPC

ListFeatures 是一个响应流式传输 RPC,可向客户端发送多个 Feature

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

在此示例中,请求消息是一个 route_guide_pb2.Rectangle,客户端希望在其中找到 Feature。该方法会生成零个或多个响应,而不是返回单个响应。

客户端流式 RPC

请求流式传输方法 RecordRoute 使用请求值的 迭代器,并返回单个响应值。

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

双向流式传输 RPC

最后,我们来看看双向流式 RPC RouteChat()

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

此方法的语义是请求流式传输方法和响应流式传输方法的组合。它会传递一个请求值迭代器,并且本身也是一个响应值迭代器。

启动服务器

实现所有 RouteGuide 方法后,下一步是启动 gRPC 服务器,以便客户端能够实际使用您的服务:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

服务器 start() 方法是非阻塞的。系统将实例化一个新线程来处理请求。调用 server.start() 的线程通常在此期间不会有任何其他工作要做。在这种情况下,您可以调用 server.wait_for_termination() 来干净地阻塞调用线程,直到服务器终止。

6. 创建客户端

我们来看看 route_guide_client.py

创建桩

如需调用服务方法,我们首先需要创建 stub

我们在 run() 方法中实例化了从 .proto. 生成的 route_guide_pb2_grpc 模块的 RouteGuideStub 类:

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

请注意,此处 channel 用作上下文管理器,一旦解释器离开 with 块,它就会自动关闭。

调用服务方法

对于返回单个响应(“response-unary”方法)的 RPC 方法,gRPC Python 同时支持同步(阻塞)和异步(非阻塞)控制流语义。对于响应流式 RPC 方法,调用会立即返回响应值的迭代器。对该迭代器的 next() 方法的调用会阻塞,直到可从迭代器中产生响应为止。

服务器端流式 RPC

调用响应流式传输 ListFeatures 类似于处理序列类型:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

客户端流式 RPC

调用请求流式传输 RecordRoute 类似于将迭代器传递给本地方法。与上面也返回单个响应的简单 RPC 类似,它可以同步调用:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

双向流式传输 RPC

调用双向流式 RouteChat 具有(与服务侧相同)请求流式和响应流式语义的组合。

生成请求消息,并使用 yield 逐个发送这些消息。

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

接收和处理服务器响应:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

调用辅助方法

在 run 中,执行我们刚刚创建的方法,并向其传递 stub

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. 试试看

运行服务器:

python route_guide_server.py

在另一个终端中,再次激活虚拟环境 (source .venv/bin/activate)),然后运行客户端:

python route_guide_client.py

我们来看一下输出。

ListFeatures

首先,您会看到功能列表。每个功能都是在服务器发现它们位于所请求的矩形内时,从服务器(服务器端流式传输 RPC)流式传输的:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

其次,RecordRoute 展示了从客户端流式传输到服务器的随机访问点列表(客户端流式传输 RPC)

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

当客户端完成所有访问点的流式传输后,它将收到来自服务器的非流式响应(即一元 RPC)。此响应将包含对客户完整路线执行的计算的摘要。

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

最后,RouteChat 输出演示了双向流式传输。当客户端“访问”homework 点时,它会通过向服务器发送 RouteNote 来记录该点的注释。如果某个点已被访问过,服务器会回流该点的所有先前备注。

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. 后续步骤