Primeiros passos com gRPC-Python: streaming

1. Introdução

Neste codelab, você vai usar o gRPC-Python para criar um cliente e um servidor que formam a base de um aplicativo de mapeamento de rotas escrito em Python.

Ao final do tutorial, você terá um cliente que se conecta a um servidor remoto usando gRPC para receber informações sobre recursos em uma rota de cliente, criar um resumo de uma rota de cliente e trocar informações de rota, como atualizações de trânsito, com o servidor e outros clientes.

O serviço é definido em um arquivo Protocol Buffers, que será usado para gerar código boilerplate para o cliente e o servidor, permitindo que eles se comuniquem entre si e economizando tempo e esforço na implementação dessa funcionalidade.

Esse código gerado cuida não apenas das complexidades da comunicação entre o servidor e o cliente, mas também da serialização e desserialização de dados.

O que você vai aprender

  • Como usar buffers de protocolo para definir uma API de serviço.
  • Como criar um cliente e um servidor baseados em gRPC com uma definição de buffers de protocolo usando a geração automática de código.
  • Entendimento da comunicação de streaming cliente-servidor com gRPC.

Este codelab é destinado a desenvolvedores Python que não conhecem o gRPC ou querem relembrar o assunto, ou a qualquer pessoa interessada em criar sistemas distribuídos. Não é necessário ter experiência com gRPC.

2. Antes de começar

O que é necessário

  • Python 3.9 ou mais recente. Recomendamos o Python 3.13. Para instruções de instalação específicas da plataforma, consulte Configuração e uso do Python. Se preferir, instale um Python não sistêmico usando ferramentas como uv ou pyenv.
  • pip para instalar pacotes Python.
  • venv para criar ambientes virtuais em Python.

Os pacotes ensurepip e venv fazem parte da biblioteca padrão do Python e geralmente estão disponíveis por padrão.

No entanto, algumas distribuições baseadas em Debian (incluindo o Ubuntu) optam por excluí-los ao redistribuir o Python. Para instalar os pacotes, execute:

sudo apt install python3-pip python3-venv

Acessar o código

Para facilitar o aprendizado, este codelab oferece um scaffold de código-fonte pré-criado para ajudar você a começar. As etapas a seguir vão orientar você na conclusão do aplicativo, incluindo a geração de código gRPC usando o plug-in do compilador de buffer de protocolo grpc_tools.protoc.

grpc-codelabs

O código-fonte do scaffold para este codelab está disponível no diretório codelabs/grpc-python-streaming/start_here. Se preferir não implementar o código por conta própria, o código-fonte concluído está disponível no diretório completed.

Primeiro, crie o diretório de trabalho do codelab e use cd para acessar ele:

mkdir grpc-python-streaming && cd grpc-python-streaming

Faça o download e extraia o codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

Como alternativa, baixe o arquivo .zip que contém apenas o diretório do codelab e descompacte-o manualmente.

3. Definir mensagens e serviços

A primeira etapa é definir o serviço gRPC do aplicativo, o método RPC e os tipos de mensagens de solicitação e resposta usando buffers de protocolo. Seu serviço vai oferecer:

  • Métodos RPC chamados ListFeatures, RecordRoute e RouteChat que o servidor implementa e o cliente chama.
  • Os tipos de mensagem Point, Feature, Rectangle, RouteNote e RouteSummary, que são estruturas de dados trocadas entre o cliente e o servidor ao chamar os métodos RPC.

Esses métodos RPC e os tipos de mensagens deles serão definidos no arquivo protos/route_guide.proto do código-fonte fornecido.

Os buffers de protocolo são conhecidos como protobufs. Para mais informações sobre a terminologia do gRPC, consulte Conceitos principais, arquitetura e ciclo de vida do gRPC.

Definir tipos de mensagem

No arquivo protos/route_guide.proto do código-fonte, primeiro defina o tipo de mensagem Point. Um Point representa um par de coordenadas de latitude e longitude em um mapa. Neste codelab, use números inteiros para as coordenadas:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Os números 1 e 2 são IDs exclusivos para cada um dos campos na estrutura message.

Em seguida, defina o tipo de mensagem Feature. Um Feature usa um campo string para o nome ou endereço postal de algo em um local especificado por um Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Para que vários pontos em uma área possam ser transmitidos para um cliente, você vai precisar de uma mensagem Rectangle que represente um retângulo de latitude e longitude, representado como dois pontos diagonalmente opostos lo e hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Além disso, uma mensagem RouteNote que representa uma mensagem enviada em um determinado ponto:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Por fim, você vai precisar de uma mensagem RouteSummary. Essa mensagem é recebida em resposta a uma RPC RecordRoute, que é explicada na próxima seção. Ele contém o número de pontos individuais recebidos, o número de recursos detectados e a distância total percorrida como a soma cumulativa da distância entre cada ponto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definir métodos de serviço

Para definir um serviço, especifique um serviço nomeado no arquivo .proto. O arquivo route_guide.proto tem uma estrutura service chamada RouteGuide que define um ou mais métodos fornecidos pelo serviço do aplicativo.

Ao definir métodos RPC na definição do serviço, você especifica os tipos de solicitação e resposta. Nesta seção do codelab, vamos definir:

ListFeatures

Recebe os objetos Feature disponíveis no Rectangle especificado. Os resultados são transmitidos em vez de retornados de uma só vez, já que o retângulo pode abranger uma grande área e conter um grande número de recursos.

Para esse aplicativo, você vai usar uma RPC de streaming do lado do servidor: o cliente envia uma solicitação ao servidor e recebe um stream para ler uma sequência de mensagens. O cliente lê o stream retornado até que não haja mais mensagens. Como você pode ver no exemplo, especifique um método de streaming do lado do servidor colocando a palavra-chave "stream" antes do tipo de resposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Aceita um fluxo de pontos em uma rota percorrida, retornando um RouteSummary quando o percurso é concluído.

Um RPC de streaming do lado do cliente é adequado nesse caso: o cliente grava uma sequência de mensagens e as envia ao servidor, novamente usando um stream fornecido. Depois que o cliente terminar de gravar as mensagens, ele vai aguardar o servidor ler todas elas e retornar a resposta. Para especificar um método de streaming do lado do cliente, coloque a palavra-chave "stream" antes do tipo de solicitação.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Aceita um fluxo de RouteNotes enviado enquanto uma rota está sendo percorrida, recebendo outros RouteNotes (por exemplo, de outros usuários).

Esse é exatamente o tipo de caso de uso para streaming bidirecional. Um RPC de streaming bidirecional em que os dois lados enviam uma sequência de mensagens usando um stream de leitura e gravação. Os dois streams operam de forma independente. Assim, clientes e servidores podem ler e gravar na ordem que quiserem. Por exemplo, o servidor pode esperar para receber todas as mensagens do cliente antes de gravar as respostas ou pode ler uma mensagem e gravar uma mensagem alternadamente ou alguma outra combinação de leituras e gravações. A ordem das mensagens em cada stream é preservada. Para especificar esse tipo de método, coloque a palavra-chave "stream" antes da solicitação e da resposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Gerar o código do cliente e do servidor

Em seguida, gere o código gRPC clichê para o cliente e o servidor do arquivo .proto usando o compilador de buffer de protocolo.

Para a geração de código Python do gRPC, criamos o grpcio-tools. Ela inclui:

  1. O compilador protoc comum que gera código Python com base em definições message.
  2. Plug-in protobuf do gRPC que gera código Python (stubs de cliente e servidor) das definições service.

Vamos instalar o pacote Python grpcio-tools usando pip. Vamos criar um novo ambiente virtual do Python (venv) para isolar as dependências do projeto dos pacotes do sistema:

python3 -m venv --upgrade-deps .venv

Para ativar o ambiente virtual no shell bash/zsh:

source .venv/bin/activate

Para Windows e shells não padrão, consulte a tabela em https://docs.python.org/3/library/venv.html#how-venvs-work.

Em seguida, instale o grpcio-tools (isso também instala o pacote grpcio):

pip install grpcio-tools

Use o comando a seguir para gerar o código boilerplate do Python:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

Isso vai gerar os seguintes arquivos para as interfaces definidas em route_guide.proto:

  1. route_guide_pb2.py contém o código que cria classes dinamicamente geradas das definições message.
  2. route_guide_pb2.pyi é um "arquivo stub" ou "arquivo de dica de tipo" gerado das definições message. Ele contém apenas as assinaturas, sem implementação. Os arquivos stub podem ser usados por IDEs para oferecer melhor preenchimento automático e detecção de erros.
  3. O route_guide_pb2_grpc.py é gerado com base nas definições service e contém classes e funções específicas do gRPC.

O código específico do gRPC contém:

  1. RouteGuideStub, que pode ser usado por um cliente gRPC para invocar RPCs do RouteGuide.
  2. RouteGuideServicer, que define a interface para implementações do serviço RouteGuide.
  3. Função add_RouteGuideServicer_to_server usada para registrar um RouteGuideServicer em um servidor gRPC.

5. Criar o servidor

Primeiro, vamos ver como criar um servidor RouteGuide. A criação e a execução de um servidor RouteGuide são divididas em dois itens de trabalho:

  • Implementar a interface do serviço gerada da nossa definição de serviço com funções que realizam o "trabalho" real do serviço.
  • Executar um servidor gRPC para detectar solicitações de clientes e transmitir respostas.

Vamos analisar route_guide_server.py.

Implementar o RouteGuide

route_guide_server.py tem uma classe RouteGuideServicer que é subclasse da classe gerada route_guide_pb2_grpc.RouteGuideServicer:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer implementa todos os métodos de serviço RouteGuide.

RPC de streaming do lado do servidor

ListFeatures é uma RPC de streaming de respostas que envia vários Features ao cliente:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

Aqui, a mensagem de solicitação é um route_guide_pb2.Rectangle em que o cliente quer encontrar Features. Em vez de retornar uma única resposta, o método gera zero ou mais respostas.

RPC de streaming do lado do cliente

O método de transmissão de solicitação RecordRoute usa um iterador de valores de solicitação e retorna um único valor de resposta.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

RPC de streaming bidirecional

Por fim, vamos analisar nossa RPC de streaming bidirecional RouteChat():

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

A semântica desse método é uma combinação dos métodos de streaming de solicitação e de resposta. Ele recebe um iterador de valores de solicitação e também é um iterador de valores de resposta.

Iniciar o servidor

Depois de implementar todos os métodos RouteGuide, a próxima etapa é iniciar um servidor gRPC para que os clientes possam usar seu serviço:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

O método do servidor start() não é bloqueador. Um novo thread será instanciado para processar solicitações. A linha de execução que chama server.start() geralmente não tem outro trabalho a fazer nesse meio tempo. Nesse caso, chame server.wait_for_termination() para bloquear corretamente a linha de execução da chamada até que o servidor seja encerrado.

6. Criar o cliente

Vamos analisar route_guide_client.py.

Criar um stub

Para chamar métodos de serviço, primeiro precisamos criar um stub.

Vamos instanciar a classe RouteGuideStub do módulo route_guide_pb2_grpc, gerada pelo método .proto. In run():

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

Observe que channel é usado como um gerenciador de contexto e será fechado automaticamente quando o interpretador sair do bloco with.

Chamar métodos de serviço

Para métodos RPC que retornam uma única resposta (métodos "response-unary"), o gRPC Python é compatível com semântica de fluxo de controle síncrona (bloqueio) e assíncrona (sem bloqueio). Para métodos RPC de streaming de resposta, as chamadas retornam imediatamente um iterador de valores de resposta. As chamadas para o método next() desse iterador são bloqueadas até que a resposta a ser gerada pelo iterador fique disponível.

RPC de streaming do lado do servidor

Chamar o ListFeatures de transmissão de resposta é semelhante a trabalhar com tipos de sequência:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

RPC de streaming do lado do cliente

Chamar o RecordRoute de transmissão de solicitação é semelhante a transmitir um iterador para um método local. Assim como o RPC simples acima, que também retorna uma única resposta, ele pode ser chamado de forma síncrona:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

RPC de streaming bidirecional

Chamar o RouteChat de streaming bidirecional tem (como é o caso no lado do serviço) uma combinação das semânticas de streaming de solicitação e resposta.

Gere as mensagens de solicitação e envie uma por uma usando yield.

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

Receber e processar respostas do servidor:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

Chamar os métodos auxiliares

Em "run", execute os métodos que acabamos de criar e transmita a stub.

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. Faça um teste

Execute o servidor:

python route_guide_server.py

Em outro terminal, ative o ambiente virtual novamente (source .venv/bin/activate)) e execute o cliente:

python route_guide_client.py

Vamos conferir o resultado.

ListFeatures

Primeiro, você vai encontrar a lista de recursos. Cada recurso é transmitido do servidor (RPC de streaming do lado do servidor) à medida que ele os descobre dentro do retângulo solicitado:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

Em segundo lugar, RecordRoute demonstra a lista de pontos visitados aleatoriamente transmitidos do cliente para o servidor (RPC de streaming do lado do cliente):

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

Depois que o cliente terminar de transmitir todos os pontos visitados, ele vai receber uma resposta sem streaming (um RPC unário) do servidor. Essa resposta vai conter um resumo dos cálculos realizados no trajeto completo do cliente.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

Por fim, a saída RouteChat demonstra o streaming bidirecional. Quando o cliente está "visitando" os pontos home ou work, ele grava uma observação para o ponto enviando um RouteNote ao servidor. Quando um ponto já foi visitado, o servidor transmite todas as observações anteriores para esse ponto.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. A seguir