1. Введение
В этой лабораторной работе вы будете использовать gRPC-Go для создания клиента и сервера, которые составят основу приложения для сопоставления маршрутов, написанного на Go.
К концу руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации об объектах на маршруте клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.
Служба определена в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия при реализации этой функциональности.
Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но также сериализацию и десериализацию данных.
Чему вы научитесь
- Как использовать Protocol Buffers для определения API сервиса.
- Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с использованием автоматической генерации кода.
- Понимание потокового взаимодействия клиент-сервер с помощью gRPC.
Эта практическая работа предназначена для разработчиков на Go, впервые использующих gRPC или желающих освежить свои знания, а также для всех, кто интересуется разработкой распределённых систем. Опыт работы с gRPC не требуется.
2. Прежде чем начать
Предпосылки
Убедитесь, что у вас установлено следующее:
- Набор инструментов Go версии 1.24.5 или более поздней. Инструкции по установке см. в руководстве по началу работы с Go.
- Компилятор буфера протокола
protoc
версии 3.27.1 или более поздней. Инструкции по установке см. в руководстве по установке компилятора. - Плагины компилятора буфера протокола для Go и gRPC. Чтобы установить эти плагины, выполните следующие команды:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Обновите переменную PATH
, чтобы компилятор буфера протокола мог найти плагины:
export PATH="$PATH:$(go env GOPATH)/bin"
Получить код
Чтобы вам не пришлось начинать всё с нуля, эта лабораторная работа предоставляет вам заготовку исходного кода приложения. Следующие шаги покажут вам, как завершить приложение, включая использование плагинов компилятора буфера протокола для генерации шаблонного кода gRPC.
Сначала создайте рабочий каталог codelab и cd
в него:
mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started
Загрузите и распакуйте кодовую лабораторию:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-go-streaming/start_here
Кроме того, вы можете загрузить .zip-файл, содержащий только каталог codelab, и вручную распаковать его.
Если вы не хотите вводить реализацию вручную, готовый исходный код доступен на GitHub .
3. Определите сообщения и услуги
Первым шагом будет определение службы gRPC приложения, её методов RPC и типов сообщений запросов и ответов с помощью Protocol Buffers . Ваша служба будет предоставлять:
- Методы RPC, называемые
ListFeatures
,RecordRoute
иRouteChat
, которые реализует сервер и вызывает клиент. - Типы сообщений
Point
,Feature
,Rectangle
,RouteNote
иRouteSummary
представляют собой структуры данных, которыми обмениваются клиент и сервер при вызове методов, указанных выше.
Все эти методы RPC и типы сообщений будут определены в файле routeguide/route_guide.proto
предоставленного исходного кода.
Буферы протоколов обычно называются protobuf. Подробнее о терминологии gRPC см. в разделе «Основные концепции, архитектура и жизненный цикл gRPC».
Определить типы сообщений
В файле routeguide/route_guide.proto
исходного кода сначала определите тип сообщения Point
. Point
представляет собой пару координат (широта-долгота) на карте. В этой практической работе используйте целые числа в качестве координат:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Цифры 1
и 2
— это уникальные идентификационные номера для каждого из полей в структуре message
.
Затем определите тип сообщения Feature
. Feature
использует string
поле для имени или почтового адреса объекта, расположенного в месте, указанном Point
:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Далее следует сообщение Rectangle
, которое представляет собой прямоугольник широты и долготы, представленный двумя диагонально противоположными точками «lo» и «hi».
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Также сообщение RouteNote
, которое представляет собой сообщение, отправленное в заданной точке.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Нам также потребуется сообщение RouteSummary
. Это сообщение приходит в ответ на RPC-запрос RecordRoute
, который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние, представляющее собой сумму расстояний между точками.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Определить методы обслуживания
Чтобы определить службу, укажите её название в файле .proto. Файл route_guide.proto
содержит структуру service
RouteGuide
, которая определяет один или несколько методов, предоставляемых службой приложения.
Определите методы RPC
в определении вашего сервиса, указав типы их запросов и ответов. В этом разделе практикума давайте определим:
СписокОсобенности
Получает Feature
, доступные в заданном Rectangle
. Результаты передаются потоком, а не возвращаются сразу (например, в ответном сообщении с повторяющимся полем), поскольку прямоугольник может охватывать большую область и содержать огромное количество объектов.
Подходящим типом для этого RPC является потоковый RPC на стороне сервера : клиент отправляет запрос серверу и получает поток для чтения последовательности сообщений. Клиент читает данные из возвращаемого потока до тех пор, пока сообщения не закончатся. Как видно из нашего примера, метод потоковой передачи на стороне сервера указывается ключевым словом stream перед типом ответа.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Принимает поток Point
на проходимом маршруте и возвращает RouteSummary
после завершения обхода.
В данном случае RPC -вызов на стороне клиента кажется уместным: клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочитает их все и вернет ответ. Метод потоковой передачи на стороне клиента указывается ключевым словом stream перед типом запроса.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Принимает поток RouteNote
, отправленных во время прохождения маршрута, одновременно получая другие RouteNote
(например, от других пользователей).
Именно такой вариант использования двунаправленной потоковой передачи данных . Двунаправленный потоковый RPC предполагает, что обе стороны отправляют последовательность сообщений, используя поток чтения-записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и записывать данные в любом порядке.
Например, сервер может дождаться получения всех клиентских сообщений, прежде чем писать свои ответы, или он может поочередно читать сообщение, а затем писать сообщение или использовать какую-либо другую комбинацию чтений и записей.
Порядок сообщений в каждом потоке сохраняется. Этот метод задаётся путём добавления ключевого слова stream перед запросом и ответом.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Генерация клиентского и серверного кода
Затем сгенерируйте шаблонный код gRPC для клиента и сервера из файла .proto
с помощью компилятора Protocol Buffer. В каталоге routeguide
выполните:
protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ route_guide.proto
Эта команда генерирует следующие файлы:
-
route_guide.pb.go
, который содержит функции для создания типов сообщений приложения и доступа к их данным, а также определение типов, представляющих сообщения. -
route_guide_grpc.pb.go
, который содержит функции, используемые клиентом для вызова удаленного метода gRPC службы, и функции, используемые сервером для предоставления этой удаленной службы.
Далее мы реализуем методы на стороне сервера, чтобы при отправке клиентом запроса сервер мог ответить.
5. Реализуйте услугу
Сначала давайте рассмотрим, как создать сервер RouteGuide
. Чтобы наш сервис RouteGuide
заработал, нужно выполнить два шага:
- Реализация интерфейса сервиса, созданного на основе определения нашего сервиса: выполнение фактической «работы» нашего сервиса.
- Запуск сервера gRPC для прослушивания запросов от клиентов и отправки их в нужную реализацию службы.
Давайте реализуем RouteGuide в server/server.go
.
Реализовать RouteGuide
Нам необходимо реализовать сгенерированный интерфейс RouteGuideService
. Вот как будет выглядеть реализация.
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
Давайте подробно рассмотрим каждую реализацию RPC.
Потоковая передача RPC на стороне сервера
Начнём с одного из наших потоковых RPC. ListFeatures
— это серверный потоковый RPC, поэтому нам нужно отправить несколько Feature
нашему клиенту.
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
Как видите, вместо получения простых объектов запроса и ответа в параметрах нашего метода, на этот раз мы получаем объект запроса ( Rectangle
, в котором наш клиент хочет найти Features
) и специальный объект RouteGuide_ListFeaturesServer
для записи наших ответов. В этом методе мы заполняем столько объектов Feature
, сколько нам нужно вернуть, записывая их в RouteGuide_ListFeaturesServer
с помощью его метода Send()
. Наконец, как и в нашем простом RPC, мы возвращаем nil
ошибку, сообщая gRPC о завершении записи ответов. Если в этом вызове возникнет какая-либо ошибка, мы возвращаем ненулевую ошибку; уровень gRPC преобразует её в соответствующий статус RPC для отправки по сети.
Клиентская потоковая передача RPC
Теперь рассмотрим что-то более сложное: клиентский потоковый метод RecordRoute
, который получает поток Points
от клиента и возвращает один RouteSummary
с информацией о его поездке. Как видите, на этот раз у метода вообще нет параметра запроса. Вместо этого он получает поток RouteGuide_RecordRouteServer
, который сервер может использовать как для чтения, так и для записи сообщений — он может получать сообщения клиента с помощью метода Recv()
и возвращать единственный ответ с помощью метода SendAndClose()
.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
В теле метода мы используем метод Recv()
объекта RouteGuide_RecordRouteServer
для многократного чтения клиентских запросов к объекту запроса (в данном случае Point
) до тех пор, пока сообщения не закончатся: серверу необходимо проверять ошибку, возвращаемую Recv()
после каждого вызова. Если это nil
, поток всё ещё работает, и сервер может продолжить чтение; если это io.EOF
, поток сообщений завершён, и сервер может вернуть RouteSummary
. Если у него есть какое-либо другое значение, мы возвращаем ошибку «как есть», чтобы уровень gRPC преобразовал её в статус RPC.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat()
.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
// Note: this copy prevents blocking other clients while serving this one.
// We don't need to do a deep copy, because elements in the slice are
// insert-only and never modified.
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
На этот раз мы получаем поток RouteGuide_RouteChatServer
, который, как и в нашем примере потоковой передачи на стороне клиента, можно использовать для чтения и записи сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока клиент продолжает записывать сообщения в свой поток. Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что сервер использует метод send()
потока вместо SendAndClose()
поскольку он записывает несколько ответов. Хотя каждая сторона всегда получает сообщения другой стороны в порядке их записи, и клиент, и сервер могут читать и записывать сообщения в любом порядке — потоки работают совершенно независимо.
Запустить сервер
После реализации всех наших методов нам также необходимо запустить gRPC-сервер, чтобы клиенты могли использовать наш сервис. Следующий фрагмент кода показывает, как это сделать для нашего сервиса RouteGuide
:
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)
Вот что происходит в main()
, шаг за шагом:
- Укажите TCP-порт для прослушивания запросов удалённых клиентов с помощью
lis, err := net.Listen(...)
. По умолчанию приложение использует TCP-порт50051
, указанный переменнойport
или параметром--port
в командной строке при запуске сервера. Если TCP-порт не удаётся открыть, приложение завершает работу с фатальной ошибкой. - Создайте экземпляр сервера gRPC с помощью
grpc.NewServer(...)
, назвав этот экземплярgrpcServer
. - Создайте указатель на
routeGuideServer
, структуру, представляющую службу API приложения, назвав указательs.
- Используйте
s.loadFeatures()
для заполнения массиваs.savedFeatures
. - Зарегистрируйте реализацию нашей службы на сервере gRPC.
- Вызовите
Serve()
на сервере с данными нашего порта, чтобы выполнить блокирующее ожидание клиентских запросов; это продолжается до тех пор, пока процесс не будет завершен или не будет вызванStop()
.
Функция loadFeatures()
получает сопоставления координат и местоположения из server/testdata.go
.
6. Создайте клиента
Теперь отредактируйте client/client.go
, в котором вы будете реализовывать клиентский код.
Для вызова методов удалённой службы сначала необходимо создать канал gRPC для взаимодействия с сервером. Мы создаём его, передавая строку целевого URI сервера (в данном случае это просто адрес и номер порта) в grpc.NewClient()
в функции main()
клиента следующим образом:
// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
Адрес сервера, определяемый переменной serverAddr
, по умолчанию равен localhost:50051
и может быть переопределен с помощью переключателя --addr
в командной строке при запуске клиента.
Если клиенту необходимо подключиться к службе, требующей учётных данных аутентификации, например, TLS или JWT, он может передать объект DialOptions
в качестве параметра методу grpc.NewClient
, содержащему необходимые учётные данные. Служба RouteGuide
не требует никаких учётных данных.
После настройки канала gRPC нам понадобится клиентская заглушка для выполнения RPC-вызовов через вызовы функций Go. Мы получаем эту заглушку с помощью метода NewRouteGuideClient
, предоставленного файлом route_guide_grpc.pb.go
, сгенерированным из файла .proto
приложения.
import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")
client := pb.NewRouteGuideClient(conn)
Методы обслуживания вызовов
Теперь давайте посмотрим, как мы вызываем наши сервисные методы. В gRPC-Go RPC работают в блокирующем/синхронном режиме, то есть RPC-вызов ожидает ответа сервера и либо возвращает ответ, либо ошибку.
Потоковая передача RPC на стороне сервера
Здесь мы вызываем серверный потоковый метод ListFeatures
, который возвращает поток географических объектов Feature
.
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
// For server-to-client streaming RPCs, you call stream.Recv() until it
// returns io.EOF.
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}
Как и в простом RPC, мы передаем методу контекст и запрос. Однако вместо получения объекта ответа мы получаем экземпляр RouteGuide_ListFeaturesClient
. Клиент может использовать поток RouteGuide_ListFeaturesClient
для чтения ответов сервера. Мы используем метод Recv()
класса RouteGuide_ListFeaturesClient
для многократного чтения ответов сервера на объект буфера протокола ответа (в данном случае Feature
), пока сообщения не закончатся: клиенту необходимо проверять ошибку err , возвращаемую Recv()
после каждого вызова. Если nil
, поток все еще работает и он может продолжать чтение; если это io.EOF
, то поток сообщений завершился; в противном случае должна быть ошибка RPC, которая передается через err
.
Клиентская потоковая передача RPC
Метод потоковой передачи RecordRoute
на стороне клиента аналогичен методу на стороне сервера, за исключением того, что мы передаем методу только контекст и получаем обратно поток RouteGuide_RecordRouteClient
, который мы можем использовать как для записи , так и для чтения сообщений.
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
if err := c2sStream.Send(point); err != nil {
log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
}
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)
У RouteGuide_RecordRouteClient
есть метод Send()
, который можно использовать для отправки запросов на сервер. После того, как мы завершим запись клиентских запросов в поток с помощью Send()
, нам нужно вызвать CloseAndRecv()
для потока, чтобы сообщить gRPC, что мы завершили запись и ожидаем ответа. Статус RPC определяется по значению err, возвращаемому CloseAndRecv()
. Если статус равен nil
, то первое возвращаемое CloseAndRecv()
значение будет допустимым ответом сервера.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat()
. Как и в случае с RecordRoute
, мы передаём методу только объект контекста и получаем обратно поток, который можно использовать как для записи, так и для чтения сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока сервер всё ещё записывает сообщения в свой поток сообщений.
biDiStream, err := client.RouteChat(context.Background())
if err != nil {
log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
for {
in, err := biDiStream.Recv()
if err == io.EOF {
// read done.
close(recvDoneCh)
return
}
if err != nil {
log.Fatalf("client.RouteChat failed: %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
// send messages simultaneously.
for _, note := range notes {
if err := biDiStream.Send(note); err != nil {
log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
}
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh
Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что мы используем метод CloseSend()
потока после завершения вызова. Хотя каждая сторона всегда получает сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать данные в любом порядке — потоки работают совершенно независимо.
7. Попробуйте
Убедитесь, что сервер и клиент работают друг с другом корректно, выполнив следующие команды в рабочем каталоге приложения:
- Запустите сервер в одном терминале:
cd server go run .
- Запустите клиент с другого терминала:
cd client go run .
Вы увидите примерно такой вывод, при этом временные метки опущены для ясности:
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 > name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 > ... name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 > Traversing 56 points. Route summary: point_count:56 distance:497013163 Got message First message at point(0, 1) Got message Second message at point(0, 2) Got message Third message at point(0, 3) Got message First message at point(0, 1) Got message Fourth message at point(0, 1) Got message Second message at point(0, 2) Got message Fifth message at point(0, 2) Got message Third message at point(0, 3) Got message Sixth message at point(0, 3)
8. Что дальше?
- Узнайте, как работает gRPC, из раздела Введение в gRPC и Основные концепции .
- Проработайте учебник по основам .
- Изучите справочник API .