開始使用 gRPC-Rust - Streaming

1. 簡介

在本程式碼研究室中,您將使用 gRPC-Rust 建立用戶端和伺服器,做為以 Rust 編寫的路線對應應用程式基礎。

完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。

服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。

這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。

課程內容

  • 如何使用通訊協定緩衝區定義服務 API。
  • 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
  • 瞭解如何使用 gRPC 進行用戶端與伺服器之間的串流通訊。

這個程式碼研究室適合剛接觸 gRPC 的 Rust 開發人員、想複習 gRPC 的開發人員,以及對建構分散式系統感興趣的任何人。不需要有 gRPC 相關經驗。

2. 事前準備

必要條件

請確認已安裝下列項目:

  • GCC。請按照這裡的指示操作
  • Rust 最新版本。請按照這裡的安裝說明操作。

取得程式碼

為避免您必須從頭開始,本程式碼研究室提供應用程式原始碼的架構,供您完成。下列步驟將說明如何完成應用程式,包括使用通訊協定緩衝區編譯器外掛程式產生樣板 gRPC 程式碼。

首先,建立程式碼研究室工作目錄,然後 cd 到該目錄:

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

下載並擷取程式碼研究室:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。

如要略過實作的輸入作業,可以前往 GitHub 取得完整的原始碼

3. 定義訊息和服務

首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:

  • 伺服器實作且用戶端呼叫的 RPC 方法 (ListFeaturesRecordRouteRouteChat)。
  • 訊息類型 PointFeatureRectangleRouteNoteRouteSummary,這些是呼叫上述方法時,用戶端和伺服器之間交換的資料結構。

這些 RPC 方法及其訊息類型都會在提供的原始碼 proto/routeguide.proto 檔案中定義。

通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。

定義訊息類型

首先,請定義 RPC 會使用的訊息。在原始碼的 routeguide/route_guide.proto 檔案中,請先定義 Point 訊息型別。Point 代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

數字 12message 結構中每個欄位的專屬 ID 編號。

接著,定義 Feature 訊息類型。Feature 會使用 string 欄位,指定 Point 所指定位置的名稱或郵寄地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

接著是 Rectangle 訊息,代表經緯度矩形,以兩個對角點「lo」和「hi」表示。

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

此外,RouteNote 訊息代表在特定時間點傳送的訊息。

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

我們也需要 RouteSummary 訊息。這則訊息是針對 RecordRoute RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定義服務方法

我們先定義服務,稍後再定義訊息。如要定義服務,請在 .proto 檔案中指定具名服務。proto/routeguide.proto 檔案具有名為 RouteGuideservice 結構,可定義應用程式服務提供的一或多個方法。

在服務定義中定義 RPC 方法,並指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:

ListFeatures

取得指定 Rectangle 內可用的 Feature。由於矩形可能涵蓋大範圍,並包含大量特徵,因此系統會串流處理結果,而不是一次傳回 (例如在含有重複欄位的訊息中)。

這個 RPC 的適當類型是伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream 關鍵字,指定伺服器端串流方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受所遍歷路徑上的 Point 串流,並在遍歷完成時傳回 RouteSummary

在這種情況下,用戶端串流 RPC 似乎很合適:用戶端會寫入一系列訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求型別前加上 stream 關鍵字。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在路線遍歷期間傳送的 RouteNote 串流,同時接收其他 RouteNote (例如來自其他使用者)。

這正是雙向串流的適用用途。雙向串流 RPC 的兩端都會使用讀寫串流傳送一連串訊息。這兩個串流各自獨立運作,因此用戶端和伺服器可以依任何順序讀取和寫入。

舉例來說,伺服器可以等待接收所有用戶端訊息,再撰寫回覆內容;也可以讀取訊息,然後撰寫訊息;或是讀取和撰寫訊息的某種組合。

每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上 stream 關鍵字。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 產生用戶端和伺服器程式碼

我們已在產生的目錄中,提供 .proto 檔案產生的程式碼。

如要瞭解如何自行從 .proto 檔案產生程式碼,或對 .proto 檔案進行任何變更並測試,請參閱這些操作說明

產生的程式碼包含:

  • 訊息類型 PointFeatureRectangleRouteNoteRouteSummary 的結構體定義。
  • 我們需要實作的服務特徵:route_guide_server::RouteGuide
  • 我們將用來呼叫伺服器的用戶端類型:route_guide_client::RouteGuideClient<T>

接著,我們會在伺服器端實作方法,以便在用戶端傳送要求時,伺服器可以回覆答案。

5. 實作服務

首先,我們來看看如何建立 RouteGuide 伺服器。如要讓 RouteGuide 服務正常運作,需要完成以下兩個步驟:

  • 實作從服務定義產生的服務介面:執行服務的實際「工作」。
  • 執行 gRPC 伺服器,監聽來自用戶端的要求,並將要求分派至正確的方法實作。

src/server/server.rs 中,我們可以透過 gRPC 的 include_generated_proto! 巨集,將產生的程式碼帶入範圍,並匯入 RouteGuide 特徵和 Point

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

首先,我們可以定義一個結構體來代表服務。目前可以在 src/server/server.rs 上執行這項操作:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

現在,我們需要從產生的程式碼實作 route_guide_server::RouteGuide 特徵。

實作 RouteGuide

我們需要實作產生的 RouteGuide 介面。實作方式如下所示。範本中已有這項功能。

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

接著詳細瞭解各項 RPC 實作。

伺服器端串流 RPC

首先播放《ListFeatures》。這是伺服器端串流 RPC,因此我們需要將多個 Feature 傳送回用戶端。

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

如您所見,我們會取得要求物件 (用戶端想在 Rectangle 中尋找 Features)。這次我們需要傳回值串流。我們會建立管道並產生新的非同步工作,在其中執行查閱作業,然後將符合限制的特徵傳送至管道。系統會將管道的 Stream 半部包裝在 tonic::Response 中,然後傳回給呼叫端。

用戶端串流 RPC

現在來看看稍微複雜一點的內容:用戶端串流方法 RecordRoute,我們會從用戶端取得 Points 串流,並傳回單一 RouteSummary,其中包含行程資訊。這個函式會取得串流做為輸入內容,伺服器可使用該串流讀取及寫入訊息。它可以透過 next() 方法疊代處理用戶端訊息,並傳回單一回應。

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

在方法主體中,我們會使用串流的 next() 方法,重複讀取用戶端對要求物件 (本例中為 Point) 的要求,直到沒有其他訊息為止。如果為 None,表示串流仍正常,可以繼續讀取。

雙向串流 RPC

最後,我們來看看雙向串流 RPC RouteChat()

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

這次我們取得的串流與用戶端串流範例中的串流相同,可用於讀取及寫入訊息。不過,這次我們會在用戶端仍將訊息寫入訊息串流時,透過方法串流傳回值。這裡的讀取和寫入語法與用戶端串流方法非常類似,但伺服器會傳回 RouteChatStream。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。

我們使用 try_stream! 建立 output 串流,表示串流可能會傳回錯誤。

啟動伺服器

實作這個方法後,我們也需要啟動 gRPC 伺服器,讓用戶端實際使用我們的服務。填入 main()

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

以下是 main() 的運作方式:

  1. 指定要用來接聽用戶端要求的通訊埠
  2. 建立載入功能的 RouteGuideService
  3. 使用我們建立的服務,透過 RouteGuideServer::new() 建立 gRPC 伺服器執行個體。
  4. 向 gRPC 伺服器註冊服務實作。
  5. 使用我們的連接埠詳細資料在伺服器上呼叫 serve(),進行封鎖等待,直到程序終止為止。

6. 建立用戶端

在本節中,我們將說明如何在 src/client/client.rs 中為 RouteGuide 服務建立 Rust 用戶端。

首先,將產生的程式碼納入範圍。

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

呼叫服務方法

接著來看看如何呼叫服務方法。在 gRPC-Rust 中,RPC 會以封鎖/同步模式運作,也就是說,RPC 呼叫會等待伺服器回應,並傳回回應或錯誤。

伺服器端串流 RPC

我們在此呼叫伺服器端串流方法 ListFeatures,該方法會傳回地理 Feature 物件的串流。

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

我們將要求傳遞給這個方法,並取得 ListFeaturesStream 的例項。用戶端可以使用 ListFeaturesStream 串流讀取伺服器的回應。我們會使用 ListFeaturesStreammessage() 方法,重複讀取伺服器對回應通訊協定緩衝區物件 (在本例中為 Feature) 的回應,直到沒有其他訊息為止。

用戶端串流 RPC

在這裡,我們將點向量轉換為串流。record_route接著,我們會將這個串流做為要求傳遞至 record_route(),並在伺服器完全處理串流後,取得單一 RouteSummary 回應。

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

雙向串流 RPC

最後,我們來看看雙向串流 RPC RouteChat()。我們將寫入的串流要求傳遞至方法,並取得可從中讀取訊息的串流。這次我們透過方法串流傳回值,同時伺服器仍將訊息寫入訊息串流。

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。

呼叫輔助方法

如要呼叫服務方法,我們必須先建立與伺服器通訊的管道。我們會先建立端點,連線至該端點,並在連線至 RouteGuideClient::new() 時傳遞所建立的管道,藉此建立這個項目,如下所示:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

main() 中,執行我們剛建立的方法。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

7. 立即試用

首先,如要執行用戶端和伺服器,請將它們新增為 Crate 的二進位目標。我們需要相應編輯 Cargo.toml:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

與任何專案一樣,我們也需要考量程式碼運作所需的依附元件。如果是 Rust 專案,依附元件會位於 Cargo.toml 中。我們已在 Cargo.toml 檔案中列出必要依附元件。

接著,從工作目錄執行下列指令:

  1. 在一個終端機中執行伺服器:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. 從另一個終端機執行用戶端:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

您會看到如下所示的輸出內容:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

8. 後續步驟