1. はじめに
この Codelab では、gRPC-Rust を使用して、Rust で記述されたルート マッピング アプリケーションの基盤となるクライアントとサーバーを作成します。
このチュートリアルを完了すると、gRPC を使用してリモート サーバーに接続し、クライアントのルート上の機能に関する情報を取得し、クライアントのルートの概要を作成し、サーバーや他のクライアントとトラフィックの更新などのルート情報を交換するクライアントが作成されます。
サービスは Protocol Buffers ファイルで定義されます。このファイルは、クライアントとサーバーが相互に通信できるように、クライアントとサーバーのボイラープレート コードを生成するために使用されます。これにより、この機能を実装する時間と労力を節約できます。
この生成されたコードは、サーバーとクライアント間の通信の複雑さだけでなく、データのシリアル化と逆シリアル化も処理します。
学習内容
- プロトコル バッファを使用してサービス API を定義する方法。
- 自動コード生成を使用して、プロトコル バッファ定義から gRPC ベースのクライアントとサーバーを構築する方法。
- gRPC を使用したクライアント サーバー ストリーミング通信の理解。
この Codelab は、gRPC を初めて使用する Rust デベロッパー、gRPC の復習を希望するデベロッパー、分散システムの構築に関心のある方を対象としています。gRPC の経験は必要ありません。
2. 始める前に
前提条件
次のものがインストールされていることを確認します。
コードを取得する
この Codelab では、完全にゼロから始める必要がないように、アプリケーションのソースコードのスケルトンが用意されています。次の手順では、プロトコル バッファ コンパイラ プラグインを使用してボイラープレート gRPC コードを生成するなど、アプリケーションを完成させる方法について説明します。
まず、Codelab の作業ディレクトリを作成し、そのディレクトリに cd
します。
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
Codelab をダウンロードして解凍します。
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
または、Codelab ディレクトリのみを含む .zip ファイルをダウンロードして、手動で解凍することもできます。
実装の入力をスキップする場合は、完成したソースコードを GitHub で入手できます。
3. メッセージとサービスを定義する
まず、プロトコル バッファを使用して、アプリケーションの gRPC サービス、RPC メソッド、リクエストとレスポンスのメッセージ タイプを定義します。サービスでは以下を提供します。
- サーバーが実装し、クライアントが呼び出す RPC メソッド
ListFeatures
、RecordRoute
、RouteChat
。 - 上記のメソッドを呼び出すときにクライアントとサーバーの間で交換されるデータ構造であるメッセージ タイプ
Point
、Feature
、Rectangle
、RouteNote
、RouteSummary
。
これらの RPC メソッドとそのメッセージ型はすべて、提供されたソースコードの proto/routeguide.proto
ファイルで定義されます。
Protocol Buffers は一般に protobuf と呼ばれます。gRPC の用語の詳細については、gRPC の コア コンセプト、アーキテクチャ、ライフサイクルをご覧ください。
メッセージのタイプを定義する
まず、RPC で使用されるメッセージを定義しましょう。ソースコードの routeguide/route_guide.proto
ファイルで、まず Point
メッセージ タイプを定義します。Point
は、地図上の緯度と経度の座標ペアを表します。この Codelab では、座標に整数を使用します。
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
数値 1
と 2
は、message
構造内の各フィールドの一意の ID 番号です。
次に、Feature
メッセージ タイプを定義します。Feature
は、Point
で指定された場所にあるものの名前または郵便番号に string
フィールドを使用します。
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
次に、緯度と経度の長方形を表す Rectangle
メッセージ。これは、対角線上の 2 つの点「lo」と「hi」で表されます。
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
また、特定の時点で送信されたメッセージを表す RouteNote
メッセージもあります。
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
また、RouteSummary
メッセージも必要です。このメッセージは、次のセクションで説明する RecordRoute
RPC のレスポンスとして受信されます。受信した個々のポイントの数、検出された特徴の数、各ポイント間の距離の累積合計としてカバーされた合計距離が含まれます。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
サービス メソッドを定義する
まずサービスを定義し、後でメッセージを定義しましょう。サービスを定義するには、.proto
ファイルで名前付きサービスを指定します。proto/routeguide.proto
ファイルには、アプリケーションのサービスによって提供される 1 つ以上のメソッドを定義する RouteGuide
という名前の service
構造があります。
サービス定義内で RPC メソッドを定義し、リクエストとレスポンスの型を指定します。この Codelab のこのセクションでは、次のことを定義します。
ListFeatures
指定された Rectangle
内で使用可能な Feature
を取得します。長方形が広い領域をカバーし、多数のフィーチャーを含む可能性があるため、結果は一度に返されるのではなく(繰り返しフィールドを含むレスポンス メッセージなど)、ストリーミングされます。
この RPC に適したタイプは、サーバーサイド ストリーミング RPC です。クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るストリームを取得します。クライアントは、受信したストリームをメッセージがなくなるまで読み取ります。例に示すように、サーバーサイド ストリーミング メソッドは、レスポンス タイプの前に stream
キーワードを配置して指定します。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
トラバースされるルート上の Point
のストリームを受け取り、トラバースが完了すると RouteSummary
を返します。
この場合は、クライアントサイド ストリーミング RPC が適切です。クライアントは、提供されたストリームを使用して、一連のメッセージを書き込み、サーバーに送信します。クライアントは、メッセージの書き込みを終了すると、サーバーがすべてのメッセージを読み取ってレスポンスを返すのを待ちます。クライアントサイド ストリーミング メソッドを指定するには、リクエスト タイプの前に stream
キーワードを配置します。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
ルートの移動中に送信された RouteNote
のストリームを受け取り、他の RouteNote
(他のユーザーからのものなど)も受け取ります。
これは、まさに双方向ストリーミングのユースケースです。双方向ストリーミング RPC では、両側が読み取り / 書き込みストリームを使用して一連のメッセージを送信します。2 つのストリームは独立して動作するため、クライアントとサーバーは任意の順序で読み取りと書き込みを行うことができます。
たとえば、サーバーはすべてのクライアント メッセージを受信してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことを交互に行うことも、読み取りと書き込みの他の組み合わせを行うこともできます。
各ストリーム内のメッセージの順序は保持されます。このタイプのメソッドは、リクエストとレスポンスの両方の前に stream
キーワードを配置して指定します。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. クライアント コードとサーバーコードを生成する
生成されたディレクトリの .proto
ファイルから生成されたコードは、すでに提供されています。
.proto
ファイルからコードを生成する方法や、.proto
ファイルに変更を加えてテストする方法については、こちらの手順をご覧ください。
生成されたコードには次のものが含まれています。
- メッセージ タイプ
Point
、Feature
、Rectangle
、RouteNote
、RouteSummary
の構造体定義。 - 実装する必要があるサービストレイト:
route_guide_server::RouteGuide
。 - サーバーの呼び出しに使用するクライアント タイプ:
route_guide_client::RouteGuideClient<T>
。
次に、クライアントがリクエストを送信したときにサーバーが回答を返せるように、サーバーサイドでメソッドを実装します。
5. サービスを実装する
まず、RouteGuide
サーバーの作成方法を見てみましょう。RouteGuide
サービスが機能するには、次の 2 つの要素が必要です。
- サービス定義から生成されたサービス インターフェースの実装: サービスの実際の「作業」を行う。
- gRPC サーバーを実行して、クライアントからのリクエストをリッスンし、適切なメソッド実装にディスパッチします。
src/server/server.rs
では、gRPC の include_generated_proto!
マクロを使用して生成されたコードをスコープに含め、RouteGuide
トレイトと Point
をインポートできます。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
まず、サービスを表す構造体を定義します。現時点では、src/server/server.rs
でこれを行うことができます。
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
次に、生成されたコードから route_guide_server::RouteGuide
トレイトを実装する必要があります。
RouteGuide を実装する
生成された RouteGuide
インターフェースを実装する必要があります。実装は次のようになります。これはすでにテンプレートに含まれています。
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
各 RPC 実装について詳しく見ていきましょう。
サーバーサイド ストリーミング RPC
ListFeatures
から始めましょう。これはサーバーサイド ストリーミング RPC であるため、複数の Feature
をクライアントに送り返す必要があります。
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
ご覧のとおり、リクエスト オブジェクト(クライアントが Features
を検索する Rectangle
)を取得します。今回は、値のストリームを返す必要があります。チャンネルを作成し、新しい非同期タスクを生成してルックアップを実行し、制約を満たす特徴をチャンネルに送信します。チャネルの Stream の半分が tonic::Response
でラップされて呼び出し元に返されます。
クライアントサイド ストリーミング RPC
次に、もう少し複雑なクライアントサイド ストリーミング メソッド RecordRoute
を見てみましょう。ここでは、クライアントから Points
のストリームを取得し、旅行に関する情報を含む単一の RouteSummary
を返します。入力としてストリームを取得します。サーバーはこれを使用して、メッセージの読み取りと書き込みの両方を行うことができます。next()
メソッドを使用してクライアント メッセージを反復処理し、単一のレスポンスを返すことができます。
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
メソッド本文では、ストリームの next()
メソッドを使用して、メッセージがなくなるまでクライアントのリクエストをリクエスト オブジェクト(この場合は Point
)に繰り返し読み込みます。None の場合、ストリームはまだ有効で、読み取りを続行できます。
双方向ストリーミング RPC
最後に、双方向ストリーミング RPC RouteChat()
を見てみましょう。
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
今回は、クライアントサイドのストリーミングの例と同様に、メッセージの読み取りと書き込みに使用できるストリームを取得します。ただし、今回はクライアントがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。ここでの読み取りと書き込みの構文は、サーバーが RouteChatStream
を返す点を除いて、クライアント ストリーミング メソッドとよく似ています。各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。
try_stream!
を使用して output
ストリームを作成します。これは、ストリームがエラーを返す可能性があることを示します。
サーバーを起動する
このメソッドを実装したら、クライアントが実際にサービスを使用できるように、gRPC サーバーも起動する必要があります。main()
に入力します。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
main()
で起こっていることをステップごとに説明します。
- クライアント リクエストをリッスンするために使用するポートを指定します。
- 特徴が読み込まれた
RouteGuideService
を作成する - 作成したサービスを使用して、
RouteGuideServer::new()
を使用して gRPC サーバーのインスタンスを作成します。 - サービス実装を gRPC サーバーに登録します。
- ポートの詳細を使用してサーバーで
serve()
を呼び出し、プロセスが強制終了されるまでブロック待機を行います。
6. クライアントを作成する
このセクションでは、src/client/client.rs
で RouteGuide サービス用の Rust クライアントを作成する方法について説明します。
まず、生成されたコードをスコープに含めます。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
サービス メソッドを呼び出す
次に、サービス メソッドを呼び出す方法を見てみましょう。gRPC-Rust では、RPC はブロッキング/同期モードで動作します。つまり、RPC 呼び出しはサーバーからの応答を待機し、応答またはエラーを返します。
サーバーサイド ストリーミング RPC
ここでは、サーバーサイド ストリーミング メソッド ListFeatures
を呼び出します。このメソッドは、地理的な Feature
オブジェクトのストリームを返します。
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
メソッドにリクエストを渡すと、ListFeaturesStream
のインスタンスが返されます。クライアントは ListFeaturesStream
ストリームを使用して、サーバーのレスポンスを読み取ることができます。ListFeaturesStream
の message()
メソッドを使用して、メッセージがなくなるまで、サーバーのレスポンスをレスポンス プロトコル バッファ オブジェクト(この場合は Feature
)に繰り返し読み込みます。
クライアントサイド ストリーミング RPC
ここでは、record_route
の場合、ポイントのベクトルをストリームに変換します。次に、このストリームをリクエストとして record_route()
に渡し、サーバーでストリームが完全に処理された後に単一の RouteSummary
レスポンスを取得します。
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
双方向ストリーミング RPC
最後に、双方向ストリーミング RPC RouteChat()
を見てみましょう。メソッドに書き込むストリーム リクエストを渡し、メッセージを読み取ることができるストリームを取得します。今回は、サーバーがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。
ヘルパー メソッドを呼び出す
サービス メソッドを呼び出すには、まずサーバーと通信するためのチャネルを作成する必要があります。これを作成するには、まずエンドポイントを作成し、そのエンドポイントに接続して、接続時に作成されたチャネルを RouteGuideClient::new()
に渡します。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
main()
で、作成したメソッドを実行します。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. 試してみる
まず、クライアントとサーバーを実行するために、それらをバイナリ ターゲットとしてクレートに追加しましょう。Cargo.toml を次のように編集する必要があります。
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
どのプロジェクトでも、コードが動作するために必要な依存関係を考慮する必要があります。Rust プロジェクトの場合、依存関係は Cargo.toml
にあります。必要な依存関係は Cargo.toml
ファイルにすでに記載されています。
次に、作業ディレクトリから次のコマンドを実行します。
- 1 つのターミナルでサーバーを実行します。
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- 別のターミナルからクライアントを実行します。
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
次のような出力が表示されます。
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"