1. Введение
В этой лабораторной работе вы будете использовать gRPC-Rust для создания клиента и сервера, которые составят основу приложения для сопоставления маршрутов, написанного на Rust.
К концу руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации об объектах на маршруте клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.
Служба определена в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия при реализации этой функциональности.
Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но также сериализацию и десериализацию данных.
Чему вы научитесь
- Как использовать Protocol Buffers для определения API сервиса.
- Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с использованием автоматической генерации кода.
- Понимание потокового взаимодействия клиент-сервер с помощью gRPC.
Эта практическая работа предназначена для разработчиков Rust, впервые использующих gRPC или желающих освежить свои знания, а также для всех, кто интересуется разработкой распределённых систем. Опыт работы с gRPC не требуется.
2. Прежде чем начать
Предпосылки
Убедитесь, что у вас установлено следующее:
Получить код
Чтобы вам не пришлось начинать всё с нуля, эта лабораторная работа предоставляет вам заготовку исходного кода приложения. Следующие шаги покажут вам, как завершить приложение, включая использование плагинов компилятора буфера протокола для генерации шаблонного кода gRPC.
Сначала создайте рабочий каталог codelab и cd
в него:
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
Загрузите и распакуйте кодовую лабораторию:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
Кроме того, вы можете загрузить .zip-файл, содержащий только каталог codelab, и вручную распаковать его.
Если вы не хотите вводить реализацию вручную, готовый исходный код доступен на GitHub .
3. Определите сообщения и услуги
Первым шагом будет определение службы gRPC приложения, её методов RPC и типов сообщений запросов и ответов с помощью Protocol Buffers . Ваша служба будет предоставлять:
- Методы RPC, называемые
ListFeatures
,RecordRoute
иRouteChat
, которые реализует сервер и вызывает клиент. - Типы сообщений
Point
,Feature
,Rectangle
,RouteNote
иRouteSummary
представляют собой структуры данных, которыми обмениваются клиент и сервер при вызове методов, указанных выше.
Все эти методы RPC и типы сообщений будут определены в файле proto/routeguide.proto
предоставленного исходного кода.
Буферы протоколов обычно называются protobuf. Подробнее о терминологии gRPC см. в разделе «Основные концепции, архитектура и жизненный цикл gRPC».
Определить типы сообщений
Давайте сначала определим сообщения, которые будут использоваться нашими RPC. В файле routeguide/route_guide.proto
исходного кода сначала определите тип сообщения Point
. Point
представляет собой пару координат (широта-долгота) на карте. В этой практической работе используйте целые числа для координат:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Цифры 1
и 2
— это уникальные идентификационные номера для каждого из полей в структуре message
.
Затем определите тип сообщения Feature
. Feature
использует string
поле для имени или почтового адреса объекта, расположенного в месте, указанном Point
:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Далее следует сообщение Rectangle
, которое представляет собой прямоугольник широты и долготы, представленный двумя диагонально противоположными точками «lo» и «hi».
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Также сообщение RouteNote
, которое представляет собой сообщение, отправленное в заданной точке.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Нам также потребуется сообщение RouteSummary
. Это сообщение приходит в ответ на RPC-запрос RecordRoute
, который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние, представляющее собой сумму расстояний между точками.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Определить методы обслуживания
Давайте сначала определим нашу службу, а затем определим сообщения. Чтобы определить службу, укажите её название в файле .proto
. Файл proto/routeguide.proto
содержит структуру service
RouteGuide
, которая определяет один или несколько методов, предоставляемых службой приложения.
Определите методы RPC в определении вашего сервиса, указав типы их запросов и ответов. В этом разделе практикума давайте определим:
СписокОсобенности
Получает Feature
, доступные в заданном Rectangle
. Результаты передаются потоком, а не возвращаются сразу (например, в ответном сообщении с повторяющимся полем), поскольку прямоугольник может охватывать большую область и содержать огромное количество объектов.
Подходящим типом для этого RPC является потоковый RPC на стороне сервера : клиент отправляет запрос серверу и получает поток для чтения последовательности сообщений. Клиент читает данные из возвращаемого потока до тех пор, пока сообщения не закончатся. Как видно из нашего примера, метод потоковой передачи на стороне сервера указывается ключевым словом stream
перед типом ответа.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Принимает поток Point
на проходимом маршруте и возвращает RouteSummary
после завершения обхода.
В данном случае RPC -вызов на стороне клиента кажется уместным: клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочитает их все и вернет ответ. Метод потоковой передачи на стороне клиента указывается ключевым словом stream
перед типом запроса.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Принимает поток RouteNote
, отправленных во время прохождения маршрута, одновременно получая другие RouteNote
(например, от других пользователей).
Именно такой вариант использования двунаправленной потоковой передачи данных . Двунаправленный потоковый RPC предполагает, что обе стороны отправляют последовательность сообщений, используя поток чтения-записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и записывать данные в любом порядке.
Например, сервер может дождаться получения всех клиентских сообщений, прежде чем писать свои ответы, или он может поочередно читать сообщение, а затем писать сообщение или использовать какую-либо другую комбинацию чтений и записей.
Порядок сообщений в каждом потоке сохраняется. Этот метод задаётся путём добавления ключевого слова stream
перед запросом и ответом.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Сгенерируйте клиентский и серверный код.
Мы уже предоставили вам сгенерированный код из файла .proto
в сгенерированном каталоге.
Если вы хотите узнать, как самостоятельно сгенерировать код из файла .proto
или внести какие-либо изменения в файл .proto
и протестировать их, обратитесь к этим инструкциям .
Сгенерированный код содержит:
- Определения структур для типов сообщений
Point
,Feature
,Rectangle
,RouteNote
иRouteSummary
. - Сервисный признак, который нам необходимо реализовать:
route_guide_server::RouteGuide
. - Тип клиента, который мы будем использовать для вызова сервера:
route_guide_client::RouteGuideClient<T>
.
Далее мы реализуем методы на стороне сервера, чтобы при отправке клиентом запроса сервер мог ответить.
5. Реализуйте услугу
Сначала давайте рассмотрим, как создать сервер RouteGuide
. Чтобы наш сервис RouteGuide
заработал, нужно выполнить два шага:
- Реализация интерфейса сервиса, созданного на основе определения нашего сервиса: выполнение фактической «работы» нашего сервиса.
- Запуск сервера gRPC для прослушивания запросов от клиентов и отправки их в нужную реализацию метода.
В src/server/server.rs
мы можем добавить сгенерированный код в область действия с помощью макроса gRPC include_generated_proto!
и импортировать трейт RouteGuide
и Point
.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
Начнём с определения структуры, представляющей нашу службу. На данный момент это можно сделать в файле src/server/server.rs
:
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
Теперь нам нужно реализовать черту route_guide_server::RouteGuide
из нашего сгенерированного кода.
Реализовать RouteGuide
Нам нужно реализовать сгенерированный интерфейс RouteGuide
. Вот как будет выглядеть реализация. Она уже есть в шаблоне.
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
Давайте подробно рассмотрим каждую реализацию RPC.
Потоковая передача RPC на стороне сервера
Начнём с ListFeatures
. Это потоковый RPC-запрос на стороне сервера, поэтому нам нужно отправить несколько Feature
клиенту.
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
Как видите, мы получаем объект запроса ( Rectangle
, в котором наш клиент хочет найти Features
). На этот раз нам нужно вернуть поток значений. Мы создаём канал и запускаем новую асинхронную задачу, в которой выполняем поиск, отправляя в канал объекты, удовлетворяющие нашим ограничениям. Половина Stream канала возвращается вызывающей стороне, обёрнутая в tonic::Response
.
Клиентская потоковая передача RPC
Теперь рассмотрим что-то более сложное: клиентский потоковый метод RecordRoute
, который получает поток Points
от клиента и возвращает один RouteSummary
с информацией о его поездке. В качестве входных данных он получает поток, который сервер может использовать как для чтения, так и для записи сообщений. Он может перебирать клиентские сообщения с помощью метода next()
и возвращать единственный ответ.
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
В теле метода мы используем метод next()
потока для многократного чтения клиентских запросов к объекту запроса (в данном случае Point
), пока не будут получены все сообщения. Если значение равно None, поток всё ещё в порядке и может продолжать чтение.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat()
.
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
На этот раз мы получаем поток, который, как и в нашем примере с потоковой передачей на стороне клиента, можно использовать для чтения и записи сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока клиент продолжает записывать сообщения в свой поток сообщений. Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что сервер возвращает RouteChatStream
. Хотя каждая сторона всегда получает сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать сообщения в любом порядке — потоки работают совершенно независимо.
Мы создаем output
поток с помощью try_stream!
что указывает на то, что поток может возвращать ошибки.
Запустить сервер
После реализации этого метода нам также необходимо запустить gRPC-сервер, чтобы клиенты могли использовать наш сервис. Заполните main()
.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
Вот что происходит в main()
, шаг за шагом:
- Укажите порт, который мы хотим использовать для прослушивания клиентских запросов.
- Создайте
RouteGuideService
с функциями, загруженными в - Создайте экземпляр сервера gRPC с помощью
RouteGuideServer::new()
используя созданную нами службу. - Зарегистрируйте реализацию нашей службы на сервере gRPC.
- Вызовите
serve()
на сервере с данными нашего порта, чтобы выполнить блокирующее ожидание, пока процесс не будет завершен.
6. Создайте клиента
В этом разделе мы рассмотрим создание клиента Rust для нашей службы RouteGuide в src/client/client.rs
.
Сначала добавьте сгенерированный код в область действия.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
Методы обслуживания вызовов
Теперь давайте посмотрим, как мы вызываем наши сервисные методы. В gRPC-Rust RPC работают в блокирующем/синхронном режиме, то есть RPC-вызов ожидает ответа сервера и либо возвращает ответ, либо ошибку.
Потоковая передача RPC на стороне сервера
Здесь мы вызываем серверный потоковый метод ListFeatures
, который возвращает поток географических объектов Feature
.
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
Мы передаём методу запрос и получаем обратно экземпляр ListFeaturesStream
. Клиент может использовать поток ListFeaturesStream
для чтения ответов сервера. Мы используем метод message()
объекта ListFeaturesStream
для многократного чтения ответов сервера в объект буфера протокола ответа (в данном случае Feature
), пока сообщения не закончатся.
Клиентская потоковая передача RPC
Здесь для record_route
мы преобразуем вектор точек в поток. Затем мы передаём этот поток в record_route()
как запрос и получаем один ответ RouteSummary
после полной обработки потока сервером.
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat()
. Мы передаём методу запрос потока, в который записываем данные, и получаем обратно поток, из которого можем читать сообщения. На этот раз мы возвращаем значения через поток нашего метода, пока сервер всё ещё записывает сообщения в свой поток.
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были написаны, и клиент, и сервер могут читать и писать в любом порядке — потоки работают совершенно независимо.
Вызов вспомогательных методов
Для вызова методов сервиса необходимо сначала создать канал для взаимодействия с сервером. Для этого сначала создаём конечную точку, подключаемся к ней и передаём созданный при подключении канал в RouteGuideClient::new()
следующим образом:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
В main()
выполните методы, которые мы только что создали.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. Попробуйте
Для начала, чтобы запустить наш клиент и сервер, добавим их в качестве бинарных целей в наш контейнер. Необходимо отредактировать файл Cargo.toml соответствующим образом:
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
Как и в любом проекте, нам также необходимо продумать зависимости, необходимые для работы нашего кода. Для проектов Rust зависимости будут находиться в Cargo.toml
. Мы уже перечислили необходимые зависимости в файле Cargo.toml
.
Затем выполните следующие команды из наших рабочих каталогов:
- Запустите сервер в одном терминале:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- Запустите клиент с другого терминала:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
Вы увидите примерно такой вывод:
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"
8. Что дальше?
- Узнайте, как работает gRPC, из раздела Введение в gRPC и Основные концепции .
- Проработайте учебник по основам .