gRPC-Rust スタートガイド - ストリーミング

1. はじめに

この Codelab では、gRPC-Rust を使用して、Rust で記述されたルート マッピング アプリケーションの基盤となるクライアントとサーバーを作成します。

このチュートリアルを完了すると、gRPC を使用してリモート サーバーに接続し、クライアントのルート上の機能に関する情報を取得し、クライアントのルートの概要を作成し、サーバーや他のクライアントとトラフィックの更新などのルート情報を交換するクライアントが作成されます。

サービスは Protocol Buffers ファイルで定義されます。このファイルは、クライアントとサーバーが相互に通信できるように、クライアントとサーバーのボイラープレート コードを生成するために使用されます。これにより、この機能を実装する時間と労力を節約できます。

この生成されたコードは、サーバーとクライアント間の通信の複雑さだけでなく、データのシリアル化と逆シリアル化も処理します。

学習内容

  • プロトコル バッファを使用してサービス API を定義する方法。
  • 自動コード生成を使用して、プロトコル バッファ定義から gRPC ベースのクライアントとサーバーを構築する方法。
  • gRPC を使用したクライアント サーバー ストリーミング通信の理解。

この Codelab は、gRPC を初めて使用する Rust デベロッパー、gRPC の復習を希望するデベロッパー、分散システムの構築に関心のある方を対象としています。gRPC の経験は必要ありません。

2. 始める前に

前提条件

次のものがインストールされていることを確認します。

  • GCC。こちらの手順に沿って対応します。
  • Rust、最新バージョン。こちらのインストール手順に沿って操作します。

コードを取得する

この Codelab では、完全にゼロから始める必要がないように、アプリケーションのソースコードのスケルトンが用意されています。次の手順では、プロトコル バッファ コンパイラ プラグインを使用してボイラープレート gRPC コードを生成するなど、アプリケーションを完成させる方法について説明します。

まず、Codelab の作業ディレクトリを作成し、そのディレクトリに cd します。

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

Codelab をダウンロードして解凍します。

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

または、Codelab ディレクトリのみを含む .zip ファイルをダウンロードして、手動で解凍することもできます。

実装の入力をスキップする場合は、完成したソースコードを GitHub で入手できます。

3. メッセージとサービスを定義する

まず、プロトコル バッファを使用して、アプリケーションの gRPC サービス、RPC メソッド、リクエストとレスポンスのメッセージ タイプを定義します。サービスでは以下を提供します。

  • サーバーが実装し、クライアントが呼び出す RPC メソッド ListFeaturesRecordRouteRouteChat
  • 上記のメソッドを呼び出すときにクライアントとサーバーの間で交換されるデータ構造であるメッセージ タイプ PointFeatureRectangleRouteNoteRouteSummary

これらの RPC メソッドとそのメッセージ型はすべて、提供されたソースコードの proto/routeguide.proto ファイルで定義されます。

Protocol Buffers は一般に protobuf と呼ばれます。gRPC の用語の詳細については、gRPC の コア コンセプト、アーキテクチャ、ライフサイクルをご覧ください。

メッセージのタイプを定義する

まず、RPC で使用されるメッセージを定義しましょう。ソースコードの routeguide/route_guide.proto ファイルで、まず Point メッセージ タイプを定義します。Point は、地図上の緯度と経度の座標ペアを表します。この Codelab では、座標に整数を使用します。

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数値 12 は、message 構造内の各フィールドの一意の ID 番号です。

次に、Feature メッセージ タイプを定義します。Feature は、Point で指定された場所にあるものの名前または郵便番号に string フィールドを使用します。

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

次に、緯度と経度の長方形を表す Rectangle メッセージ。これは、対角線上の 2 つの点「lo」と「hi」で表されます。

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

また、特定の時点で送信されたメッセージを表す RouteNote メッセージもあります。

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

また、RouteSummary メッセージも必要です。このメッセージは、次のセクションで説明する RecordRoute RPC のレスポンスとして受信されます。受信した個々のポイントの数、検出された特徴の数、各ポイント間の距離の累積合計としてカバーされた合計距離が含まれます。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

サービス メソッドを定義する

まずサービスを定義し、後でメッセージを定義しましょう。サービスを定義するには、.proto ファイルで名前付きサービスを指定します。proto/routeguide.proto ファイルには、アプリケーションのサービスによって提供される 1 つ以上のメソッドを定義する RouteGuide という名前の service 構造があります。

サービス定義内で RPC メソッドを定義し、リクエストとレスポンスの型を指定します。この Codelab のこのセクションでは、次のことを定義します。

ListFeatures

指定された Rectangle 内で使用可能な Feature を取得します。長方形が広い領域をカバーし、多数のフィーチャーを含む可能性があるため、結果は一度に返されるのではなく(繰り返しフィールドを含むレスポンス メッセージなど)、ストリーミングされます。

この RPC に適したタイプは、サーバーサイド ストリーミング RPC です。クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るストリームを取得します。クライアントは、受信したストリームをメッセージがなくなるまで読み取ります。例に示すように、サーバーサイド ストリーミング メソッドは、レスポンス タイプの前に stream キーワードを配置して指定します。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

トラバースされるルート上の Point のストリームを受け取り、トラバースが完了すると RouteSummary を返します。

この場合は、クライアントサイド ストリーミング RPC が適切です。クライアントは、提供されたストリームを使用して、一連のメッセージを書き込み、サーバーに送信します。クライアントは、メッセージの書き込みを終了すると、サーバーがすべてのメッセージを読み取ってレスポンスを返すのを待ちます。クライアントサイド ストリーミング メソッドを指定するには、リクエスト タイプの前に stream キーワードを配置します。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

ルートの移動中に送信された RouteNote のストリームを受け取り、他の RouteNote(他のユーザーからのものなど)も受け取ります。

これは、まさに双方向ストリーミングのユースケースです。双方向ストリーミング RPC では、両側が読み取り / 書き込みストリームを使用して一連のメッセージを送信します。2 つのストリームは独立して動作するため、クライアントとサーバーは任意の順序で読み取りと書き込みを行うことができます。

たとえば、サーバーはすべてのクライアント メッセージを受信してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことを交互に行うことも、読み取りと書き込みの他の組み合わせを行うこともできます。

各ストリーム内のメッセージの順序は保持されます。このタイプのメソッドは、リクエストとレスポンスの両方の前に stream キーワードを配置して指定します。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. クライアント コードとサーバーコードを生成する

生成されたディレクトリの .proto ファイルから生成されたコードは、すでに提供されています。

.proto ファイルからコードを生成する方法や、.proto ファイルに変更を加えてテストする方法については、こちらの手順をご覧ください。

生成されたコードには次のものが含まれています。

  • メッセージ タイプ PointFeatureRectangleRouteNoteRouteSummary の構造体定義。
  • 実装する必要があるサービストレイト: route_guide_server::RouteGuide
  • サーバーの呼び出しに使用するクライアント タイプ: route_guide_client::RouteGuideClient<T>

次に、クライアントがリクエストを送信したときにサーバーが回答を返せるように、サーバーサイドでメソッドを実装します。

5. サービスを実装する

まず、RouteGuide サーバーの作成方法を見てみましょう。RouteGuide サービスが機能するには、次の 2 つの要素が必要です。

  • サービス定義から生成されたサービス インターフェースの実装: サービスの実際の「作業」を行う。
  • gRPC サーバーを実行して、クライアントからのリクエストをリッスンし、適切なメソッド実装にディスパッチします。

src/server/server.rs では、gRPC の include_generated_proto! マクロを使用して生成されたコードをスコープに含め、RouteGuide トレイトと Point をインポートできます。

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

まず、サービスを表す構造体を定義します。現時点では、src/server/server.rs でこれを行うことができます。

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

次に、生成されたコードから route_guide_server::RouteGuide トレイトを実装する必要があります。

RouteGuide を実装する

生成された RouteGuide インターフェースを実装する必要があります。実装は次のようになります。これはすでにテンプレートに含まれています。

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

各 RPC 実装について詳しく見ていきましょう。

サーバーサイド ストリーミング RPC

ListFeatures から始めましょう。これはサーバーサイド ストリーミング RPC であるため、複数の Feature をクライアントに送り返す必要があります。

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

ご覧のとおり、リクエスト オブジェクト(クライアントが Features を検索する Rectangle)を取得します。今回は、値のストリームを返す必要があります。チャンネルを作成し、新しい非同期タスクを生成してルックアップを実行し、制約を満たす特徴をチャンネルに送信します。チャネルの Stream の半分が tonic::Response でラップされて呼び出し元に返されます。

クライアントサイド ストリーミング RPC

次に、もう少し複雑なクライアントサイド ストリーミング メソッド RecordRoute を見てみましょう。ここでは、クライアントから Points のストリームを取得し、旅行に関する情報を含む単一の RouteSummary を返します。入力としてストリームを取得します。サーバーはこれを使用して、メッセージの読み取りと書き込みの両方を行うことができます。next() メソッドを使用してクライアント メッセージを反復処理し、単一のレスポンスを返すことができます。

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

メソッド本文では、ストリームの next() メソッドを使用して、メッセージがなくなるまでクライアントのリクエストをリクエスト オブジェクト(この場合は Point)に繰り返し読み込みます。None の場合、ストリームはまだ有効で、読み取りを続行できます。

双方向ストリーミング RPC

最後に、双方向ストリーミング RPC RouteChat() を見てみましょう。

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

今回は、クライアントサイドのストリーミングの例と同様に、メッセージの読み取りと書き込みに使用できるストリームを取得します。ただし、今回はクライアントがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。ここでの読み取りと書き込みの構文は、サーバーが RouteChatStream を返す点を除いて、クライアント ストリーミング メソッドとよく似ています。各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。

try_stream! を使用して output ストリームを作成します。これは、ストリームがエラーを返す可能性があることを示します。

サーバーを起動する

このメソッドを実装したら、クライアントが実際にサービスを使用できるように、gRPC サーバーも起動する必要があります。main() に入力します。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

main() で起こっていることをステップごとに説明します。

  1. クライアント リクエストをリッスンするために使用するポートを指定します。
  2. 特徴が読み込まれた RouteGuideService を作成する
  3. 作成したサービスを使用して、RouteGuideServer::new() を使用して gRPC サーバーのインスタンスを作成します。
  4. サービス実装を gRPC サーバーに登録します。
  5. ポートの詳細を使用してサーバーで serve() を呼び出し、プロセスが強制終了されるまでブロック待機を行います。

6. クライアントを作成する

このセクションでは、src/client/client.rs で RouteGuide サービス用の Rust クライアントを作成する方法について説明します。

まず、生成されたコードをスコープに含めます。

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

サービス メソッドを呼び出す

次に、サービス メソッドを呼び出す方法を見てみましょう。gRPC-Rust では、RPC はブロッキング/同期モードで動作します。つまり、RPC 呼び出しはサーバーからの応答を待機し、応答またはエラーを返します。

サーバーサイド ストリーミング RPC

ここでは、サーバーサイド ストリーミング メソッド ListFeatures を呼び出します。このメソッドは、地理的な Feature オブジェクトのストリームを返します。

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

メソッドにリクエストを渡すと、ListFeaturesStream のインスタンスが返されます。クライアントは ListFeaturesStream ストリームを使用して、サーバーのレスポンスを読み取ることができます。ListFeaturesStreammessage() メソッドを使用して、メッセージがなくなるまで、サーバーのレスポンスをレスポンス プロトコル バッファ オブジェクト(この場合は Feature)に繰り返し読み込みます。

クライアントサイド ストリーミング RPC

ここでは、record_route の場合、ポイントのベクトルをストリームに変換します。次に、このストリームをリクエストとして record_route() に渡し、サーバーでストリームが完全に処理された後に単一の RouteSummary レスポンスを取得します。

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

双方向ストリーミング RPC

最後に、双方向ストリーミング RPC RouteChat() を見てみましょう。メソッドに書き込むストリーム リクエストを渡し、メッセージを読み取ることができるストリームを取得します。今回は、サーバーがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。

ヘルパー メソッドを呼び出す

サービス メソッドを呼び出すには、まずサーバーと通信するためのチャネルを作成する必要があります。これを作成するには、まずエンドポイントを作成し、そのエンドポイントに接続して、接続時に作成されたチャネルを RouteGuideClient::new() に渡します。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

main() で、作成したメソッドを実行します。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. 試してみる

まず、クライアントとサーバーを実行するために、それらをバイナリ ターゲットとしてクレートに追加しましょう。Cargo.toml を次のように編集する必要があります。

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

どのプロジェクトでも、コードが動作するために必要な依存関係を考慮する必要があります。Rust プロジェクトの場合、依存関係は Cargo.toml にあります。必要な依存関係は Cargo.toml ファイルにすでに記載されています。

次に、作業ディレクトリから次のコマンドを実行します。

  1. 1 つのターミナルでサーバーを実行します。
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. 別のターミナルからクライアントを実行します。
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

次のような出力が表示されます。

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. 次のステップ