1. 简介
在此 Codelab 中,您将使用 gRPC-Go 创建一个客户端和服务器,它们将构成使用 Go 编写的路线映射应用的基础。
在本教程结束时,您将拥有一个使用 gRPC 连接到远程服务器的客户端,该客户端可以获取有关客户端路线上的功能的信息、创建客户端路线的摘要,并与服务器和其他客户端交换路线信息(例如流量更新)。
该服务在 Protocol Buffers 文件中定义,该文件将用于为客户端和服务器生成样板代码,以便它们可以相互通信,从而节省您实现该功能的时间和精力。
生成的代码不仅能处理服务器与客户端之间复杂的通信,还能处理数据序列化和反序列化。
学习内容
- 如何使用 Protocol Buffers 定义服务 API。
- 如何使用自动代码生成功能,基于 Protocol Buffers 定义构建基于 gRPC 的客户端和服务器。
- 了解使用 gRPC 进行的客户端-服务器流式传输通信。
此 Codelab 适合刚开始使用 gRPC 或希望复习 gRPC 的 Go 开发者,也适合有意构建分布式系统的任何人。无需具备 gRPC 经验。
2. 准备工作
前提条件
请确保您已安装以下各项:
- Go 工具链版本 1.24.5 或更高版本。如需查看安装说明,请参阅 Go 的使用入门。
- Protocol Buffers 编译器
protoc
,版本为 3.27.1 或更高版本。如需了解安装说明,请参阅编译器的安装指南。 - 适用于 Go 和 gRPC 的协议缓冲区编译器插件。如需安装这些插件,请运行以下命令:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
更新 PATH
变量,以便 Protocol Buffer 编译器可以找到插件:
export PATH="$PATH:$(go env GOPATH)/bin"
获取代码
为了避免您完全从头开始,本 Codelab 提供了一个应用源代码框架供您完成。以下步骤将展示如何完成应用,包括使用 Protocol Buffer 编译器插件生成样板 gRPC 代码。
首先,创建 Codelab 工作目录并 cd
到该目录:
mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started
下载并解压缩 Codelab:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-go-streaming/start_here
或者,您也可以下载仅包含 Codelab 目录的 .zip 文件,然后手动将其解压缩。
如果您想跳过输入实现代码的步骤,可以在 GitHub 上找到完整的源代码。
3. 定义消息和服务
第一步是使用协议缓冲区定义应用的 gRPC 服务、RPC 方法以及请求和响应消息类型。您的服务将提供:
- 服务器实现并由客户端调用的 RPC 方法
ListFeatures
、RecordRoute
和RouteChat
。 - 消息类型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,它们是调用上述方法时在客户端和服务器之间交换的数据结构。
这些 RPC 方法及其消息类型都将在所提供源代码的 routeguide/route_guide.proto
文件中定义。
协议缓冲区通常称为 protobuf。如需详细了解 gRPC 术语,请参阅 gRPC 的核心概念、架构和生命周期。
定义消息类型
在源代码的 routeguide/route_guide.proto
文件中,首先定义 Point
消息类型。Point
表示地图上的纬度-经度坐标对。在此 Codelab 中,请使用整数作为坐标:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
数字 1
和 2
是 message
结构中每个字段的唯一 ID 编号。
接下来,定义 Feature
消息类型。Feature
使用 string
字段来表示由 Point
指定的位置处的某个事物的名称或邮政地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
接下来是 Rectangle
消息,表示一个经纬度矩形,以两个对角点“lo”和“hi”表示。
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
也是一个 RouteNote
消息,表示在给定时间点发送的消息。
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
我们还需要一条 RouteSummary
消息。此消息是针对 RecordRoute
RPC 收到的响应,下一部分将对此进行说明。它包含收到的各个点的数量、检测到的特征数量,以及作为每个点之间距离的累积总和的总覆盖距离。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定义服务方法
如需定义服务,您需要在 .proto 文件中指定一个命名服务。route_guide.proto
文件具有名为 RouteGuide
的 service
结构,用于定义应用服务提供的一个或多个方法。
在服务定义中定义 RPC
方法,并指定其请求和响应类型。在此 Codelab 部分中,我们来定义以下内容:
ListFeatures
获取给定 Rectangle
内可用的 Feature
。由于矩形可能覆盖大面积区域并包含大量要素,因此结果会以流式传输,而不是一次性返回(例如,在包含重复字段的响应消息中)。
此 RPC 的合适类型是服务器端流式传输 RPC:客户端向服务器发送请求,并获取一个数据流来读取返回的一系列消息。客户端会从返回的数据流中读取数据,直到没有更多消息为止。如示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式传输方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受正在遍历的路线上的 Point
流,并在遍历完成后返回 RouteSummary
。
在这种情况下,客户端流式传输 RPC 似乎很合适:客户端写入一系列消息并使用提供的数据流将其发送到服务器。客户端完成消息写入后,会等待服务器读取所有消息并返回其响应。您可以通过在请求类型之前放置 stream 关键字来指定客户端流式传输方法。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在遍历路线时发送的 RouteNote
流,同时接收其他 RouteNote
(例如来自其他用户的 RouteNote
)。
这正是双向流式传输的应用场景。双向流式传输 RPC 的双方都使用读写数据流发送一系列消息。这两个数据流独立运行,因此客户端和服务器可以按任意顺序进行读取和写入。
例如,服务器可以等待接收到所有客户端消息后再写入响应,也可以交替读取消息和写入消息,或者以其他读取和写入组合方式进行操作。
每个数据流中的消息顺序都会保留。您可以通过在请求和响应之前放置 stream 关键字来指定此类方法。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 生成客户端和服务器代码
接下来,使用协议缓冲区编译器从 .proto
文件中为客户端和服务器生成样板 gRPC 代码。在 routeguide
目录中,运行以下命令:
protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ route_guide.proto
此命令会生成以下文件:
route_guide.pb.go
,其中包含用于创建应用的消息类型、访问其数据以及表示消息的类型的定义的函数。route_guide_grpc.pb.go
,其中包含客户端用于调用服务的远程 gRPC 方法的函数,以及服务器用于提供该远程服务的函数。
接下来,我们将在服务器端实现这些方法,以便当客户端发送请求时,服务器可以回复答案。
5. 实现服务
首先,我们来看看如何创建 RouteGuide
服务器。要让 RouteGuide
服务正常运行,需要完成以下两个部分:
- 实现根据服务定义生成的服务接口:执行服务的实际“工作”。
- 运行 gRPC 服务器以侦听来自客户端的请求,并将这些请求分派给正确的服务实现。
我们来用 server/server.go
实现 RouteGuide。
实现 RouteGuide
我们需要实现生成的 RouteGuideService
接口。实现方式如下所示。
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
下面我们来详细了解每种 RPC 实现。
服务器端流式处理 RPC
从我们的某个流式 RPC 开始。ListFeatures
是服务器端流式传输 RPC,因此我们需要向客户端发送多个 Feature
。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
如您所见,这次我们在方法参数中获得的不是简单的请求和响应对象,而是一个请求对象(我们的客户端想要在其中找到 Features
的 Rectangle
)和一个用于写入响应的特殊 RouteGuide_ListFeaturesServer
对象。在该方法中,我们填充需要返回的尽可能多的 Feature
对象,并使用 RouteGuide_ListFeaturesServer
的 Send()
方法将其写入 RouteGuide_ListFeaturesServer
。最后,与简单 RPC 中一样,我们返回 nil
错误,以告知 gRPC 我们已完成写入响应。如果此调用中发生任何错误,我们会返回一个非 nil 错误;gRPC 层会将其转换为适当的 RPC 状态,以便通过网络发送。
客户端流式 RPC
现在,我们来看一个稍微复杂一点的示例:客户端流式传输方法 RecordRoute
,其中我们从客户端获取 Points
的流,并返回包含有关其行程信息的单个 RouteSummary
。如您所见,这次的方法根本没有请求参数。相反,它会获得一个 RouteGuide_RecordRouteServer
流,服务器可以使用该流来读取和写入消息 - 它可以使用其 Recv()
方法接收客户端消息,并使用其 SendAndClose()
方法返回单个响应。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在方法正文中,我们使用 RouteGuide_RecordRouteServer
的 Recv()
方法将客户端的请求反复读入请求对象(在本例中为 Point
),直到没有更多消息为止:服务器需要在每次调用后检查 Recv()
返回的错误。如果此值为 nil
,则表示数据流仍正常,可以继续读取;如果此值为 io.EOF
,则表示消息数据流已结束,服务器可以返回其 RouteSummary
。如果该值是任何其他值,我们会“按原样”返回错误,以便 gRPC 层将其转换为 RPC 状态。
双向流式传输 RPC
最后,我们来看看双向流式 RPC RouteChat()
。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
// Note: this copy prevents blocking other clients while serving this one.
// We don't need to do a deep copy, because elements in the slice are
// insert-only and never modified.
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次,我们获得了一个 RouteGuide_RouteChatServer
流,与客户端流式传输示例中一样,该流可用于读取和写入消息。不过,这次我们在客户端仍在向其消息流写入消息时,通过方法的数据流返回值。此处的读取和写入语法与客户端流式传输方法非常相似,不同之处在于服务器使用流的 send()
方法而不是 SendAndClose()
方法,因为服务器要写入多个响应。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。
启动服务器
实现所有方法后,我们还需要启动 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码段展示了我们如何为 RouteGuide
服务执行此操作:
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)
以下是 main()
中发生的情况,分步说明如下:
- 使用
lis, err := net.Listen(...)
指定用于侦听远程客户端请求的 TCP 端口。默认情况下,应用使用变量port
指定的 TCP 端口50051
,或者在运行服务器时通过命令行传递--port
开关。如果无法打开 TCP 端口,应用会因严重错误而结束。 - 使用
grpc.NewServer(...)
创建 gRPC 服务器的实例,并将此实例命名为grpcServer
。 - 创建指向
routeGuideServer
的指针,routeGuideServer
是表示应用 API 服务的结构,并将该指针命名为s.
- 使用
s.loadFeatures()
填充数组s.savedFeatures
。 - 向 gRPC 服务器注册服务实现。
- 在服务器上调用
Serve()
并提供端口详细信息,以阻塞方式等待客户端请求;此操作会一直持续,直到进程被终止或调用Stop()
。
函数 loadFeatures()
从 server/testdata.go
获取其坐标到位置的映射。
6. 创建客户端
现在,修改 client/client.go
,您将在其中实现客户端代码。
为了调用远程服务的方法,我们首先需要创建一个 gRPC 渠道来与服务器通信。我们通过将服务器的目标 URI 字符串(在本例中,该字符串只是地址和端口号)传递给客户端 main()
函数中的 grpc.NewClient()
来创建此对象,如下所示:
// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
由变量 serverAddr
定义的服务器地址默认为 localhost:50051
,可以在运行客户端时通过命令行中的 --addr
开关替换该地址。
如果客户端需要连接到需要身份验证凭据(例如 TLS 或 JWT 凭据)的服务,则客户端可以将 DialOptions
对象作为参数传递给 grpc.NewClient
,该对象包含所需的凭据。RouteGuide
服务不需要任何凭据。
设置好 gRPC 通道后,我们需要一个客户端存根,以便通过 Go 函数调用执行 RPC。我们使用 route_guide_grpc.pb.go
文件(从应用的 .proto
文件生成)提供的 NewRouteGuideClient
方法获取该 stub。
import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")
client := pb.NewRouteGuideClient(conn)
调用服务方法
现在,我们来看看如何调用服务方法。在 gRPC-Go 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。
服务器端流式处理 RPC
我们在此处调用服务器端流式传输方法 ListFeatures
,该方法会返回地理位置 Feature
对象的流。
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
// For server-to-client streaming RPCs, you call stream.Recv() until it
// returns io.EOF.
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}
与简单的 RPC 一样,我们向方法传递了上下文和请求。不过,我们不会获得响应对象,而是会获得 RouteGuide_ListFeaturesClient
的实例。客户端可以使用 RouteGuide_ListFeaturesClient
流来读取服务器的响应。我们使用 RouteGuide_ListFeaturesClient
的 Recv()
方法反复读取服务器对响应协议缓冲区对象(在本例中为 Feature
)的响应,直到没有更多消息为止:客户端需要在每次调用后检查从 Recv()
返回的错误 err。如果为 nil
,则表示数据流仍正常,可以继续读取;如果为 io.EOF
,则表示消息数据流已结束;否则,必须存在 RPC 错误,该错误会通过 err
传递。
客户端流式 RPC
客户端流式传输方法 RecordRoute
与服务器端方法类似,只不过我们只向该方法传递一个上下文,并获得一个 RouteGuide_RecordRouteClient
数据流,该数据流可用于写入和读取消息。
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
if err := c2sStream.Send(point); err != nil {
log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
}
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClient
有一个 Send()
方法,可用于向服务器发送请求。使用 Send()
将客户端的请求写入流后,我们需要对该流调用 CloseAndRecv()
,以告知 gRPC 我们已完成写入,并期待收到响应。我们从 CloseAndRecv()
返回的 err 中获取 RPC 状态。如果状态为 nil
,则 CloseAndRecv()
的第一个返回值将是有效的服务器响应。
双向流式传输 RPC
最后,我们来看看双向流式 RPC RouteChat()
。与 RecordRoute
的情况一样,我们只需向该方法传递一个上下文对象,即可获得一个可用于写入和读取消息的流。不过,这次我们在服务器仍在向其消息流写入消息时,通过方法流返回了值。
biDiStream, err := client.RouteChat(context.Background())
if err != nil {
log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
for {
in, err := biDiStream.Recv()
if err == io.EOF {
// read done.
close(recvDoneCh)
return
}
if err != nil {
log.Fatalf("client.RouteChat failed: %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
// send messages simultaneously.
for _, note := range notes {
if err := biDiStream.Send(note); err != nil {
log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
}
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh
此处的读取和写入语法与客户端流式传输方法非常相似,只不过我们在完成调用后使用流的 CloseSend()
方法。虽然每一方始终会按写入顺序接收对方的消息,但客户端和服务器都可以按任意顺序读取和写入,因为数据流是完全独立运行的。
7. 试试看
在应用的工作目录中执行以下命令,确认服务器和客户端是否正常协作:
- 在一个终端中运行服务器:
cd server go run .
- 从另一个终端运行客户端:
cd client go run .
您将看到如下输出(为清晰起见,省略了时间戳):
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 > name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 > ... name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 > Traversing 56 points. Route summary: point_count:56 distance:497013163 Got message First message at point(0, 1) Got message Second message at point(0, 2) Got message Third message at point(0, 3) Got message First message at point(0, 1) Got message Fourth message at point(0, 1) Got message Second message at point(0, 2) Got message Fifth message at point(0, 2) Got message Third message at point(0, 3) Got message Sixth message at point(0, 3)