Primeiros passos com o gRPC-Go: streaming

1. Introdução

Neste codelab, você vai usar o gRPC-Go para criar um cliente e um servidor que formam a base de um aplicativo de mapeamento de rotas escrito em Go.

Ao final do tutorial, você terá um cliente que se conecta a um servidor remoto usando gRPC para receber informações sobre recursos em uma rota de cliente, criar um resumo de uma rota de cliente e trocar informações de rota, como atualizações de trânsito, com o servidor e outros clientes.

O serviço é definido em um arquivo Protocol Buffers, que será usado para gerar código boilerplate para o cliente e o servidor, permitindo que eles se comuniquem entre si e economizando tempo e esforço na implementação dessa funcionalidade.

Esse código gerado cuida não apenas das complexidades da comunicação entre o servidor e o cliente, mas também da serialização e desserialização de dados.

O que você vai aprender

  • Como usar buffers de protocolo para definir uma API de serviço.
  • Como criar um cliente e um servidor baseados em gRPC com uma definição de buffers de protocolo usando a geração automática de código.
  • Entendimento da comunicação de streaming cliente-servidor com gRPC.

Este codelab é destinado a desenvolvedores Go que não conhecem o gRPC ou querem relembrar o assunto, além de qualquer pessoa interessada em criar sistemas distribuídos. Não é necessário ter experiência com gRPC.

2. Antes de começar

Pré-requisitos

Verifique se você instalou o seguinte:

  • A versão 1.24.5 ou mais recente da cadeia de ferramentas do Go. Para instruções de instalação, consulte Primeiros passos do Go.
  • O compilador de buffers de protocolo, protoc, versão 3.27.1 ou mais recente. Para instruções de instalação, consulte o guia de instalação do compilador.
  • Os plug-ins do compilador de buffer de protocolo para Go e gRPC. Para instalar esses plug-ins, execute os seguintes comandos:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Atualize a variável PATH para que o compilador de buffer de protocolo possa encontrar os plug-ins:

export PATH="$PATH:$(go env GOPATH)/bin"

Acessar o código

Para que você não precise começar do zero, este codelab oferece um scaffold do código-fonte do aplicativo para você concluir. As etapas a seguir mostram como concluir o aplicativo, incluindo o uso dos plug-ins do compilador de buffer de protocolo para gerar o código gRPC padrão.

Primeiro, crie o diretório de trabalho do codelab e cd nele:

mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started

Faça o download e extraia o codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-go-streaming/start_here

Como alternativa, baixe o arquivo .zip que contém apenas o diretório do codelab e descompacte-o manualmente.

O código-fonte completo está disponível no GitHub se você quiser pular a digitação de uma implementação.

3. Definir mensagens e serviços

A primeira etapa é definir o serviço gRPC do aplicativo, os métodos RPC e os tipos de mensagens de solicitação e resposta usando buffers de protocolo. Seu serviço vai oferecer:

  • Métodos RPC chamados ListFeatures, RecordRoute e RouteChat que o servidor implementa e o cliente chama.
  • Os tipos de mensagem Point, Feature, Rectangle, RouteNote e RouteSummary, que são estruturas de dados trocadas entre o cliente e o servidor ao chamar os métodos acima.

Esses métodos RPC e os tipos de mensagens serão definidos no arquivo routeguide/route_guide.proto do código-fonte fornecido.

Os buffers de protocolo são conhecidos como protobufs. Para mais informações sobre a terminologia do gRPC, consulte Conceitos principais, arquitetura e ciclo de vida do gRPC.

Definir tipos de mensagem

No arquivo routeguide/route_guide.proto do código-fonte, primeiro defina o tipo de mensagem Point. Um Point representa um par de coordenadas de latitude e longitude em um mapa. Neste codelab, use números inteiros para as coordenadas:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Os números 1 e 2 são IDs exclusivos para cada um dos campos na estrutura message.

Em seguida, defina o tipo de mensagem Feature. Um Feature usa um campo string para o nome ou endereço postal de algo em um local especificado por um Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Em seguida, uma mensagem Rectangle que representa um retângulo de latitude-longitude, representado como dois pontos diagonalmente opostos "lo" e "hi".

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Também uma mensagem RouteNote que representa uma mensagem enviada em um determinado ponto.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Também vamos precisar de uma mensagem RouteSummary. Essa mensagem é recebida em resposta a uma RPC RecordRoute, que é explicada na próxima seção. Ele contém o número de pontos individuais recebidos, o número de recursos detectados e a distância total percorrida como a soma cumulativa da distância entre cada ponto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definir métodos de serviço

Para definir um serviço, especifique um serviço nomeado no arquivo .proto. O arquivo route_guide.proto tem uma estrutura service chamada RouteGuide que define um ou mais métodos fornecidos pelo serviço do aplicativo.

Defina métodos RPC na definição do serviço, especificando os tipos de solicitação e resposta. Nesta seção do codelab, vamos definir:

ListFeatures

Recebe os Features disponíveis no Rectangle especificado. Os resultados são transmitidos em vez de retornados de uma só vez (por exemplo, em uma mensagem de resposta com um campo repetido), já que o retângulo pode abranger uma grande área e conter um grande número de recursos.

Um tipo adequado para essa RPC é uma RPC de streaming do lado do servidor: o cliente envia uma solicitação ao servidor e recebe um stream para ler uma sequência de mensagens. O cliente lê o stream retornado até que não haja mais mensagens. Como você pode ver no exemplo, especifique um método de streaming do lado do servidor colocando a palavra-chave "stream" antes do tipo de resposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Aceita um fluxo de Points em uma rota sendo percorrida, retornando um RouteSummary quando o percurso é concluído.

Uma RPC de streaming do lado do cliente parece adequada nesse caso: o cliente grava uma sequência de mensagens e as envia ao servidor, novamente usando um stream fornecido. Depois que o cliente terminar de gravar as mensagens, ele vai aguardar o servidor ler todas elas e retornar a resposta. Para especificar um método de streaming do lado do cliente, coloque a palavra-chave "stream" antes do tipo de solicitação.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Aceita um fluxo de RouteNotes enviados enquanto uma rota está sendo percorrida, recebendo outros RouteNotes (por exemplo, de outros usuários).

Esse é exatamente o tipo de caso de uso para streaming bidirecional. Em uma RPC de streaming bidirecional, os dois lados enviam uma sequência de mensagens usando um stream de leitura e gravação. Os dois streams operam de forma independente, então clientes e servidores podem ler e gravar na ordem que quiserem.

Por exemplo, o servidor pode esperar para receber todas as mensagens do cliente antes de escrever as respostas ou pode ler uma mensagem e escrever uma mensagem, ou alguma outra combinação de leituras e gravações.

A ordem das mensagens em cada stream é preservada. Para especificar esse tipo de método, coloque a palavra-chave "stream" antes da solicitação e da resposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Gerar código de cliente e servidor

Em seguida, gere o código gRPC clichê para o cliente e o servidor do arquivo .proto usando o compilador de buffer de protocolo. No diretório routeguide, execute:

protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       route_guide.proto

Esse comando gera os seguintes arquivos:

  • route_guide.pb.go, que contém funções para criar os tipos de mensagens do aplicativo e acessar os dados deles, além da definição dos tipos que representam as mensagens.
  • route_guide_grpc.pb.go, que contém funções usadas pelo cliente para chamar o método gRPC remoto do serviço e funções usadas pelo servidor para fornecer esse serviço remoto.

Em seguida, vamos implementar os métodos no lado do servidor para que, quando o cliente enviar uma solicitação, o servidor possa responder.

5. Implementar o serviço

Primeiro, vamos ver como criar um servidor RouteGuide. Há duas partes para fazer nosso serviço RouteGuide funcionar:

  • Implementar a interface de serviço gerada da nossa definição de serviço: fazer o "trabalho" real do nosso serviço.
  • Executar um servidor gRPC para detectar solicitações de clientes e enviá-las à implementação de serviço correta.

Vamos implementar o RouteGuide em server/server.go.

Implementar o RouteGuide

Precisamos implementar a interface RouteGuideService gerada. Veja como ficaria a implementação.

type routeGuideServer struct {
        ...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}

Vamos analisar cada implementação de RPC em detalhes.

RPC de streaming do lado do servidor

Comece com uma das nossas RPCs de streaming. ListFeatures é uma RPC de streaming do lado do servidor. Portanto, precisamos enviar vários Features de volta ao cliente.

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

Como você pode ver, em vez de receber objetos simples de solicitação e resposta nos parâmetros do método, desta vez recebemos um objeto de solicitação (o Rectangle em que nosso cliente quer encontrar Features) e um objeto RouteGuide_ListFeaturesServer especial para gravar nossas respostas. No método, preenchemos quantos objetos Feature forem necessários para retornar, gravando-os no RouteGuide_ListFeaturesServer usando o método Send(). Por fim, como no nosso RPC simples, retornamos um erro nil para informar ao gRPC que terminamos de gravar as respostas. Se ocorrer algum erro nessa chamada, vamos retornar um erro não nulo. A camada gRPC vai traduzir isso em um status de RPC adequado para ser enviado pela rede.

RPC de streaming do lado do cliente

Agora vamos analisar algo um pouco mais complicado: o método de streaming do lado do cliente RecordRoute, em que recebemos um fluxo de Points do cliente e retornamos um único RouteSummary com informações sobre a viagem. Como você pode ver, desta vez o método não tem um parâmetro de solicitação. Em vez disso, ele recebe um fluxo RouteGuide_RecordRouteServer, que o servidor pode usar para ler e gravar mensagens. Ele pode receber mensagens do cliente usando o método Recv() e retornar a resposta única usando o método SendAndClose().

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

No corpo do método, usamos o método Recv() do RouteGuide_RecordRouteServer para ler repetidamente as solicitações do cliente em um objeto de solicitação (neste caso, um Point) até que não haja mais mensagens. O servidor precisa verificar o erro retornado de Recv() após cada chamada. Se for nil, o fluxo ainda estará bom e poderá continuar lendo. Se for io.EOF, o fluxo de mensagens terá terminado e o servidor poderá retornar o RouteSummary. Se ele tiver qualquer outro valor, vamos retornar o erro "como está" para que seja traduzido para um status de RPC pela camada gRPC.

RPC de streaming bidirecional

Por fim, vamos analisar nossa RPC de streaming bidirecional RouteChat().

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)

    s.mu.Lock()
    s.routeNotes[key] = append(s.routeNotes[key], in)
    // Note: this copy prevents blocking other clients while serving this one.
    // We don't need to do a deep copy, because elements in the slice are
    // insert-only and never modified.
    rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
    copy(rn, s.routeNotes[key])
    s.mu.Unlock()

    for _, note := range rn {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

Desta vez, recebemos um fluxo RouteGuide_RouteChatServer que, como no nosso exemplo de streaming do lado do cliente, pode ser usado para ler e gravar mensagens. No entanto, desta vez, retornamos valores pelo stream do nosso método enquanto o cliente ainda está gravando mensagens no stream dele. A sintaxe para leitura e gravação aqui é muito semelhante ao nosso método de streaming do cliente, exceto que o servidor usa o método send() do fluxo em vez de SendAndClose() porque está gravando várias respostas. Embora cada lado sempre receba as mensagens do outro na ordem em que foram escritas, o cliente e o servidor podem ler e gravar em qualquer ordem. Os streams operam de forma completamente independente.

Iniciar o servidor

Depois de implementar todos os métodos, também precisamos iniciar um servidor gRPC para que os clientes possam usar nosso serviço. O snippet a seguir mostra como fazemos isso para nosso serviço RouteGuide:

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()

s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)

Veja o que está acontecendo em main(), etapa por etapa:

  1. Especifique a porta TCP a ser usada para detectar solicitações de clientes remotos usando lis, err := net.Listen(...). Por padrão, o aplicativo usa a porta TCP 50051, conforme especificado pela variável port ou transmitindo a chave --port na linha de comando ao executar o servidor. Se não for possível abrir a porta TCP, o aplicativo será encerrado com um erro fatal.
  2. Crie uma instância do servidor gRPC usando grpc.NewServer(...) e nomeie essa instância como grpcServer.
  3. Crie um ponteiro para routeGuideServer, uma estrutura que representa o serviço de API do aplicativo, nomeando o ponteiro s..
  4. Use s.loadFeatures() para preencher a matriz s.savedFeatures.
  5. Registre nossa implementação de serviço com o servidor gRPC.
  6. Chame Serve() no servidor com os detalhes da porta para fazer uma espera de bloqueio para solicitações do cliente. Isso continua até que o processo seja encerrado ou Stop() seja chamado.

A função loadFeatures() recebe os mapeamentos de coordenadas para localizações de server/testdata.go.

6. Criar o cliente

Agora edite client/client.go, que é onde você vai implementar o código do cliente.

Para chamar os métodos do serviço remoto, primeiro precisamos criar um canal gRPC para se comunicar com o servidor. Para isso, transmitimos a string URI de destino do servidor (que, neste caso, é simplesmente o endereço e o número da porta) para grpc.NewClient() na função main() do cliente da seguinte maneira:

// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
  log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()

O endereço do servidor, definido pela variável serverAddr, é localhost:50051 por padrão e pode ser substituído pela chave --addr na linha de comando ao executar o cliente.

Se o cliente precisar se conectar a um serviço que exige credenciais de autenticação, como TLS ou JWT, ele poderá transmitir um objeto DialOptions como parâmetro para grpc.NewClient que contenha as credenciais necessárias. O serviço RouteGuide não exige credenciais.

Depois que o canal gRPC é configurado, precisamos de um stub de cliente para realizar RPCs por chamadas de função Go. Recebemos esse stub usando o método NewRouteGuideClient fornecido pelo arquivo route_guide_grpc.pb.go gerado do arquivo .proto do aplicativo.

import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")

client := pb.NewRouteGuideClient(conn)

Chamar métodos de serviço

Agora vamos ver como chamamos os métodos do serviço. No gRPC-Go, os RPCs operam em um modo de bloqueio/síncrono, o que significa que a chamada de RPC aguarda a resposta do servidor e retorna uma resposta ou um erro.

RPC de streaming do lado do servidor

Aqui, chamamos o método de streaming do lado do servidor ListFeatures, que retorna um stream de objetos geográficos Feature.

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
  // For server-to-client streaming RPCs, you call stream.Recv() until it
  // returns io.EOF.
  feature, err := stream.Recv()
  if err == io.EOF {
    break
  }
  if err != nil {
    log.Fatalf("client.ListFeatures failed: %v", err)
  }
  log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
    feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}

Assim como no RPC simples, transmitimos ao método um contexto e uma solicitação. No entanto, em vez de receber um objeto de resposta, recebemos uma instância de RouteGuide_ListFeaturesClient. O cliente pode usar o fluxo RouteGuide_ListFeaturesClient para ler as respostas do servidor. Usamos o método Recv() de RouteGuide_ListFeaturesClient para ler repetidamente as respostas do servidor em um objeto de buffer de protocolo de resposta (neste caso, um Feature) até que não haja mais mensagens: o cliente precisa verificar o erro retornado de Recv() após cada chamada. Se for nil, o fluxo ainda está bom e pode continuar a leitura. Se for io.EOF, o fluxo de mensagens terminou. Caso contrário, há um erro de RPC, que é transmitido por err.

RPC de streaming do lado do cliente

O método de streaming do lado do cliente RecordRoute é semelhante ao método do lado do servidor, exceto que só transmitimos um contexto ao método e recebemos um stream RouteGuide_RecordRouteClient de volta, que pode ser usado para gravar e ler mensagens.

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
  if err := c2sStream.Send(point); err != nil {
    log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
  }
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)

O RouteGuide_RecordRouteClient tem um método Send() que pode ser usado para enviar solicitações ao servidor. Depois de terminar de gravar as solicitações do cliente no fluxo usando Send(), precisamos chamar CloseAndRecv() no fluxo para informar ao gRPC que terminamos de gravar e estamos esperando receber uma resposta. Recebemos o status da RPC do erro retornado de CloseAndRecv(). Se o status for nil, o primeiro valor de retorno de CloseAndRecv() será uma resposta válida do servidor.

RPC de streaming bidirecional

Por fim, vamos analisar nossa RPC de streaming bidirecional RouteChat(). Assim como no caso de RecordRoute, só transmitimos ao método um objeto de contexto e recebemos um fluxo que pode ser usado para gravar e ler mensagens. No entanto, desta vez, retornamos valores pelo stream do nosso método enquanto o servidor ainda está gravando mensagens no stream de mensagens.

biDiStream, err := client.RouteChat(context.Background())
if err != nil {
  log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
  for {
    in, err := biDiStream.Recv()
    if err == io.EOF {
      // read done.
      close(recvDoneCh)
      return
    }
    if err != nil {
      log.Fatalf("client.RouteChat failed: %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
// send messages simultaneously.
for _, note := range notes {
  if err := biDiStream.Send(note); err != nil {
    log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
  }
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh

A sintaxe para leitura e gravação aqui é muito semelhante ao nosso método de streaming do lado do cliente, exceto que usamos o método CloseSend() do stream depois de terminar a chamada. Embora cada lado sempre receba as mensagens do outro na ordem em que foram escritas, o cliente e o servidor podem ler e gravar em qualquer ordem. Os streams operam de forma completamente independente.

7. Faça um teste

Confirme se o servidor e o cliente estão funcionando corretamente executando os seguintes comandos no diretório de trabalho do aplicativo:

  1. Execute o servidor em um terminal:
cd server
go run .
  1. Execute o cliente em outro terminal:
cd client
go run .

Você vai ver uma saída como esta, com carimbos de data/hora omitidos para maior clareza:

Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

8. A seguir