1. Introduction
Dans cet atelier de programmation, vous allez utiliser gRPC-Rust pour créer un client et un serveur qui constituent la base d'une application de cartographie d'itinéraire écrite en Rust.
À la fin du tutoriel, vous disposerez d'un client qui se connecte à un serveur distant à l'aide de gRPC pour obtenir des informations sur les caractéristiques d'un itinéraire client, créer un récapitulatif d'un itinéraire client et échanger des informations sur l'itinéraire, telles que les mises à jour du trafic, avec le serveur et d'autres clients.
Le service est défini dans un fichier Protocol Buffers, qui sera utilisé pour générer du code passe-partout pour le client et le serveur afin qu'ils puissent communiquer entre eux. Vous gagnerez ainsi du temps et des efforts pour implémenter cette fonctionnalité.
Ce code généré gère non seulement les complexités de la communication entre le serveur et le client, mais aussi la sérialisation et la désérialisation des données.
Points abordés
- Utiliser Protocol Buffers pour définir une API de service.
- Comment créer un client et un serveur basés sur gRPC à partir d'une définition Protocol Buffers à l'aide de la génération de code automatisée.
- Comprendre la communication de streaming client-serveur avec gRPC.
Cet atelier de programmation s'adresse aux développeurs Rust qui découvrent gRPC ou qui souhaitent se rafraîchir la mémoire sur gRPC, ou à toute personne intéressée par la création de systèmes distribués. Aucune expérience préalable avec gRPC n'est requise.
2. Avant de commencer
Prérequis
Assurez-vous d'avoir installé les éléments suivants :
- GCC. Suivez les instructions ici.
- Rust, dernière version. Suivez les instructions d'installation ici.
Obtenir le code
Pour que vous n'ayez pas à partir de zéro, cet atelier de programmation fournit un échafaudage du code source de l'application que vous devez compléter. Les étapes suivantes vous montreront comment finaliser l'application, y compris en utilisant les plug-ins du compilateur de tampon de protocole pour générer le code gRPC standard.
Commencez par créer le répertoire de travail de l'atelier de programmation et accédez-y :cd
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
Téléchargez et extrayez l'atelier de programmation :
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
Vous pouvez également télécharger le fichier .zip contenant uniquement le répertoire de l'atelier de programmation et le décompresser manuellement.
Le code source complet est disponible sur GitHub si vous ne souhaitez pas saisir d'implémentation.
3. Définir les messages et les services
La première étape consiste à définir le service gRPC de l'application, ses méthodes RPC, ainsi que ses types de messages de requête et de réponse à l'aide de Protocol Buffers. Votre service fournira :
- Méthodes RPC appelées
ListFeatures
,RecordRoute
etRouteChat
que le serveur implémente et que le client appelle. - Les types de messages
Point
,Feature
,Rectangle
,RouteNote
etRouteSummary
, qui sont des structures de données échangées entre le client et le serveur lors de l'appel des méthodes ci-dessus.
Ces méthodes RPC et leurs types de messages seront tous définis dans le fichier proto/routeguide.proto
du code source fourni.
Les tampons de protocole sont communément appelés "protobufs". Pour en savoir plus sur la terminologie gRPC, consultez Concepts fondamentaux, architecture et cycle de vie de gRPC.
Définir les types de messages
Commençons par définir les messages qui seront utilisés par nos RPC. Dans le fichier routeguide/route_guide.proto
du code source, définissez d'abord le type de message Point
. Un Point
représente une paire de coordonnées (latitude et longitude) sur une carte. Pour cet atelier de programmation, utilisez des nombres entiers pour les coordonnées :
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Les nombres 1
et 2
sont des ID uniques pour chacun des champs de la structure message
.
Ensuite, définissez le type de message Feature
. Un Feature
utilise un champ string
pour le nom ou l'adresse postale d'un élément à un emplacement spécifié par un Point
:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Ensuite, un message Rectangle
représentant un rectangle de latitude-longitude, représenté par deux points diagonalement opposés "lo" et "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Il s'agit également d'un message RouteNote
qui représente un message envoyé à un moment donné.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Nous aurons également besoin d'un message RouteSummary
. Ce message est reçu en réponse à un RPC RecordRoute
, qui est expliqué dans la section suivante. Il contient le nombre de points individuels reçus, le nombre de caractéristiques détectées et la distance totale parcourue, qui correspond à la somme cumulée de la distance entre chaque point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Définir les méthodes de service
Commençons par définir notre service, puis définissons nos messages. Pour définir un service, vous devez spécifier un service nommé dans votre fichier .proto
. Le fichier proto/routeguide.proto
possède une structure service
nommée RouteGuide
qui définit une ou plusieurs méthodes fournies par le service de l'application.
Définissez les méthodes RPC dans la définition de votre service, en spécifiant leurs types de requête et de réponse. Dans cette section de l'atelier de programmation, définissons les éléments suivants :
ListFeatures
Obtient les Feature
disponibles dans le Rectangle
spécifié. Les résultats sont diffusés en flux continu plutôt que renvoyés en une seule fois (par exemple, dans un message de réponse avec un champ répété), car le rectangle peut couvrir une grande surface et contenir un grand nombre de caractéristiques.
Un type approprié pour ce RPC est un RPC de streaming côté serveur : le client envoie une requête au serveur et obtient un flux pour lire une séquence de messages en retour. Le client lit le flux renvoyé jusqu'à ce qu'il n'y ait plus de messages. Comme vous pouvez le voir dans notre exemple, vous spécifiez une méthode de streaming côté serveur en plaçant le mot clé stream
avant le type de réponse.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Accepte un flux de Point
sur un itinéraire parcouru et renvoie un RouteSummary
lorsque le parcours est terminé.
Un RPC de streaming côté client semble approprié dans ce cas : le client écrit une séquence de messages et les envoie au serveur, à nouveau à l'aide d'un flux fourni. Une fois que le client a terminé d'écrire les messages, il attend que le serveur les lise tous et renvoie sa réponse. Pour spécifier une méthode de streaming côté client, placez le mot clé stream
avant le type de requête.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Accepte un flux de RouteNote
envoyés lors du parcours d'un itinéraire, tout en recevant d'autres RouteNote
(par exemple, d'autres utilisateurs).
C'est exactement le type de cas d'utilisation pour le streaming bidirectionnel. Dans un RPC de streaming bidirectionnel, les deux parties envoient une séquence de messages à l'aide d'un flux en lecture-écriture. Les deux flux fonctionnent indépendamment, de sorte que les clients et les serveurs peuvent lire et écrire dans l'ordre de leur choix.
Par exemple, le serveur peut attendre de recevoir tous les messages du client avant d'écrire ses réponses, ou il peut lire un message, puis en écrire un, ou encore effectuer une autre combinaison de lectures et d'écritures.
L'ordre des messages dans chaque flux est conservé. Pour spécifier ce type de méthode, placez le mot clé stream
avant la requête et la réponse.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Générer le code client et serveur
Nous vous avons déjà fourni le code généré à partir du fichier .proto
dans le répertoire généré.
Si vous souhaitez apprendre à générer du code à partir du fichier .proto
vous-même ou apporter des modifications au fichier .proto
et les tester, consultez ces instructions.
Le code généré contient les éléments suivants :
- Définitions de struct pour les types de messages
Point
,Feature
,Rectangle
,RouteNote
etRouteSummary
. - Un trait de service que nous devrons implémenter :
route_guide_server::RouteGuide
. - Type de client que nous utiliserons pour appeler le serveur :
route_guide_client::RouteGuideClient<T>
.
Ensuite, nous implémenterons les méthodes côté serveur afin que le serveur puisse répondre lorsqu'un client envoie une requête.
5. Implémenter le service
Commençons par examiner comment créer un serveur RouteGuide
. Pour que notre service RouteGuide
fonctionne correctement, deux éléments sont nécessaires :
- Implémenter l'interface de service générée à partir de notre définition de service : effectuer le "travail" réel de notre service.
- Exécuter un serveur gRPC pour écouter les requêtes des clients et les envoyer à l'implémentation de méthode appropriée.
Dans src/server/server.rs
, nous pouvons mettre le code généré dans le champ d'application à l'aide de la macro include_generated_proto!
de gRPC et importer le trait RouteGuide
et Point
.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
Nous pouvons commencer par définir une structure pour représenter notre service. Pour l'instant, nous pouvons le faire sur src/server/server.rs
:
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
Nous devons maintenant implémenter le trait route_guide_server::RouteGuide
à partir du code généré.
Implémenter RouteGuide
Nous devons implémenter l'interface RouteGuide
générée. Voici à quoi ressemblerait l'implémentation. Cette information figure déjà dans le modèle.
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
Examinons en détail chaque implémentation RPC.
RPC de streaming côté serveur
Commençons par ListFeatures
. Il s'agit d'un RPC de streaming côté serveur. Nous devons donc renvoyer plusieurs Feature
à notre client.
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
Comme vous pouvez le voir, nous obtenons un objet de requête (le Rectangle
dans lequel notre client souhaite trouver Features
). Cette fois, nous devons renvoyer un flux de valeurs. Nous créons un canal et générons une nouvelle tâche asynchrone dans laquelle nous effectuons une recherche, en envoyant dans le canal les caractéristiques qui répondent à nos contraintes. La moitié du flux du canal est renvoyée à l'appelant, enveloppée dans un tonic::Response
.
RPC de streaming côté client
Examinons maintenant quelque chose d'un peu plus compliqué : la méthode de streaming côté client RecordRoute
, où nous obtenons un flux de Points
du client et renvoyons un seul RouteSummary
avec des informations sur son trajet. Il reçoit un flux en entrée, que le serveur peut utiliser pour lire et écrire des messages. Il peut parcourir les messages du client à l'aide de sa méthode next()
et renvoyer sa réponse unique.
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
Dans le corps de la méthode, nous utilisons la méthode next()
du flux pour lire à plusieurs reprises les requêtes de notre client dans un objet de requête (dans ce cas, un Point
) jusqu'à ce qu'il n'y ait plus de messages. Si la valeur est "None", le flux est toujours valide et la lecture peut se poursuivre.
RPC de streaming bidirectionnel
Enfin, examinons notre RPC de streaming bidirectionnel RouteChat()
.
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
Cette fois, nous obtenons un flux qui, comme dans notre exemple de streaming côté client, peut être utilisé pour lire et écrire des messages. Cette fois, nous renvoyons des valeurs via le flux de notre méthode pendant que le client continue d'écrire des messages dans son flux de messages. La syntaxe de lecture et d'écriture est très semblable à celle de notre méthode de streaming client, sauf que le serveur renvoie un RouteChatStream
. Bien que chaque côté reçoive toujours les messages de l'autre dans l'ordre dans lequel ils ont été écrits, le client et le serveur peuvent lire et écrire dans n'importe quel ordre. Les flux fonctionnent de manière totalement indépendante.
Nous créons le flux output
à l'aide de try_stream!
, ce qui indique que le flux peut renvoyer des erreurs.
Démarrer le serveur
Une fois cette méthode implémentée, nous devons également démarrer un serveur gRPC pour que les clients puissent réellement utiliser notre service. Remplissez main()
.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
Voici ce qui se passe dans main()
, étape par étape :
- Spécifiez le port que nous souhaitons utiliser pour écouter les requêtes du client.
- Créer un
RouteGuideService
avec des fonctionnalités chargées - Créez une instance du serveur gRPC à l'aide de
RouteGuideServer::new()
et du service que nous avons créé. - Enregistrez l'implémentation de notre service auprès du serveur gRPC.
- Appelez
serve()
sur le serveur avec les détails de notre port pour effectuer une attente bloquante jusqu'à ce que le processus soit arrêté.
6. Créer le client
Dans cette section, nous allons voir comment créer un client Rust pour notre service RouteGuide dans src/client/client.rs
.
Commencez par inclure le code généré dans le champ d'application.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
Appeler les méthodes de service
Voyons maintenant comment appeler les méthodes de service. Dans gRPC-Rust, les RPC fonctionnent en mode bloquant/synchrone, ce qui signifie que l'appel RPC attend la réponse du serveur et renvoie une réponse ou une erreur.
RPC de streaming côté serveur
C'est ici que nous appelons la méthode de streaming côté serveur ListFeatures
, qui renvoie un flux d'objets Feature
géographiques.
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
Nous transmettons une requête à la méthode et obtenons une instance de ListFeaturesStream
. Le client peut utiliser le flux ListFeaturesStream
pour lire les réponses du serveur. Nous utilisons la méthode message()
de ListFeaturesStream
pour lire à plusieurs reprises les réponses du serveur dans un objet de tampon de protocole de réponse (dans ce cas, un Feature
) jusqu'à ce qu'il n'y ait plus de messages.
RPC de streaming côté client
Ici, pour record_route
, nous transformons un vecteur de points en flux. Nous transmettons ensuite ce flux à record_route()
en tant que requête et obtenons une seule réponse RouteSummary
une fois que le flux a été entièrement traité par le serveur.
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
RPC de streaming bidirectionnel
Enfin, examinons notre RPC de streaming bidirectionnel RouteChat()
. Nous transmettons à la méthode une requête de flux dans laquelle nous écrivons et nous récupérons un flux à partir duquel nous pouvons lire des messages. Cette fois, nous renvoyons des valeurs via le flux de notre méthode pendant que le serveur écrit encore des messages dans son flux de messages.
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
Bien que chaque côté reçoive toujours les messages de l'autre dans l'ordre dans lequel ils ont été écrits, le client et le serveur peuvent lire et écrire dans n'importe quel ordre. Les flux fonctionnent de manière totalement indépendante.
Appeler des méthodes d'assistance
Pour appeler des méthodes de service, nous devons d'abord créer un canal pour communiquer avec le serveur. Pour ce faire, nous créons d'abord un point de terminaison, nous nous y connectons et nous transmettons le canal créé lors de la connexion à RouteGuideClient::new()
comme suit :
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
Dans main()
, exécutez les méthodes que nous venons de créer.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. Essayer
Tout d'abord, pour exécuter notre client et notre serveur, ajoutons-les en tant que cibles binaires à notre crate. Nous devons modifier notre Cargo.toml en conséquence :
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
Comme pour tout projet, nous devons également réfléchir aux dépendances nécessaires au bon fonctionnement de notre code. Pour les projets Rust, les dépendances se trouveront dans Cargo.toml
. Nous avons déjà listé les dépendances nécessaires dans le fichier Cargo.toml
.
Ensuite, exécutez les commandes suivantes à partir de nos répertoires de travail :
- Exécutez le serveur dans un terminal :
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- Exécutez le client à partir d'un autre terminal :
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
Un résultat semblable à celui-ci s'affiche :
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"
8. Étape suivante
- Découvrez comment fonctionne gRPC dans Introduction à gRPC et Concepts de base.
- Parcourez le tutoriel sur les principes de base.