Primeiros passos com gRPC-Rust: streaming

1. Introdução

Neste codelab, você vai usar o gRPC-Rust para criar um cliente e um servidor que formam a base de um aplicativo de mapeamento de rotas escrito em Rust.

Ao final do tutorial, você terá um cliente que se conecta a um servidor remoto usando gRPC para receber informações sobre recursos em uma rota de cliente, criar um resumo de uma rota de cliente e trocar informações de rota, como atualizações de trânsito, com o servidor e outros clientes.

O serviço é definido em um arquivo Protocol Buffers, que será usado para gerar código boilerplate para o cliente e o servidor, permitindo que eles se comuniquem entre si e economizando tempo e esforço na implementação dessa funcionalidade.

Esse código gerado cuida não apenas das complexidades da comunicação entre o servidor e o cliente, mas também da serialização e desserialização de dados.

O que você vai aprender

  • Como usar buffers de protocolo para definir uma API de serviço.
  • Como criar um cliente e um servidor baseados em gRPC com uma definição de buffers de protocolo usando a geração automática de código.
  • Entendimento da comunicação de streaming cliente-servidor com gRPC.

Este codelab é destinado a desenvolvedores do Rust que não conhecem o gRPC ou querem relembrar o gRPC, ou qualquer pessoa interessada em criar sistemas distribuídos. Não é necessário ter experiência com gRPC.

2. Antes de começar

Pré-requisitos

Verifique se você instalou o seguinte:

  • GCC. Siga as instruções aqui.
  • Rust, versão mais recente. Siga as instruções de instalação aqui.

Acessar o código

Para que você não precise começar do zero, este codelab oferece um scaffold do código-fonte do aplicativo para você concluir. As etapas a seguir mostram como concluir o aplicativo, incluindo o uso dos plug-ins do compilador de buffer de protocolo para gerar o código gRPC boilerplate.

Primeiro, crie o diretório de trabalho do codelab e cd nele:

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

Faça o download e extraia o codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

Como alternativa, baixe o arquivo .zip que contém apenas o diretório do codelab e descompacte-o manualmente.

O código-fonte completo está disponível no GitHub se você quiser pular a digitação de uma implementação.

3. Definir mensagens e serviços

A primeira etapa é definir o serviço gRPC do aplicativo, os métodos RPC e os tipos de mensagens de solicitação e resposta usando buffers de protocolo. Seu serviço vai oferecer:

  • Métodos RPC chamados ListFeatures, RecordRoute e RouteChat que o servidor implementa e o cliente chama.
  • Os tipos de mensagem Point, Feature, Rectangle, RouteNote e RouteSummary, que são estruturas de dados trocadas entre o cliente e o servidor ao chamar os métodos acima.

Esses métodos RPC e os tipos de mensagens serão definidos no arquivo proto/routeguide.proto do código-fonte fornecido.

Os buffers de protocolo são conhecidos como protobufs. Para mais informações sobre a terminologia do gRPC, consulte Conceitos principais, arquitetura e ciclo de vida do gRPC.

Definir tipos de mensagem

Primeiro, vamos definir as mensagens que serão usadas pelas nossas RPCs. No arquivo routeguide/route_guide.proto do código-fonte, primeiro defina o tipo de mensagem Point. Um Point representa um par de coordenadas de latitude e longitude em um mapa. Neste codelab, use números inteiros para as coordenadas:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Os números 1 e 2 são IDs exclusivos para cada um dos campos na estrutura message.

Em seguida, defina o tipo de mensagem Feature. Um Feature usa um campo string para o nome ou endereço postal de algo em um local especificado por um Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Em seguida, uma mensagem Rectangle que representa um retângulo de latitude-longitude, representado como dois pontos diagonalmente opostos "lo" e "hi".

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Também uma mensagem RouteNote que representa uma mensagem enviada em um determinado ponto.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Também vamos precisar de uma mensagem RouteSummary. Essa mensagem é recebida em resposta a uma RPC RecordRoute, que é explicada na próxima seção. Ele contém o número de pontos individuais recebidos, o número de recursos detectados e a distância total percorrida como a soma cumulativa da distância entre cada ponto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definir métodos de serviço

Primeiro, vamos definir nosso serviço e, depois, nossas mensagens. Para definir um serviço, especifique um serviço nomeado no arquivo .proto. O arquivo proto/routeguide.proto tem uma estrutura service chamada RouteGuide que define um ou mais métodos fornecidos pelo serviço do aplicativo.

Defina métodos RPC na definição do serviço, especificando os tipos de solicitação e resposta. Nesta seção do codelab, vamos definir:

ListFeatures

Recebe os Features disponíveis no Rectangle especificado. Os resultados são transmitidos em vez de retornados de uma só vez (por exemplo, em uma mensagem de resposta com um campo repetido), já que o retângulo pode abranger uma grande área e conter um grande número de recursos.

Um tipo adequado para essa RPC é uma RPC de streaming do lado do servidor: o cliente envia uma solicitação ao servidor e recebe um stream para ler uma sequência de mensagens. O cliente lê o stream retornado até que não haja mais mensagens. Como você pode ver no exemplo, especifique um método de streaming do lado do servidor colocando a palavra-chave stream antes do tipo de resposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Aceita um fluxo de Points em uma rota sendo percorrida, retornando um RouteSummary quando o percurso é concluído.

Uma RPC de streaming do lado do cliente parece adequada nesse caso: o cliente grava uma sequência de mensagens e as envia ao servidor, novamente usando um stream fornecido. Depois que o cliente terminar de gravar as mensagens, ele vai aguardar o servidor ler todas elas e retornar a resposta. Para especificar um método de streaming do lado do cliente, coloque a palavra-chave stream antes do tipo de solicitação.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Aceita um fluxo de RouteNotes enviados enquanto uma rota está sendo percorrida, recebendo outros RouteNotes (por exemplo, de outros usuários).

Esse é exatamente o tipo de caso de uso para streaming bidirecional. Em uma RPC de streaming bidirecional, os dois lados enviam uma sequência de mensagens usando um stream de leitura e gravação. Os dois streams operam de forma independente, então clientes e servidores podem ler e gravar na ordem que quiserem.

Por exemplo, o servidor pode esperar para receber todas as mensagens do cliente antes de escrever as respostas ou pode ler uma mensagem e escrever uma mensagem, ou alguma outra combinação de leituras e gravações.

A ordem das mensagens em cada stream é preservada. Para especificar esse tipo de método, coloque a palavra-chave stream antes da solicitação e da resposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Gerar o código do cliente e do servidor

Já fornecemos o código gerado do arquivo .proto no diretório gerado.

Se quiser aprender a gerar código do arquivo .proto ou fazer mudanças nele e testá-las, consulte estas instruções..proto

O código gerado contém:

  • Definições de struct para os tipos de mensagem Point, Feature, Rectangle, RouteNote e RouteSummary.
  • Um traço de serviço que precisaremos implementar: route_guide_server::RouteGuide.
  • Um tipo de cliente que vamos usar para chamar o servidor: route_guide_client::RouteGuideClient<T>.

Em seguida, vamos implementar os métodos no lado do servidor para que, quando o cliente enviar uma solicitação, o servidor possa responder.

5. Implementar o serviço

Primeiro, vamos ver como criar um servidor RouteGuide. Há duas partes para fazer nosso serviço RouteGuide funcionar:

  • Implementar a interface de serviço gerada da nossa definição de serviço: fazer o "trabalho" real do nosso serviço.
  • Executar um servidor gRPC para detectar solicitações de clientes e enviá-las à implementação do método correto.

Em src/server/server.rs, podemos trazer o código gerado para o escopo usando a macro include_generated_proto! do gRPC e importar a característica RouteGuide e Point.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

Podemos começar definindo uma struct para representar nosso serviço. Por enquanto, podemos fazer isso em src/server/server.rs:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

Agora, precisamos implementar a característica route_guide_server::RouteGuide do código gerado.

Implementar o RouteGuide

Precisamos implementar a interface RouteGuide gerada. Veja como ficaria a implementação. Isso já está no modelo.

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

Vamos analisar cada implementação de RPC em detalhes.

RPC de streaming do lado do servidor

Vamos começar com ListFeatures. Essa é uma RPC de streaming do lado do servidor. Portanto, precisamos enviar vários Features de volta ao cliente.

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

Como você pode ver, recebemos um objeto de solicitação (o Rectangle em que nosso cliente quer encontrar Features). Desta vez, precisamos retornar um fluxo de valores. Criamos um canal e geramos uma nova tarefa assíncrona em que fazemos uma pesquisa, enviando para o canal os recursos que atendem às nossas restrições. A metade do fluxo do canal é retornada ao autor da chamada, encapsulada em um tonic::Response.

RPC de streaming do lado do cliente

Agora vamos analisar algo um pouco mais complicado: o método de streaming do lado do cliente RecordRoute, em que recebemos um fluxo de Points do cliente e retornamos um único RouteSummary com informações sobre a viagem. Ele recebe um stream como entrada, que o servidor pode usar para ler e gravar mensagens. Ele pode iterar pelas mensagens do cliente usando o método next() e retornar uma única resposta.

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

No corpo do método, usamos o método next() do fluxo para ler repetidamente as solicitações do cliente em um objeto de solicitação (neste caso, um Point) até que não haja mais mensagens. Se for "None", o fluxo ainda está bom e pode continuar lendo.

RPC de streaming bidirecional

Por fim, vamos analisar nossa RPC de streaming bidirecional RouteChat().

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

Desta vez, recebemos um fluxo que, como no nosso exemplo de streaming do lado do cliente, pode ser usado para ler e gravar mensagens. No entanto, desta vez, retornamos valores pelo stream do nosso método enquanto o cliente ainda está gravando mensagens no stream dele. A sintaxe para leitura e gravação aqui é muito semelhante ao nosso método de streaming do cliente, exceto que o servidor retorna um RouteChatStream. Embora cada lado sempre receba as mensagens do outro na ordem em que foram escritas, o cliente e o servidor podem ler e gravar em qualquer ordem. Os streams operam de forma completamente independente.

Criamos o stream output usando try_stream!, o que indica que o stream pode retornar erros.

Iniciar o servidor

Depois de implementar esse método, também precisamos iniciar um servidor gRPC para que os clientes possam usar nosso serviço. Preencha main().

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

Veja o que está acontecendo em main(), etapa por etapa:

  1. Especifique a porta que queremos usar para detectar solicitações do cliente.
  2. Crie um RouteGuideService com recursos carregados em
  3. Crie uma instância do servidor gRPC usando RouteGuideServer::new() com o serviço que criamos.
  4. Registre nossa implementação de serviço com o servidor gRPC.
  5. Chame serve() no servidor com os detalhes da porta para fazer uma espera de bloqueio até que o processo seja encerrado.

6. Criar o cliente

Nesta seção, vamos criar um cliente Rust para nosso serviço RouteGuide em src/client/client.rs.

Primeiro, coloque o código gerado no escopo.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

Chamar métodos de serviço

Agora vamos ver como chamamos os métodos do serviço. No gRPC-Rust, os RPCs operam em um modo de bloqueio/síncrono, o que significa que a chamada de RPC aguarda a resposta do servidor e retorna uma resposta ou um erro.

RPC de streaming do lado do servidor

Aqui, chamamos o método de streaming do lado do servidor ListFeatures, que retorna um stream de objetos geográficos Feature.

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

Transmitimos uma solicitação ao método e recebemos uma instância de ListFeaturesStream. O cliente pode usar o fluxo ListFeaturesStream para ler as respostas do servidor. Usamos o método message() do ListFeaturesStream para ler repetidamente as respostas do servidor em um objeto de buffer de protocolo de resposta (neste caso, um Feature) até que não haja mais mensagens.

RPC de streaming do lado do cliente

Aqui, para record_route, transformamos um vetor de pontos em um stream. Em seguida, transmitimos esse fluxo para record_route() como uma solicitação e recebemos uma única resposta RouteSummary depois que o fluxo é totalmente processado pelo servidor.

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

RPC de streaming bidirecional

Por fim, vamos analisar nossa RPC de streaming bidirecional RouteChat(). Passamos para o método uma solicitação de stream em que gravamos e recebemos um stream de volta para ler mensagens. Desta vez, retornamos valores pelo stream do nosso método enquanto o servidor ainda está gravando mensagens no stream de mensagens.

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

Embora cada lado sempre receba as mensagens do outro na ordem em que foram escritas, o cliente e o servidor podem ler e gravar em qualquer ordem. Os streams operam de forma completamente independente.

Chamar métodos auxiliares

Para chamar métodos de serviço, primeiro precisamos criar um canal para se comunicar com o servidor. Para isso, primeiro criamos um endpoint, nos conectamos a ele e transmitimos o canal criado ao se conectar a RouteGuideClient::new() da seguinte maneira:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

Em main(), execute os métodos que acabamos de criar.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. Faça um teste

Primeiro, para executar o cliente e o servidor, vamos adicioná-los como destinos binários ao nosso crate. Precisamos editar nosso Cargo.toml de acordo:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

Como em qualquer projeto, também precisamos pensar nas dependências necessárias para que nosso código funcione. Para projetos Rust, as dependências estarão em Cargo.toml. Já listamos as dependências necessárias no arquivo Cargo.toml.

Em seguida, execute os seguintes comandos nos diretórios de trabalho:

  1. Execute o servidor em um terminal:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. Execute o cliente em outro terminal:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

A saída será semelhante a esta:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. A seguir