Getting Started with gRPC-Rust - Streaming

1. Introduzione

In questo codelab utilizzerai gRPC-Rust per creare un client e un server che costituiscono la base di un'applicazione di mappatura di itinerari scritta in Rust.

Al termine del tutorial, avrai un client che si connette a un server remoto utilizzando gRPC per ottenere informazioni sulle funzionalità di un percorso del client, creare un riepilogo di un percorso del client e scambiare informazioni sul percorso, come gli aggiornamenti sul traffico, con il server e altri client.

Il servizio è definito in un file Protocol Buffers, che verrà utilizzato per generare codice boilerplate per il client e il server in modo che possano comunicare tra loro, risparmiando tempo e fatica nell'implementazione di questa funzionalità.

Questo codice generato si occupa non solo delle complessità della comunicazione tra il server e il client, ma anche della serializzazione e della deserializzazione dei dati.

Obiettivi didattici

  • Come utilizzare i buffer di protocollo per definire un'API di servizio.
  • Come creare un client e un server basati su gRPC da una definizione di Protocol Buffers utilizzando la generazione automatica del codice.
  • Comprensione della comunicazione di streaming client-server con gRPC.

Questo codelab è rivolto agli sviluppatori Rust che non hanno familiarità con gRPC o che vogliono ripassare gRPC, o a chiunque sia interessato a creare sistemi distribuiti. Non è richiesta alcuna esperienza precedente con gRPC.

2. Prima di iniziare

Prerequisiti

Assicurati di aver installato quanto segue:

  • GCC. Segui le istruzioni qui.
  • Rust, ultima versione. Segui le istruzioni di installazione qui.

Ottieni il codice

Per non dover iniziare da zero, questo codelab fornisce una struttura del codice sorgente dell'applicazione da completare. I passaggi seguenti mostrano come completare l'applicazione, incluso l'utilizzo dei plug-in del compilatore di protocol buffer per generare il codice gRPC boilerplate.

Innanzitutto, crea la directory di lavoro del codelab e cd al suo interno:

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

Scarica ed estrai il codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

In alternativa, puoi scaricare il file .zip contenente solo la directory del codelab e decomprimerlo manualmente.

Il codice sorgente completato è disponibile su GitHub se vuoi evitare di digitare un'implementazione.

3. Definisci messaggi e servizi

Il primo passaggio consiste nel definire il servizio gRPC dell'applicazione, i relativi metodi RPC e i tipi di messaggi di richiesta e risposta utilizzando Protocol Buffers. Il tuo servizio fornirà:

  • Metodi RPC chiamati ListFeatures, RecordRoute e RouteChat che il server implementa e il client chiama.
  • I tipi di messaggi Point, Feature, Rectangle, RouteNote e RouteSummary, che sono strutture di dati scambiate tra il client e il server quando vengono chiamati i metodi precedenti.

Questi metodi RPC e i relativi tipi di messaggi verranno tutti definiti nel file proto/routeguide.proto del codice sorgente fornito.

Protocol Buffers sono comunemente noti come protobuf. Per ulteriori informazioni sulla terminologia gRPC, consulta Concetti fondamentali, architettura e ciclo di vita di gRPC.

Definisci i tipi di messaggi

Definiamo innanzitutto i messaggi che verranno utilizzati dalle nostre RPC. Nel file routeguide/route_guide.proto del codice sorgente, definisci innanzitutto il tipo di messaggio Point. Un Point rappresenta una coppia di coordinate di latitudine e longitudine su una mappa. Per questo codelab, utilizza numeri interi per le coordinate:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

I numeri 1 e 2 sono numeri ID univoci per ciascuno dei campi nella struttura message.

Successivamente, definisci il tipo di messaggio Feature. Un Feature utilizza un campo string per il nome o l'indirizzo postale di un elemento in una località specificata da un Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Poi un messaggio Rectangle che rappresenta un rettangolo di latitudine e longitudine, rappresentato da due punti diagonalmente opposti "lo" e "hi".

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Inoltre, un messaggio RouteNote che rappresenta un messaggio inviato in un determinato momento.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Richiederemo anche un messaggio su RouteSummary. Questo messaggio viene ricevuto in risposta a una RPC RecordRoute, che viene spiegata nella sezione successiva. Contiene il numero di punti individuali ricevuti, il numero di caratteristiche rilevate e la distanza totale percorsa come somma cumulativa della distanza tra ogni punto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definisci i metodi di servizio

Definiamo prima il servizio e poi i messaggi. Per definire un servizio, specifica un servizio denominato nel file .proto. Il file proto/routeguide.proto ha una struttura service denominata RouteGuide che definisce uno o più metodi forniti dal servizio dell'applicazione.

Definisci i metodi RPC all'interno della definizione del servizio, specificando i tipi di richiesta e risposta. In questa sezione del codelab, definiamo:

ListFeatures

Recupera gli Feature disponibili all'interno del Rectangle specificato. I risultati vengono trasmessi in streaming anziché restituiti contemporaneamente (ad es. in un messaggio di risposta con un campo ripetuto), poiché il rettangolo potrebbe coprire un'area di grandi dimensioni e contenere un numero elevato di funzionalità.

Un tipo appropriato per questa RPC è una RPC di streaming lato server: il client invia una richiesta al server e riceve un flusso per leggere una sequenza di messaggi. Il client legge dallo stream restituito finché non ci sono più messaggi. Come puoi vedere nel nostro esempio, devi specificare un metodo di streaming lato server inserendo la parola chiave stream prima del tipo di risposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Accetta un flusso di Point su un percorso attraversato, restituendo un RouteSummary al termine dell'attraversamento.

In questo caso, una RPC di streaming lato client sembra appropriata: il client scrive una sequenza di messaggi e li invia al server, sempre utilizzando un flusso fornito. Una volta che il client ha finito di scrivere i messaggi, attende che il server li legga tutti e restituisca la risposta. Specifichi un metodo di streaming lato client inserendo la parola chiave stream prima del tipo di richiesta.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Accetta un flusso di RouteNote inviati durante il percorso di un itinerario, mentre riceve altri RouteNote (ad esempio da altri utenti).

Questo è esattamente il tipo di caso d'uso per lo streaming bidirezionale. Una RPC di streaming bidirezionale fa sì che entrambe le parti inviino una sequenza di messaggi utilizzando uno stream di lettura/scrittura. I due flussi operano in modo indipendente, quindi client e server possono leggere e scrivere nell'ordine che preferiscono.

Ad esempio, il server potrebbe attendere di ricevere tutti i messaggi del client prima di scrivere le risposte oppure potrebbe leggere un messaggio e poi scriverne un altro o una combinazione di letture e scritture.

L'ordine dei messaggi in ogni stream viene mantenuto. Specifichi questo tipo di metodo inserendo la parola chiave stream prima della richiesta e della risposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Genera il codice client e server

Ti abbiamo già fornito il codice generato dal file .proto nella directory generata.

Se vuoi scoprire come generare il codice dal file .proto o apportare modifiche al file .proto e testarle, consulta queste istruzioni.

Il codice generato contiene:

  • Definizioni di struct per i tipi di messaggio Point, Feature, Rectangle, RouteNote e RouteSummary.
  • Una caratteristica del servizio che dovremo implementare: route_guide_server::RouteGuide.
  • Un tipo di client che utilizzeremo per chiamare il server: route_guide_client::RouteGuideClient<T>.

Successivamente, implementeremo i metodi lato server, in modo che quando il client invia una richiesta, il server possa rispondere.

5. Implementare il servizio

Per prima cosa, vediamo come creare un server RouteGuide. Il funzionamento del nostro servizio RouteGuide si basa su due elementi:

  • Implementazione dell'interfaccia di servizio generata dalla nostra definizione di servizio: esecuzione del "lavoro" effettivo del nostro servizio.
  • Esecuzione di un server gRPC per ascoltare le richieste dei client e inviarle all'implementazione del metodo corretto.

In src/server/server.rs, possiamo portare il codice generato nell'ambito tramite la macro include_generated_proto! di gRPC e importare il tratto RouteGuide e Point.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

Possiamo iniziare definendo una struct per rappresentare il nostro servizio. Per il momento, possiamo farlo su src/server/server.rs:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

Ora dobbiamo implementare il tratto route_guide_server::RouteGuide dal codice generato.

Implementa RouteGuide

Dobbiamo implementare l'interfaccia RouteGuide generata. Ecco come apparirebbe l'implementazione. Questo elemento è già presente nel modello.

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

Esaminiamo in dettaglio ogni implementazione RPC.

RPC di streaming lato server

Iniziamo con ListFeatures. Si tratta di una RPC di streaming lato server, quindi dobbiamo inviare più Feature al nostro client.

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

Come puoi vedere, otteniamo un oggetto richiesta (il Rectangle in cui il nostro cliente vuole trovare Features). Questa volta, dobbiamo restituire un flusso di valori. Creiamo un canale e generiamo una nuova attività asincrona in cui eseguiamo una ricerca, inviando al canale le funzionalità che soddisfano i nostri vincoli. La parte Stream del canale viene restituita al chiamante, racchiusa in un tonic::Response.

RPC di streaming lato client

Ora esaminiamo qualcosa di un po' più complicato: il metodo di streaming lato client RecordRoute, in cui riceviamo un flusso di Points dal client e restituiamo un singolo RouteSummary con informazioni sul viaggio. Riceve un flusso come input, che il server può utilizzare per leggere e scrivere messaggi. Può scorrere i messaggi del client utilizzando il metodo next() e restituire la sua singola risposta.

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

Nel corpo del metodo, utilizziamo il metodo next() dello stream per leggere ripetutamente le richieste del client in un oggetto richiesta (in questo caso un Point) finché non ci sono più messaggi. Se il valore è Nessuno, lo stream è ancora valido e può continuare a essere letto.

RPC di streaming bidirezionale

Infine, diamo un'occhiata alla nostra RPC di streaming bidirezionale RouteChat().

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

Questa volta otteniamo uno stream che, come nell'esempio di streaming lato client, può essere utilizzato per leggere e scrivere messaggi. Tuttavia, questa volta restituiamo i valori tramite lo stream del nostro metodo mentre il client sta ancora scrivendo messaggi nel suo stream di messaggi. La sintassi per la lettura e la scrittura qui è molto simile al nostro metodo di streaming lato client, tranne per il fatto che il server restituisce un RouteChatStream. Sebbene ogni parte riceva sempre i messaggi dell'altra nell'ordine in cui sono stati scritti, sia il client che il server possono leggere e scrivere in qualsiasi ordine. I flussi operano in modo completamente indipendente.

Creiamo lo stream output utilizzando try_stream!, il che indica che lo stream potrebbe restituire errori.

Avvia il server

Una volta implementato questo metodo, dobbiamo anche avviare un server gRPC in modo che i client possano effettivamente utilizzare il nostro servizio. Compila main().

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

Ecco cosa succede in main(), passo dopo passo:

  1. Specifica la porta che vogliamo utilizzare per ascoltare le richieste dei client
  2. Crea un RouteGuideService con le funzionalità caricate
  3. Crea un'istanza del server gRPC utilizzando RouteGuideServer::new() utilizzando il servizio che abbiamo creato.
  4. Registra l'implementazione del servizio con il server gRPC.
  5. Chiama serve() sul server con i dettagli della porta per eseguire un'attesa di blocco fino all'interruzione della procedura.

6. Crea il client

In questa sezione, vedremo come creare un client Rust per il nostro servizio RouteGuide in src/client/client.rs.

Per prima cosa, porta il codice generato nell'ambito.

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

Metodi di servizio di chiamata

Ora vediamo come chiamiamo i metodi del servizio. In gRPC-Rust, gli RPC operano in modalità di blocco/sincrona, il che significa che la chiamata RPC attende la risposta del server e restituisce una risposta o un errore.

RPC di streaming lato server

Qui chiamiamo il metodo di streaming lato server ListFeatures, che restituisce un flusso di oggetti geografici Feature.

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

Passiamo una richiesta al metodo e riceviamo un'istanza di ListFeaturesStream. Il client può utilizzare lo stream ListFeaturesStream per leggere le risposte del server. Utilizziamo il metodo message() di ListFeaturesStream per leggere ripetutamente le risposte del server a un oggetto buffer di protocollo di risposta (in questo caso un Feature) finché non ci sono più messaggi.

RPC di streaming lato client

Qui, per record_route, trasformiamo un vettore di punti in uno stream. Successivamente, trasmettiamo questo flusso a record_route() come richiesta e otteniamo una singola risposta RouteSummary dopo che il flusso è stato completamente elaborato dal server.

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

RPC di streaming bidirezionale

Infine, diamo un'occhiata alla nostra RPC di streaming bidirezionale RouteChat(). Passiamo al metodo una richiesta di flusso a cui scriviamo e otteniamo un flusso da cui possiamo leggere i messaggi. Questa volta restituiamo i valori tramite lo stream del nostro metodo mentre il server sta ancora scrivendo messaggi nel suo stream di messaggi.

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

Sebbene ogni parte riceva sempre i messaggi dell'altra nell'ordine in cui sono stati scritti, sia il client che il server possono leggere e scrivere in qualsiasi ordine. I flussi operano in modo completamente indipendente.

Metodi helper di chiamata

Per chiamare i metodi di servizio, dobbiamo prima creare un canale per comunicare con il server. Per farlo, creiamo prima un endpoint, ci connettiamo a questo endpoint e passiamo il canale creato quando ci connettiamo a RouteGuideClient::new() nel seguente modo:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

In main(), esegui i metodi appena creati.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. Prova

Innanzitutto, per eseguire il client e il server, aggiungiamoli come target binari al nostro crate. Dobbiamo modificare il file Cargo.toml di conseguenza:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

Come per qualsiasi progetto, dobbiamo anche pensare alle dipendenze necessarie per il funzionamento del codice. Per i progetti Rust, le dipendenze si troveranno in Cargo.toml. Abbiamo già elencato le dipendenze necessarie nel file Cargo.toml.

Quindi, esegui i seguenti comandi dalle nostre directory di lavoro:

  1. Esegui il server in un terminale:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. Esegui il client da un altro terminale:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

Vedrai un output simile al seguente:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. Passaggi successivi