Comienza a usar gRPC-Rust: transmisión

1. Introducción

En este codelab, usarás gRPC-Rust para crear un cliente y un servidor que formen la base de una aplicación de asignación de rutas escrita en Rust.

Al final del instructivo, tendrás un cliente que se conecta a un servidor remoto con gRPC para obtener información sobre las funciones en la ruta de un cliente, crear un resumen de la ruta de un cliente y compartir información de la ruta, como actualizaciones de tráfico, con el servidor y otros clientes.

El servicio se define en un archivo de Protocol Buffers, que se usará para generar código estándar para el cliente y el servidor, de modo que puedan comunicarse entre sí, lo que te ahorrará tiempo y esfuerzo en la implementación de esa funcionalidad.

Este código generado se encarga no solo de las complejidades de la comunicación entre el servidor y el cliente, sino también de la serialización y deserialización de datos.

Qué aprenderás

  • Cómo usar los búferes de protocolo para definir una API de servicio
  • Cómo compilar un cliente y un servidor basados en gRPC a partir de una definición de Protocol Buffers con la generación de código automatizada
  • Conocimiento de la comunicación de transmisión cliente-servidor con gRPC

Este codelab está dirigido a desarrolladores de Rust que no conocen gRPC o que desean repasar sus conceptos, o a cualquier otra persona interesada en crear sistemas distribuidos. No se requiere experiencia previa con gRPC.

2. Antes de comenzar

Requisitos previos

Asegúrate de haber instalado lo siguiente:

  • GCC. Sigue las instrucciones aquí.
  • Rust, versión más reciente Sigue las instrucciones de instalación aquí.

Obtén el código

Para que no tengas que empezar desde cero, este codelab proporciona un andamio del código fuente de la aplicación para que lo completes. En los siguientes pasos, se muestra cómo finalizar la aplicación, incluido el uso de los complementos del compilador de búfer de protocolo para generar el código gRPC estándar.

Primero, crea el directorio de trabajo del codelab y cd en él:

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

Descarga y extrae el codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

También puedes descargar el archivo .zip que contiene solo el directorio del codelab y descomprimirlo de forma manual.

El código fuente completo está disponible en GitHub si no quieres escribir una implementación.

3. Define mensajes y servicios

El primer paso es definir el servicio de gRPC de la aplicación, sus métodos de RPC y sus tipos de mensajes de solicitud y respuesta con búferes de protocolo. Tu servicio proporcionará lo siguiente:

  • Métodos de RPC llamados ListFeatures, RecordRoute y RouteChat que el servidor implementa y el cliente llama.
  • Los tipos de mensajes Point, Feature, Rectangle, RouteNote y RouteSummary, que son estructuras de datos que se intercambian entre el cliente y el servidor cuando se llaman a los métodos anteriores.

Estos métodos de RPC y sus tipos de mensajes se definirán en el archivo proto/routeguide.proto del código fuente proporcionado.

Los búferes de protocolo se conocen comúnmente como protobufs. Para obtener más información sobre la terminología de gRPC, consulta los conceptos básicos, la arquitectura y el ciclo de vida de gRPC.

Define los tipos de mensajes

Primero, definamos los mensajes que usarán nuestras RPC. En el archivo routeguide/route_guide.proto del código fuente, primero define el tipo de mensaje Point. Un objeto Point representa un par de coordenadas de latitud y longitud en un mapa. En este codelab, usa números enteros para las coordenadas:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Los números 1 y 2 son números de ID únicos para cada uno de los campos de la estructura message.

A continuación, define el tipo de mensaje Feature. Un Feature usa un campo string para el nombre o la dirección postal de algo en una ubicación especificada por un Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

A continuación, un mensaje Rectangle que representa un rectángulo de latitud y longitud, representado como dos puntos opuestos diagonalmente "lo" y "hi".

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

También es un mensaje de RouteNote que representa un mensaje enviado en un punto determinado.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

También requerimos un mensaje de RouteSummary. Este mensaje se recibe en respuesta a una RPC de RecordRoute, que se explica en la siguiente sección. Contiene la cantidad de puntos individuales recibidos, la cantidad de atributos detectados y la distancia total recorrida como la suma acumulativa de la distancia entre cada punto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Cómo definir métodos de servicio

Primero, definamos nuestro servicio y, luego, definiremos nuestros mensajes. Para definir un servicio, especifica un servicio con nombre en tu archivo .proto. El archivo proto/routeguide.proto tiene una estructura service llamada RouteGuide que define uno o más métodos proporcionados por el servicio de la aplicación.

Define métodos de RPC dentro de la definición de tu servicio y especifica sus tipos de solicitud y respuesta. En esta sección del codelab, definiremos lo siguiente:

ListFeatures

Obtiene los Feature disponibles en el Rectangle determinado. Los resultados se transmiten en lugar de devolverse de una vez (p.ej., en un mensaje de respuesta con un campo repetido), ya que el rectángulo puede abarcar un área grande y contener una gran cantidad de entidades.

Un tipo adecuado para esta RPC es una RPC de transmisión del servidor: el cliente envía una solicitud al servidor y obtiene una transmisión para leer una secuencia de mensajes. El cliente lee la transmisión que se muestra hasta que no haya más mensajes. Como puedes ver en nuestro ejemplo, debes especificar un método de transmisión del servidor colocando la palabra clave stream antes del tipo de respuesta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Acepta un flujo de Points en una ruta que se recorre y devuelve un RouteSummary cuando se completa el recorrido.

En este caso, parece apropiada una RPC de transmisión del cliente: el cliente escribe una secuencia de mensajes y los envía al servidor, nuevamente a través de una transmisión proporcionada. Una vez que el cliente termina de escribir los mensajes, espera a que el servidor los lea todos y muestre la respuesta. Para especificar un método de transmisión por Internet del cliente, coloca la palabra clave stream antes del tipo de solicitud.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Acepta un flujo de objetos RouteNote enviados mientras se recorre una ruta y recibe otros objetos RouteNote (p.ej., de otros usuarios).

Este es exactamente el tipo de caso de uso para la transmisión bidireccional. En una RPC de transmisión bidireccional, ambos extremos envían una secuencia de mensajes a través de una transmisión de lectura y escritura. Las dos transmisiones operan de forma independiente, por lo que los clientes y los servidores pueden leer y escribir en el orden que deseen.

Por ejemplo, el servidor podría esperar a recibir todos los mensajes del cliente antes de escribir sus respuestas, o bien podría leer un mensaje y, luego, escribir uno, o alguna otra combinación de lecturas y escrituras.

Se conserva el orden de los mensajes en cada transmisión. Para especificar este tipo de método, coloca la palabra clave stream antes de la solicitud y la respuesta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Genera el código del cliente y del servidor

Ya te proporcionamos el código generado del archivo .proto en el directorio generado.

Si quieres aprender a generar código desde el archivo .proto por tu cuenta o realizar cambios en el archivo .proto y probarlos, consulta estas instrucciones.

El código generado contiene lo siguiente:

  • Definiciones de struct para los tipos de mensajes Point, Feature, Rectangle, RouteNote y RouteSummary.
  • Un rasgo de servicio que deberemos implementar: route_guide_server::RouteGuide.
  • Un tipo de cliente que usaremos para llamar al servidor: route_guide_client::RouteGuideClient<T>.

A continuación, implementaremos los métodos del servidor para que, cuando el cliente envíe una solicitud, el servidor pueda responder con una respuesta.

5. Implementa el servicio

Primero, veamos cómo crear un servidor RouteGuide. Para que nuestro servicio de RouteGuide funcione correctamente, se deben realizar dos pasos:

  • Implementar la interfaz de servicio generada a partir de nuestra definición de servicio: realizar el "trabajo" real de nuestro servicio
  • Ejecutar un servidor de gRPC para escuchar las solicitudes de los clientes y enviarlas a la implementación del método correcto

En src/server/server.rs, podemos incluir el código generado en el alcance a través de la macro include_generated_proto! de gRPC y, luego, importar el rasgo RouteGuide y Point.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

Podemos comenzar definiendo una estructura para representar nuestro servicio. Por ahora, podemos hacer esto en src/server/server.rs:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

Ahora, debemos implementar el rasgo route_guide_server::RouteGuide desde nuestro código generado.

Implementa RouteGuide

Debemos implementar la interfaz RouteGuide generada. Así se vería la implementación. Esto ya está en la plantilla.

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

Analicemos en detalle cada implementación de RPC.

RPC de transmisión del servidor

Comencemos con ListFeatures. Esta es una RPC de transmisión del servidor, por lo que debemos enviar varios Features a nuestro cliente.

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

Como puedes ver, obtenemos un objeto de solicitud (el Rectangle en el que nuestro cliente quiere encontrar Features). Esta vez, debemos devolver un flujo de valores. Creamos un canal y generamos una nueva tarea asíncrona en la que realizamos una búsqueda y enviamos al canal los atributos que satisfacen nuestras restricciones. La mitad de transmisión del canal se devuelve a la persona que llama, envuelta en un tonic::Response.

RPC de transmisión del cliente

Ahora veamos algo un poco más complicado: el método de transmisión del cliente RecordRoute, en el que obtenemos un flujo de Points del cliente y devolvemos un solo RouteSummary con información sobre su viaje. Recibe una transmisión como entrada, que el servidor puede usar para leer y escribir mensajes. Puede iterar a través de los mensajes del cliente con su método next() y devolver su única respuesta.

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

En el cuerpo del método, usamos el método next() de la transmisión para leer repetidamente las solicitudes de nuestro cliente en un objeto de solicitud (en este caso, un Point) hasta que no haya más mensajes. Si es None, el flujo sigue siendo válido y se puede seguir leyendo.

RPC de transmisión bidireccional

Por último, veamos nuestro RPC de transmisión bidireccional RouteChat().

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

Esta vez, obtenemos una transmisión que, como en nuestro ejemplo de transmisión del cliente, se puede usar para leer y escribir mensajes. Sin embargo, esta vez devolvemos valores a través de la transmisión de nuestro método mientras el cliente sigue escribiendo mensajes en su transmisión de mensajes. La sintaxis para leer y escribir aquí es muy similar a nuestro método de transmisión por secuencias del cliente, excepto que el servidor devuelve un RouteChatStream. Aunque cada lado siempre recibirá los mensajes del otro en el orden en que se escribieron, tanto el cliente como el servidor pueden leer y escribir en cualquier orden. Las transmisiones operan de forma completamente independiente.

Creamos la transmisión output con try_stream!, lo que indica que la transmisión podría devolver errores.

Inicia el servidor

Una vez que implementamos este método, también debemos iniciar un servidor de gRPC para que los clientes puedan usar nuestro servicio. Completa main().

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

Esto es lo que sucede en main(), paso a paso:

  1. Especifica el puerto que queremos usar para escuchar las solicitudes del cliente
  2. Crea un RouteGuideService con funciones cargadas
  3. Crea una instancia del servidor de gRPC con RouteGuideServer::new() usando el servicio que creamos.
  4. Registra nuestra implementación del servicio con el servidor de gRPC.
  5. Llama a serve() en el servidor con los detalles de nuestro puerto para realizar una espera de bloqueo hasta que se detenga el proceso.

6. Crea el cliente

En esta sección, veremos cómo crear un cliente de Rust para nuestro servicio RouteGuide en src/client/client.rs.

Primero, incluye el código generado en el alcance.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

Llama a los métodos de servicio

Ahora veamos cómo llamamos a nuestros métodos de servicio. En gRPC-Rust, las RPC operan en un modo de bloqueo o síncrono, lo que significa que la llamada a la RPC espera a que responda el servidor y mostrará una respuesta o un error.

RPC de transmisión del servidor

Aquí es donde llamamos al método de transmisión del servidor ListFeatures, que devuelve un flujo de objetos geográficos Feature.

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

Le pasamos al método una solicitud y obtenemos una instancia de ListFeaturesStream. El cliente puede usar la transmisión ListFeaturesStream para leer las respuestas del servidor. Usamos el método message() de ListFeaturesStream para leer repetidamente las respuestas del servidor en un objeto de búfer de protocolo de respuesta (en este caso, un Feature) hasta que no haya más mensajes.

RPC de transmisión del cliente

Aquí, para record_route, convertimos un vector de puntos en una transmisión. Luego, pasamos esta transmisión a record_route() como una solicitud y obtenemos una sola respuesta RouteSummary después de que el servidor procesa por completo la transmisión.

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

RPC de transmisión bidireccional

Por último, veamos nuestro RPC de transmisión bidireccional RouteChat(). Pasamos al método una solicitud de transmisión a la que escribimos y obtenemos una transmisión de la que podemos leer mensajes. Esta vez, devolvemos valores a través de la transmisión de nuestro método mientras el servidor sigue escribiendo mensajes en su transmisión de mensajes.

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

Aunque cada lado siempre recibirá los mensajes del otro en el orden en que se escribieron, tanto el cliente como el servidor pueden leer y escribir en cualquier orden. Las transmisiones operan de forma completamente independiente.

Llama a métodos auxiliares

Para llamar a los métodos del servicio, primero debemos crear un canal para comunicarnos con el servidor. Para crear esto, primero creamos un extremo, nos conectamos a él y pasamos el canal creado cuando nos conectamos a RouteGuideClient::new() de la siguiente manera:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

En main(), ejecuta los métodos que acabamos de crear.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. Probar

Primero, para ejecutar nuestro cliente y servidor, agreguémoslos como destinos binarios a nuestro crate. Debemos editar nuestro Cargo.toml según corresponda:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

Al igual que con cualquier proyecto, también debemos pensar en las dependencias necesarias para que nuestro código funcione. En el caso de los proyectos de Rust, las dependencias estarán en Cargo.toml. Ya enumeramos las dependencias necesarias en el archivo Cargo.toml.

Luego, ejecuta los siguientes comandos desde nuestros directorios de trabajo:

  1. Ejecuta el servidor en una terminal:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. Ejecuta el cliente desde otra terminal:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

Verás un resultado como este:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. ¿Qué sigue?