1. 簡介
在本程式碼研究室中,您將使用 gRPC-Rust 建立用戶端和伺服器,做為以 Rust 編寫的路線對應應用程式基礎。
完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。
服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。
這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。
課程內容
- 如何使用通訊協定緩衝區定義服務 API。
- 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
- 瞭解如何使用 gRPC 進行用戶端與伺服器之間的串流通訊。
這個程式碼研究室適合剛接觸 gRPC 的 Rust 開發人員、想複習 gRPC 的開發人員,以及對建構分散式系統感興趣的任何人。不需要有 gRPC 相關經驗。
2. 事前準備
必要條件
請確認已安裝下列項目:
取得程式碼
為避免您必須從頭開始,本程式碼研究室提供應用程式原始碼的架構,供您完成。下列步驟將說明如何完成應用程式,包括使用通訊協定緩衝區編譯器外掛程式產生樣板 gRPC 程式碼。
首先,建立程式碼研究室工作目錄,然後 cd
到該目錄:
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
下載並擷取程式碼研究室:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。
如要略過實作的輸入作業,可以前往 GitHub 取得完整的原始碼。
3. 定義訊息和服務
首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:
- 伺服器實作且用戶端呼叫的 RPC 方法 (
ListFeatures
、RecordRoute
和RouteChat
)。 - 訊息類型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,這些是呼叫上述方法時,用戶端和伺服器之間交換的資料結構。
這些 RPC 方法及其訊息類型都會在提供的原始碼 proto/routeguide.proto
檔案中定義。
通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。
定義訊息類型
首先,請定義 RPC 會使用的訊息。在原始碼的 routeguide/route_guide.proto
檔案中,請先定義 Point
訊息型別。Point
代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
數字 1
和 2
是 message
結構中每個欄位的專屬 ID 編號。
接著,定義 Feature
訊息類型。Feature
會使用 string
欄位,指定 Point
所指定位置的名稱或郵寄地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
接著是 Rectangle
訊息,代表經緯度矩形,以兩個對角點「lo」和「hi」表示。
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
此外,RouteNote
訊息代表在特定時間點傳送的訊息。
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
我們也需要 RouteSummary
訊息。這則訊息是針對 RecordRoute
RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定義服務方法
我們先定義服務,稍後再定義訊息。如要定義服務,請在 .proto
檔案中指定具名服務。proto/routeguide.proto
檔案具有名為 RouteGuide
的 service
結構,可定義應用程式服務提供的一或多個方法。
在服務定義中定義 RPC 方法,並指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:
ListFeatures
取得指定 Rectangle
內可用的 Feature
。由於矩形可能涵蓋大範圍,並包含大量特徵,因此系統會串流處理結果,而不是一次傳回 (例如在含有重複欄位的訊息中)。
這個 RPC 的適當類型是伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream
關鍵字,指定伺服器端串流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受所遍歷路徑上的 Point
串流,並在遍歷完成時傳回 RouteSummary
。
在這種情況下,用戶端串流 RPC 似乎很合適:用戶端會寫入一系列訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求型別前加上 stream
關鍵字。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在路線遍歷期間傳送的 RouteNote
串流,同時接收其他 RouteNote
(例如來自其他使用者)。
這正是雙向串流的適用用途。雙向串流 RPC 的兩端都會使用讀寫串流傳送一連串訊息。這兩個串流各自獨立運作,因此用戶端和伺服器可以依任何順序讀取和寫入。
舉例來說,伺服器可以等待接收所有用戶端訊息,再撰寫回覆內容;也可以讀取訊息,然後撰寫訊息;或是讀取和撰寫訊息的某種組合。
每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上 stream
關鍵字。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 產生用戶端和伺服器程式碼
我們已在產生的目錄中,提供 .proto
檔案產生的程式碼。
如要瞭解如何自行從 .proto
檔案產生程式碼,或對 .proto
檔案進行任何變更並測試,請參閱這些操作說明。
產生的程式碼包含:
- 訊息類型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
的結構體定義。 - 我們需要實作的服務特徵:
route_guide_server::RouteGuide
。 - 我們將用來呼叫伺服器的用戶端類型:
route_guide_client::RouteGuideClient<T>
。
接著,我們會在伺服器端實作方法,以便在用戶端傳送要求時,伺服器可以回覆答案。
5. 實作服務
首先,我們來看看如何建立 RouteGuide
伺服器。如要讓 RouteGuide
服務正常運作,需要完成以下兩個步驟:
- 實作從服務定義產生的服務介面:執行服務的實際「工作」。
- 執行 gRPC 伺服器,監聽來自用戶端的要求,並將要求分派至正確的方法實作。
在 src/server/server.rs
中,我們可以透過 gRPC 的 include_generated_proto!
巨集,將產生的程式碼帶入範圍,並匯入 RouteGuide
特徵和 Point
。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
首先,我們可以定義一個結構體來代表服務。目前可以在 src/server/server.rs
上執行這項操作:
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
現在,我們需要從產生的程式碼實作 route_guide_server::RouteGuide
特徵。
實作 RouteGuide
我們需要實作產生的 RouteGuide
介面。實作方式如下所示。範本中已有這項功能。
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
接著詳細瞭解各項 RPC 實作。
伺服器端串流 RPC
首先播放《ListFeatures
》。這是伺服器端串流 RPC,因此我們需要將多個 Feature
傳送回用戶端。
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
如您所見,我們會取得要求物件 (用戶端想在 Rectangle
中尋找 Features
)。這次我們需要傳回值串流。我們會建立管道並產生新的非同步工作,在其中執行查閱作業,然後將符合限制的特徵傳送至管道。系統會將管道的 Stream 半部包裝在 tonic::Response
中,然後傳回給呼叫端。
用戶端串流 RPC
現在來看看稍微複雜一點的內容:用戶端串流方法 RecordRoute
,我們會從用戶端取得 Points
串流,並傳回單一 RouteSummary
,其中包含行程資訊。這個函式會取得串流做為輸入內容,伺服器可使用該串流讀取及寫入訊息。它可以透過 next()
方法疊代處理用戶端訊息,並傳回單一回應。
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
在方法主體中,我們會使用串流的 next()
方法,重複讀取用戶端對要求物件 (本例中為 Point
) 的要求,直到沒有其他訊息為止。如果為 None,表示串流仍正常,可以繼續讀取。
雙向串流 RPC
最後,我們來看看雙向串流 RPC RouteChat()
。
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
這次我們取得的串流與用戶端串流範例中的串流相同,可用於讀取及寫入訊息。不過,這次我們會在用戶端仍將訊息寫入訊息串流時,透過方法串流傳回值。這裡的讀取和寫入語法與用戶端串流方法非常類似,但伺服器會傳回 RouteChatStream
。雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。
我們使用 try_stream!
建立 output
串流,表示串流可能會傳回錯誤。
啟動伺服器
實作這個方法後,我們也需要啟動 gRPC 伺服器,讓用戶端實際使用我們的服務。填入 main()
。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
以下是 main()
的運作方式:
- 指定要用來接聽用戶端要求的通訊埠
- 建立載入功能的
RouteGuideService
- 使用我們建立的服務,透過
RouteGuideServer::new()
建立 gRPC 伺服器執行個體。 - 向 gRPC 伺服器註冊服務實作。
- 使用我們的連接埠詳細資料在伺服器上呼叫
serve()
,進行封鎖等待,直到程序終止為止。
6. 建立用戶端
在本節中,我們將說明如何在 src/client/client.rs
中為 RouteGuide 服務建立 Rust 用戶端。
首先,將產生的程式碼納入範圍。
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
呼叫服務方法
接著來看看如何呼叫服務方法。在 gRPC-Rust 中,RPC 會以封鎖/同步模式運作,也就是說,RPC 呼叫會等待伺服器回應,並傳回回應或錯誤。
伺服器端串流 RPC
我們在此呼叫伺服器端串流方法 ListFeatures
,該方法會傳回地理 Feature
物件的串流。
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
我們將要求傳遞給這個方法,並取得 ListFeaturesStream
的例項。用戶端可以使用 ListFeaturesStream
串流讀取伺服器的回應。我們會使用 ListFeaturesStream
的 message()
方法,重複讀取伺服器對回應通訊協定緩衝區物件 (在本例中為 Feature
) 的回應,直到沒有其他訊息為止。
用戶端串流 RPC
在這裡,我們將點向量轉換為串流。record_route
接著,我們會將這個串流做為要求傳遞至 record_route()
,並在伺服器完全處理串流後,取得單一 RouteSummary
回應。
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
雙向串流 RPC
最後,我們來看看雙向串流 RPC RouteChat()
。我們將寫入的串流要求傳遞至方法,並取得可從中讀取訊息的串流。這次我們透過方法串流傳回值,同時伺服器仍將訊息寫入訊息串流。
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
雖然雙方一律會依撰寫順序收到對方的訊息,但用戶端和伺服器可以依任意順序讀取及寫入資料,因為資料串是完全獨立運作。
呼叫輔助方法
如要呼叫服務方法,我們必須先建立與伺服器通訊的管道。我們會先建立端點,連線至該端點,並在連線至 RouteGuideClient::new()
時傳遞所建立的管道,藉此建立這個項目,如下所示:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
在 main()
中,執行我們剛建立的方法。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. 立即試用
首先,如要執行用戶端和伺服器,請將它們新增為 Crate 的二進位目標。我們需要相應編輯 Cargo.toml:
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
與任何專案一樣,我們也需要考量程式碼運作所需的依附元件。如果是 Rust 專案,依附元件會位於 Cargo.toml
中。我們已在 Cargo.toml
檔案中列出必要依附元件。
接著,從工作目錄執行下列指令:
- 在一個終端機中執行伺服器:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- 從另一個終端機執行用戶端:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
您會看到如下所示的輸出內容:
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"