Начало работы с gRPC-Go — потоковая передача

1. Введение

В этом практическом занятии вы будете использовать gRPC-Go для создания клиента и сервера, которые станут основой приложения для построения маршрутов, написанного на Go.

К концу этого руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации о характеристиках маршрута клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.

Сервис определяется в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия на реализации этой функциональности.

Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но и сериализацию и десериализацию данных.

Что вы узнаете

  • Как использовать Protocol Buffers для определения API сервиса.
  • Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с помощью автоматической генерации кода.
  • Понимание принципов потокового взаимодействия между клиентом и сервером с использованием gRPC.

Данный практический семинар предназначен для разработчиков на Go, которые только начинают работать с gRPC или хотят освежить свои знания gRPC, а также для всех, кто заинтересован в создании распределенных систем. Предварительный опыт работы с gRPC не требуется.

2. Прежде чем начать

Предварительные требования

Убедитесь, что у вас установлены следующие компоненты:

  • Требуется инструментарий Go версии 1.24.5 или более поздней. Инструкции по установке см. в разделе « Начало работы с Go».
  • Компилятор протокола Protocol Buffer, protoc , версии 3.27.1 или более поздней. Инструкции по установке см. в руководстве по установке компилятора.
  • Плагины компилятора протокола Protocol Buffer для Go и gRPC. Для установки этих плагинов выполните следующие команды:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Обновите переменную PATH , чтобы компилятор протокола Protocol Buffer мог найти плагины:

export PATH="$PATH:$(go env GOPATH)/bin"

Получите код

Чтобы вам не пришлось начинать с нуля, в этом практическом руководстве представлен шаблон исходного кода приложения, который вы сможете доработать. Следующие шаги покажут вам, как завершить приложение, включая использование плагинов компилятора Protocol Buffer для генерации шаблонного кода gRPC.

Сначала создайте рабочую директорию codelab и перейдите в неё с помощью cd :

mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started

Скачайте и распакуйте CodeLab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-go-streaming/start_here

В качестве альтернативы вы можете скачать ZIP-архив, содержащий только папку codelab, и распаковать его вручную.

Полный исходный код доступен на GitHub, если вы хотите обойтись без ввода кода вручную.

3. Определите сообщения и сервисы.

Первым шагом является определение gRPC-сервиса приложения, его RPC-методов, а также типов сообщений запроса и ответа с помощью Protocol Buffers . Ваш сервис будет предоставлять:

  • RPC-методы ListFeatures , RecordRoute и RouteChat , которые реализует сервер, а вызывает клиент.
  • Типы сообщений Point , Feature , Rectangle , RouteNote и RouteSummary представляют собой структуры данных, которыми обмениваются клиент и сервер при вызове указанных выше методов.

Все эти RPC-методы и типы сообщений будут определены в файле routeguide/route_guide.proto предоставленного исходного кода.

Протоколы Protocol Buffers обычно называются protobufs. Для получения дополнительной информации о терминологии gRPC см. раздел «Основные концепции, архитектура и жизненный цикл gRPC».

Определение типов сообщений

В файле routeguide/route_guide.proto исходного кода сначала определите тип сообщения Point . Point представляет собой пару координат широты и долготы на карте. Для этого практического задания используйте целые числа для координат:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Цифры 1 и 2 — это уникальные идентификационные номера для каждого поля в структуре message .

Далее определите тип сообщения Feature . В Feature используется string поле для имени или почтового адреса объекта, расположенного в точке Point :

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Далее следует сообщение Rectangle , представляющее собой прямоугольник, отображающий широту и долготу, в виде двух точек, расположенных по диагонали друг от друга: «lo» и «hi».

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Также сообщение RouteNote , представляющее собой сообщение, отправленное в заданной точке маршрута.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Нам также потребуется сообщение RouteSummary . Это сообщение принимается в ответ на RPC-запрос RecordRoute , который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние как сумма расстояний между каждой точкой.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Определите методы обслуживания

Для определения сервиса необходимо указать именованный сервис в файле .proto. Файл route_guide.proto содержит структуру service с именем RouteGuide , которая определяет один или несколько методов, предоставляемых сервисом приложения.

Определите методы RPC внутри определения вашего сервиса, указав их типы запроса и ответа. В этом разделе практического задания давайте определим:

ListFeatures

Получает доступные Feature в пределах заданного Rectangle . Результаты передаются потоком, а не возвращаются сразу (например, в ответном сообщении с повторяющимся полем), поскольку прямоугольник может занимать большую площадь и содержать огромное количество объектов.

Подходящим типом для этого RPC является потоковый RPC на стороне сервера : клиент отправляет запрос на сервер и получает поток для чтения последовательности сообщений. Клиент читает из возвращенного потока до тех пор, пока не закончатся сообщения. Как видно в нашем примере, метод потоковой передачи на стороне сервера указывается путем размещения ключевого слова stream перед типом ответа.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Принимает поток Point на проходимом маршруте и возвращает RouteSummary после завершения обхода.

В данном случае представляется подходящим клиентский потоковый RPC: клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочтет их все и вернет свой ответ. Метод клиентской потоковой передачи указывается путем добавления ключевого слова stream перед типом запроса.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Принимает поток сообщений RouteNote , отправляемых во время прохождения маршрута, одновременно получая другие сообщения RouteNote (например, от других пользователей).

Это как раз тот случай, когда двунаправленная потоковая передача данных необходима. В RPC-запросе с двунаправленной потоковой передачей обе стороны отправляют последовательность сообщений, используя поток чтения и записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и записывать данные в любом порядке.

Например, сервер может дождаться получения всех сообщений от клиентов, прежде чем записывать свои ответы, или же он может попеременно читать сообщение, а затем записывать его, или использовать какую-либо другую комбинацию операций чтения и записи.

Порядок сообщений в каждом потоке сохраняется. Этот тип метода указывается путем размещения ключевого слова stream перед запросом и ответом.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Сгенерируйте код клиента и сервера.

Далее сгенерируйте шаблонный gRPC-код для клиента и сервера из файла .proto , используя компилятор Protocol Buffer. В каталоге routeguide выполните следующую команду:

protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       route_guide.proto

Эта команда создаёт следующие файлы:

  • файл route_guide.pb.go содержит функции для создания типов сообщений приложения, доступа к их данным и определения типов, представляющих сообщения.
  • route_guide_grpc.pb.go содержит функции, которые клиент использует для вызова удаленного gRPC-метода сервиса, а также функции, используемые сервером для предоставления этого удаленного сервиса.

Далее мы реализуем методы на стороне сервера, чтобы при отправке клиентом запроса сервер мог ответить.

5. Внедрить сервис.

Для начала давайте рассмотрим, как создать сервер RouteGuide . Для того, чтобы наш сервис RouteGuide выполнял свою работу, необходимо выполнить две задачи:

  • Реализация интерфейса сервиса, сгенерированного на основе определения нашего сервиса: выполнение фактической «работы» нашего сервиса.
  • Запуск gRPC-сервера для приема запросов от клиентов и их перенаправления к соответствующей реализации сервиса.

Давайте реализуем RouteGuide в server/server.go .

Внедрить RouteGuide

Нам необходимо реализовать сгенерированный интерфейс RouteGuideService . Вот как будет выглядеть реализация.

type routeGuideServer struct {
        ...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}

Давайте подробно рассмотрим каждую реализацию RPC.

Серверная потоковая передача RPC

Начнём с одного из наших потоковых RPC-запросов. ListFeatures — это потоковый RPC-запрос на стороне сервера, поэтому нам нужно отправить клиенту несколько Feature .

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

Как видите, вместо простых объектов запроса и ответа в параметрах метода, на этот раз мы получаем объект запроса ( Rectangle в котором наш клиент хочет найти Features ) и специальный объект RouteGuide_ListFeaturesServer для записи ответов. В методе мы заполняем столько объектов Feature , сколько нам нужно вернуть, записывая их в RouteGuide_ListFeaturesServer с помощью его метода Send() . Наконец, как и в нашем простом RPC, мы возвращаем ошибку nil , чтобы сообщить gRPC, что мы закончили запись ответов. Если в этом вызове произойдет какая-либо ошибка, мы вернем ошибку, не являющуюся nil; слой gRPC преобразует ее в соответствующий статус RPC для отправки по сети.

RPC потоковой передачи на стороне клиента

Теперь давайте рассмотрим нечто немного более сложное: метод потоковой передачи на стороне клиента RecordRoute , где мы получаем поток Points от клиента и возвращаем один RouteSummary с информацией о поездке. Как видите, на этот раз у метода вообще нет параметра запроса. Вместо этого он получает поток RouteGuide_RecordRouteServer , который сервер может использовать как для чтения, так и для записи сообщений — он может получать сообщения от клиента с помощью метода Recv() и возвращать свой единственный ответ с помощью метода SendAndClose() .

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

В теле метода мы используем метод Recv() класса RouteGuide_RecordRouteServer для многократного чтения запросов клиента к объекту запроса (в данном случае Point ) до тех пор, пока не закончатся сообщения: сервер должен проверять ошибку, возвращаемую методом Recv() после каждого вызова. Если это nil , поток все еще в порядке, и он может продолжить чтение; если это io.EOF поток сообщений завершен, и сервер может вернуть свой RouteSummary . Если он имеет любое другое значение, мы возвращаем ошибку "как есть", чтобы она была преобразована в статус RPC слоем gRPC.

Двунаправленный потоковый RPC

Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() .

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)

    s.mu.Lock()
    s.routeNotes[key] = append(s.routeNotes[key], in)
    // Note: this copy prevents blocking other clients while serving this one.
    // We don't need to do a deep copy, because elements in the slice are
    // insert-only and never modified.
    rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
    copy(rn, s.routeNotes[key])
    s.mu.Unlock()

    for _, note := range rn {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

В этот раз мы получаем поток RouteGuide_RouteChatServer , который, как и в нашем примере с потоковой передачей на стороне клиента, можно использовать для чтения и записи сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока клиент продолжает записывать сообщения в свой поток сообщений. Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что сервер использует метод send() потока, а не SendAndClose() поскольку он записывает несколько ответов. Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.

Запустите сервер

После реализации всех наших методов нам также необходимо запустить gRPC-сервер, чтобы клиенты могли фактически использовать наш сервис. Следующий фрагмент кода показывает, как мы это делаем для нашего сервиса RouteGuide :

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()

s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)

Вот что происходит в main() , шаг за шагом:

  1. Укажите TCP-порт для прослушивания запросов от удалённых клиентов, используя lis, err := net.Listen(...) . По умолчанию приложение использует TCP-порт 50051 , указанный переменной port или параметром --port в командной строке при запуске сервера. Если TCP-порт не может быть открыт, приложение завершается с фатальной ошибкой.
  2. Создайте экземпляр gRPC-сервера с помощью grpc.NewServer(...) , назвав этот экземпляр grpcServer .
  3. Создайте указатель на структуру routeGuideServer , представляющую API-сервис приложения, и назовите указатель s.
  4. Используйте s.loadFeatures() для заполнения массива s.savedFeatures .
  5. Зарегистрируйте реализацию нашего сервиса на gRPC-сервере.
  6. Вызовите Serve() на сервере, указав порт, чтобы выполнить блокирующее ожидание запросов от клиентов; это будет продолжаться до тех пор, пока процесс не будет завершен или не будет вызвана Stop() .

Функция loadFeatures() получает сопоставление координат с местоположением из server/testdata.go .

6. Создайте клиента.

Теперь отредактируйте файл client/client.go , в котором вы разместите клиентский код.

Для вызова методов удалённого сервиса нам сначала необходимо создать gRPC- канал для связи с сервером. Мы создаём его, передавая строку целевого URI сервера (в данном случае это просто адрес и номер порта) в метод grpc.NewClient() в функции main() клиента следующим образом:

// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
  log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()

Адрес сервера, определяемый переменной serverAddr , по умолчанию равен localhost:50051 и может быть переопределен с помощью ключа --addr в командной строке при запуске клиента.

Если клиенту необходимо подключиться к службе, требующей учетных данных для аутентификации, например, TLS или JWT, клиент может передать объект DialOptions в качестве параметра функции grpc.NewClient , содержащий необходимые учетные данные. Служба RouteGuide не требует никаких учетных данных.

После настройки канала gRPC нам потребуется клиентский заглушка для выполнения RPC-вызовов через вызовы функций Go. Мы получаем эту заглушку с помощью метода NewRouteGuideClient , предоставляемого файлом route_guide_grpc.pb.go , сгенерированным из файла .proto приложения.

import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")

client := pb.NewRouteGuideClient(conn)

Методы вызова сервиса

Теперь давайте посмотрим, как мы вызываем методы нашего сервиса. В gRPC-Go RPC-вызовы работают в блокирующем/синхронном режиме, что означает, что RPC-вызов ожидает ответа от сервера и либо возвращает ответ, либо ошибку.

Серверная потоковая передача RPC

Здесь мы вызываем серверный потоковый метод ListFeatures , который возвращает поток географических объектов Feature .

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
  // For server-to-client streaming RPCs, you call stream.Recv() until it
  // returns io.EOF.
  feature, err := stream.Recv()
  if err == io.EOF {
    break
  }
  if err != nil {
    log.Fatalf("client.ListFeatures failed: %v", err)
  }
  log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
    feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}

Как и в простом RPC, мы передаем методу контекст и запрос. Однако вместо объекта ответа мы получаем экземпляр RouteGuide_ListFeaturesClient . Клиент может использовать поток RouteGuide_ListFeaturesClient для чтения ответов сервера. Мы используем метод Recv() объекта RouteGuide_ListFeaturesClient для многократного чтения ответов сервера в объект буфера протокола ответов (в данном случае Feature ), пока не закончатся сообщения: клиенту необходимо проверять ошибку err, возвращаемую методом Recv() после каждого вызова. Если nil , поток все еще исправен, и он может продолжать чтение; если io.EOF , то поток сообщений завершился; в противном случае должна быть ошибка RPC, которая передается через err .

RPC потоковой передачи на стороне клиента

Метод потоковой передачи на стороне клиента RecordRoute аналогичен методу на стороне сервера, за исключением того, что мы передаем методу только контекст и получаем обратно поток RouteGuide_RecordRouteClient , который можно использовать как для записи, так и для чтения сообщений.

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
  if err := c2sStream.Send(point); err != nil {
    log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
  }
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)

Объект RouteGuide_RecordRouteClient имеет метод Send() , который мы можем использовать для отправки запросов на сервер. После того, как мы закончим запись запросов клиента в поток с помощью Send() , нам необходимо вызвать CloseAndRecv() для этого потока, чтобы сообщить gRPC, что мы закончили запись и ожидаем получить ответ. Мы получаем статус RPC из ошибки, возвращаемой функцией CloseAndRecv() . Если статус равен nil , то первое возвращаемое значение из CloseAndRecv() будет действительным ответом сервера.

Двунаправленный потоковый RPC

Наконец, давайте рассмотрим наш двунаправленный потоковый RPC- RouteChat() . Как и в случае с RecordRoute , мы передаем методу только объект контекста и получаем обратно поток, который можно использовать как для записи, так и для чтения сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока сервер еще записывает сообщения в свой поток сообщений.

biDiStream, err := client.RouteChat(context.Background())
if err != nil {
  log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
  for {
    in, err := biDiStream.Recv()
    if err == io.EOF {
      // read done.
      close(recvDoneCh)
      return
    }
    if err != nil {
      log.Fatalf("client.RouteChat failed: %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
// send messages simultaneously.
for _, note := range notes {
  if err := biDiStream.Send(note); err != nil {
    log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
  }
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh

Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что мы используем метод CloseSend() потока после завершения вызова. Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.

7. Попробуйте.

Убедитесь в корректной работе сервера и клиента, выполнив следующие команды в рабочем каталоге приложения:

  1. Запустите сервер в одном терминале:
cd server
go run .
  1. Запустите клиент из другого терминала:
cd client
go run .

В результате вы увидите примерно такой вывод, при этом временные метки для наглядности опущены:

Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

8. Что дальше?