1. 简介
在此 Codelab 中,您将使用 gRPC-Python 创建一个客户端和服务器,它们将构成以 Python 编写的路线映射应用的基础。
在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。
该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们可以相互通信,从而节省您实现该功能的时间和精力。
生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。
学习内容
- 如何使用 Protocol Buffers 定义服务 API。
- 如何使用自动代码生成功能,基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
- 了解使用 gRPC 进行的客户端-服务器流式传输通信。
此 Codelab 适合刚开始使用 gRPC 或希望复习 gRPC 的 Python 开发者,也适合任何对构建分布式系统感兴趣的人。无需具备 gRPC 经验。
2. 准备工作
所需条件
- Python 3.9 或更高版本。我们建议使用 Python 3.13。如需查看针对具体平台的安装说明,请参阅 Python 设置和使用。或者,使用 uv 或 pyenv 等工具安装非系统 Python。
- 使用 pip 安装 Python 软件包。
- venv 来创建 Python 虚拟环境。
ensurepip
和 venv
软件包是 Python 标准库的一部分,通常默认可用。
不过,一些基于 Debian 的发行版(包括 Ubuntu)在重新分发 Python 时选择排除这些文件。如需安装软件包,请运行以下命令:
sudo apt install python3-pip python3-venv
获取代码
为了简化学习过程,此 Codelab 提供了预构建的源代码框架,可帮助您快速入门。以下步骤将引导您完成应用,包括使用 grpc_tools.protoc
Protocol Buffer 编译器插件生成 gRPC 代码。
grpc-codelabs
此 Codelab 的框架源代码位于 codelabs/grpc-python-streaming/start_here 目录中。如果您不想自己实现代码,可以在 completed
目录中找到已完成的源代码。
首先,创建 Codelab 工作目录并进入该目录:
mkdir grpc-python-streaming && cd grpc-python-streaming
下载并解压缩 Codelab:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-python-streaming/start_here
或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。
3. 定义消息和服务
第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:
- 服务器实现并由客户端调用的 RPC 方法
ListFeatures
、RecordRoute
和RouteChat
。 - 消息类型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,它们是在调用 RPC 方法时在客户端和服务器之间交换的数据结构。
这些 RPC 方法及其消息类型都将在所提供源代码的 protos/route_guide.proto
文件中定义。
协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期。
定义消息类型
在源代码的 protos/route_guide.proto
文件中,首先定义 Point
消息类型。Point
表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
数字 1
和 2
是 message
结构中每个字段的唯一 ID 编号。
接下来,定义 Feature
消息类型。Feature
使用 string
字段来表示由 Point
指定的位置处的某个事物的名称或邮政地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
为了将某个区域内的多个点流式传输到客户端,您需要一条 Rectangle
消息,该消息表示一个经纬度矩形,以两个对角点 lo
和 hi
表示:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
此外,还包括表示在给定时间点发送的消息的 RouteNote
消息:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
最后,您需要收到 RouteSummary
消息。此消息是针对 RecordRoute
RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定义服务方法
如需定义服务,请在 .proto
文件中指定命名服务。route_guide.proto
文件具有名为 RouteGuide
的 service
结构,用于定义应用服务提供的一个或多个方法。
在服务定义中定义 RPC
方法时,您需要指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:
ListFeatures
获取给定 Rectangle
内可用的 Feature
对象。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回。
对于此应用,您将使用服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取一系列返回的消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式传输方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受正在遍历的路线上的点流,并在遍历完成后返回 RouteSummary
。
在这种情况下,客户端流式传输 RPC 非常适合:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以通过在请求类型之前放置 stream 关键字来指定客户端流式传输方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在遍历路线时发送的 RouteNotes
流,同时接收其他 RouteNotes
(例如来自其他用户的 RouteNotes
)。
这正是双向流式传输的用例。一种双向流式传输 RPC,其中双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序读取和写入数据:例如,服务器可以等待接收所有客户端消息,然后再写入响应;也可以交替读取消息和写入消息;或者以其他方式组合读取和写入操作。每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream 关键字来指定此类方法。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 生成客户端和服务器代码
接下来,使用协议缓冲区编译器从 .proto
文件中为客户端和服务器生成样板 gRPC 代码。
对于 gRPC Python 代码生成,我们创建了 grpcio-tools。其中包括:
- 常规的 protoc 编译器,用于根据
message
定义生成 Python 代码。 - gRPC protobuf 插件,可根据
service
定义生成 Python 代码(客户端和服务器桩)。
我们将使用 pip 安装 grpcio-tools
Python 软件包。让我们创建一个新的 Python 虚拟环境 (venv),以将项目的依赖项与系统软件包隔离开来:
python3 -m venv --upgrade-deps .venv
在 bash/zsh shell 中激活虚拟环境:
source .venv/bin/activate
对于 Windows 和非标准 shell,请参阅 https://docs.python.org/3/library/venv.html#how-venvs-work 中的表格。
接下来,安装 grpcio-tools(这也会安装 grpcio 软件包):
pip install grpcio-tools
使用以下命令生成 Python 样板代码:
python -m grpc_tools.protoc --proto_path=./protos \
--python_out=. --pyi_out=. --grpc_python_out=. \
./protos/route_guide.proto
这将为我们在 route_guide.proto
中定义的接口生成以下文件:
route_guide_pb2.py
包含根据message
定义动态创建类的代码。route_guide_pb2.pyi
是根据message
定义生成的 “桩文件”或“类型提示文件”。它仅包含签名,不包含实现。IDE 可以使用桩文件来提供更好的自动补全和错误检测功能。route_guide_pb2_grpc.py
是根据service
定义生成的,包含特定于 gRPC 的类和函数。
gRPC 专用代码包含:
5. 创建服务器
首先,我们来看看如何创建 RouteGuide
服务器。创建和运行 RouteGuide
服务器分为两个工作项:
- 实现从服务定义生成的服务接口,其中包含执行服务实际“工作”的函数。
- 运行 gRPC 服务器以监听来自客户端的请求并传输响应。
我们来看看 route_guide_server.py
。
实现 RouteGuide
route_guide_server.py
具有一个 RouteGuideServicer
类,该类是生成的类 route_guide_pb2_grpc.RouteGuideServicer
的子类:
# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
RouteGuideServicer
实现所有 RouteGuide
服务方法。
服务器端流式 RPC
ListFeatures
是一个响应流式传输 RPC,可向客户端发送多个 Feature
:
def ListFeatures(self, request, context):
"""List all features contained within the given Rectangle."""
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
lat, lng = feature.location.latitude, feature.location.longitude
if left <= lng <= right and bottom <= lat <= top:
yield feature
在此示例中,请求消息是一个 route_guide_pb2.Rectangle
,客户端希望在其中找到 Feature
。该方法会生成零个或多个响应,而不是返回单个响应。
客户端流式 RPC
请求流式传输方法 RecordRoute
使用请求值的 迭代器,并返回单个响应值。
def RecordRoute(self, request_iterator, context):
"""Calculate statistics about the trip composed of Points."""
point_count = 0
feature_count = 0
distance = 0.0
prev_point = None
start_time = time.time()
for point in request_iterator:
point_count += 1
if get_feature(self.db, point):
feature_count += 1
if prev_point:
distance += get_distance(prev_point, point)
prev_point = point
elapsed_time = time.time() - start_time
return route_guide_pb2.RouteSummary(
point_count=point_count,
feature_count=feature_count,
distance=int(distance),
elapsed_time=int(elapsed_time),
)
双向流式传输 RPC
最后,我们来看看双向流式 RPC RouteChat()
:
def RouteChat(self, request_iterator, context):
"""
Receive a stream of message/location pairs, and responds with
a stream of all previous messages for the given location.
"""
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
此方法的语义是请求流式传输方法和响应流式传输方法的组合。它会传递一个请求值迭代器,并且本身也是一个响应值迭代器。
启动服务器
实现所有 RouteGuide
方法后,下一步是启动 gRPC 服务器,以便客户端能够实际使用您的服务:
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(),
server,
)
listen_addr = "localhost:50051"
server.add_insecure_port(listen_addr)
print(f"Starting server on {listen_addr}")
server.start()
server.wait_for_termination()
服务器 start()
方法是非阻塞的。系统将实例化一个新线程来处理请求。调用 server.start()
的线程通常在此期间不会有任何其他工作要做。在这种情况下,您可以调用 server.wait_for_termination()
来干净地阻塞调用线程,直到服务器终止。
6. 创建客户端
我们来看看 route_guide_client.py
。
创建桩
如需调用服务方法,我们首先需要创建 stub。
我们在 run()
方法中实例化了从 .proto.
生成的 route_guide_pb2_grpc
模块的 RouteGuideStub
类:
with grpc.insecure_channel("localhost:50051") as channel:
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
请注意,此处 channel
用作上下文管理器,一旦解释器离开 with
块,它就会自动关闭。
调用服务方法
对于返回单个响应(“response-unary”方法)的 RPC 方法,gRPC Python 同时支持同步(阻塞)和异步(非阻塞)控制流语义。对于响应流式 RPC 方法,调用会立即返回响应值的迭代器。对该迭代器的 next()
方法的调用会阻塞,直到可从迭代器中产生响应为止。
服务器端流式 RPC
调用响应流式传输 ListFeatures
类似于处理序列类型:
def guide_list_features(stub):
_lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
_hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
rectangle = route_guide_pb2.Rectangle(
lo=_lo,
hi=_hi,
)
print("Looking for features between 40, -75 and 42, -73")
features = stub.ListFeatures(rectangle)
for feature in features:
print(
f"Feature called '{feature.name}'"
f" at {format_point(feature.location)}"
)
客户端流式 RPC
调用请求流式传输 RecordRoute
类似于将迭代器传递给本地方法。与上面也返回单个响应的简单 RPC 类似,它可以同步调用:
def guide_record_route(stub):
feature_list = route_guide_resources.read_route_guide_database()
route_iterator = generate_route(feature_list)
route_summary = stub.RecordRoute(route_iterator)
print(f"Finished trip with {route_summary.point_count} points")
print(f"Passed {route_summary.feature_count} features")
print(f"Traveled {route_summary.distance} meters")
print(f"It took {route_summary.elapsed_time} seconds")
双向流式传输 RPC
调用双向流式 RouteChat
具有(与服务侧相同)请求流式和响应流式语义的组合。
生成请求消息,并使用 yield
逐个发送这些消息。
def generate_notes():
home = route_guide_pb2.Point(latitude=1, longitude=1)
work = route_guide_pb2.Point(latitude=2, longitude=2)
notes = [
make_route_note("Departing from home", home),
make_route_note("Arrived at work", work),
make_route_note("Having lunch at work", work),
make_route_note("Departing from work", work),
make_route_note("Arrived home", home),
]
for note in notes:
print(
f"Sending RouteNote for {format_point(note.location)}:"
f" {note.message}"
)
yield note
# Sleep to simulate moving from one point to another.
# Only for demonstrating the order of the messages.
time.sleep(0.1)
接收和处理服务器响应:
def guide_route_chat(stub):
responses = stub.RouteChat(generate_notes())
for response in responses:
print(
"< Found previous note at"
f" {format_point(response.location)}: {response.message}"
)
调用辅助方法
在 run 中,执行我们刚刚创建的方法,并向其传递 stub
。
print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)
7. 试试看
运行服务器:
python route_guide_server.py
在另一个终端中,再次激活虚拟环境 (source .venv/bin/activate)
),然后运行客户端:
python route_guide_client.py
我们来看一下输出。
ListFeatures
首先,您会看到功能列表。每个功能都是在服务器发现它们位于所请求的矩形内时,从服务器(服务器端流式传输 RPC)流式传输的:
-------------- ListFeatures -------------- Looking for features between 40, -75 and 42, -73 Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763) Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179) Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468) Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136) ...
RecordRoute
其次,RecordRoute
展示了从客户端流式传输到服务器的随机访问点列表(客户端流式传输 RPC):
-------------- RecordRoute -------------- Visiting point (lat=410395868, lng=-744972325) Visiting point (lat=404310607, lng=-740282632) Visiting point (lat=403966326, lng=-748519297) Visiting point (lat=407586880, lng=-741670168) Visiting point (lat=406589790, lng=-743560121) Visiting point (lat=410322033, lng=-747871659) Visiting point (lat=415464475, lng=-747175374) Visiting point (lat=407586880, lng=-741670168) Visiting point (lat=402647019, lng=-747071791) Visiting point (lat=414638017, lng=-745957854)
当客户端完成所有访问点的流式传输后,它将收到来自服务器的非流式响应(即一元 RPC)。此响应将包含对客户完整路线执行的计算的摘要。
Finished trip with 10 points Passed 10 features Traveled 654743 meters It took 0 seconds
RouteChat
最后,RouteChat
输出演示了双向流式传输。当客户端“访问”home
或 work
点时,它会通过向服务器发送 RouteNote 来记录该点的注释。如果某个点已被访问过,服务器会回流该点的所有先前备注。
-------------- RouteChat -------------- Sending RouteNote for (lat=1, lng=1): Departing from home Sending RouteNote for (lat=2, lng=2): Arrived at work Sending RouteNote for (lat=2, lng=2): Having lunch at work < Found previous note at (lat=2, lng=2): Arrived at work Sending RouteNote for (lat=2, lng=2): Departing from work < Found previous note at (lat=2, lng=2): Arrived at work < Found previous note at (lat=2, lng=2): Having lunch at work Sending RouteNote for (lat=1, lng=1): Arrived home < Found previous note at (lat=1, lng=1): Departing from home
8. 后续步骤
- 如需了解 gRPC 的运作方式,请参阅 gRPC 简介和核心概念
- 完成基础知识教程
- 探索 Python API 参考文档