1. 소개
이 Codelab에서는 gRPC-Rust를 사용하여 Rust로 작성된 경로 매핑 애플리케이션의 기반을 형성하는 클라이언트와 서버를 만듭니다.
튜토리얼이 끝나면 gRPC를 사용하여 원격 서버에 연결하여 클라이언트 경로의 기능에 관한 정보를 가져오고, 클라이언트 경로의 요약을 만들고, 서버 및 다른 클라이언트와 트래픽 업데이트와 같은 경로 정보를 교환하는 클라이언트가 있습니다.
서비스는 프로토콜 버퍼 파일에 정의되며, 이 파일은 클라이언트와 서버가 서로 통신할 수 있도록 상용구 코드를 생성하는 데 사용되므로 이 기능을 구현하는 데 드는 시간과 노력을 절약할 수 있습니다.
이 생성된 코드는 서버와 클라이언트 간의 복잡한 통신뿐만 아니라 데이터 직렬화 및 역직렬화도 처리합니다.
학습할 내용
- 프로토콜 버퍼를 사용하여 서비스 API를 정의하는 방법
- 자동 코드 생성을 사용하여 프로토콜 버퍼 정의에서 gRPC 기반 클라이언트와 서버를 빌드하는 방법
- gRPC를 사용한 클라이언트-서버 스트리밍 통신에 대한 이해
이 Codelab은 gRPC를 처음 접하거나 gRPC를 다시 살펴보고 싶은 Rust 개발자 또는 분산 시스템을 빌드하는 데 관심이 있는 모든 사용자를 대상으로 합니다. 이전 gRPC 경험은 필요하지 않습니다.
2. 시작하기 전에
기본 요건
다음이 설치되어 있는지 확인합니다.
코드 가져오기
처음부터 완전히 시작하지 않아도 되도록 이 Codelab에서는 애플리케이션 소스 코드의 스캐폴드를 제공하여 이를 완성할 수 있습니다. 다음 단계에서는 프로토콜 버퍼 컴파일러 플러그인을 사용하여 상용구 gRPC 코드를 생성하는 등 애플리케이션을 완료하는 방법을 보여줍니다.
먼저 Codelab 작업 디렉터리를 만들고 cd
로 이동합니다.
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
Codelab을 다운로드하고 압축을 풉니다.
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
또는 codelab 디렉터리만 포함된 .zip 파일을 다운로드하고 직접 압축을 해제할 수 있습니다.
구현을 입력하는 것을 건너뛰려면 완료된 소스 코드를 GitHub에서 확인하세요.
3. 메시지 및 서비스 정의
첫 번째 단계는 프로토콜 버퍼를 사용하여 애플리케이션의 gRPC 서비스, RPC 메서드, 요청 및 응답 메시지 유형을 정의하는 것입니다. 서비스에서 제공하는 기능:
- 서버가 구현하고 클라이언트가 호출하는 RPC 메서드
ListFeatures
,RecordRoute
,RouteChat
- 위의 메서드를 호출할 때 클라이언트와 서버 간에 교환되는 데이터 구조인 메시지 유형
Point
,Feature
,Rectangle
,RouteNote
,RouteSummary
이러한 RPC 메서드와 메시지 유형은 모두 제공된 소스 코드의 proto/routeguide.proto
파일에 정의됩니다.
프로토콜 버퍼는 일반적으로 protobufs로 알려져 있습니다. gRPC 용어에 대한 자세한 내용은 gRPC의 핵심 개념, 아키텍처, 수명 주기를 참고하세요.
메시지 유형 정의
먼저 RPC에서 사용할 메시지를 정의해 보겠습니다. 소스 코드의 routeguide/route_guide.proto
파일에서 먼저 Point
메시지 유형을 정의합니다. Point
는 지도상의 위도-경도 좌표 쌍을 나타냅니다. 이 Codelab에서는 좌표에 정수를 사용합니다.
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
1
및 2
은 message
구조의 각 필드에 대한 고유 ID 번호입니다.
다음으로 Feature
메시지 유형을 정의합니다. Feature
는 Point
로 지정된 위치에 있는 항목의 이름이나 우편 주소에 string
필드를 사용합니다.
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
다음은 위도-경도 사각형을 나타내는 Rectangle
메시지입니다. 이 메시지는 대각선으로 반대되는 두 점 'lo'와 'hi'로 표시됩니다.
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
또한 특정 시점에 전송된 메시지를 나타내는 RouteNote
메시지도 있습니다.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
RouteSummary
메시지도 필요합니다. 이 메시지는 다음 섹션에 설명된 RecordRoute
RPC에 대한 응답으로 수신됩니다. 여기에는 수신된 개별 포인트 수, 감지된 특징 수, 각 포인트 간 거리의 누적 합계로 계산된 총 이동 거리가 포함됩니다.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
서비스 메서드 정의
먼저 서비스를 정의한 다음 메시지를 정의해 보겠습니다. 서비스를 정의하려면 .proto
파일에서 명명된 서비스를 지정합니다. proto/routeguide.proto
파일에는 애플리케이션 서비스에서 제공하는 하나 이상의 메서드를 정의하는 RouteGuide
라는 service
구조가 있습니다.
서비스 정의 내에서 RPC 메서드를 정의하고 요청 및 응답 유형을 지정합니다. Codelab의 이 섹션에서는 다음을 정의합니다.
ListFeatures
지정된 Rectangle
내에서 사용할 수 있는 Feature
을 가져옵니다. 결과는 한 번에 반환되지 않고 스트리밍됩니다 (예: 반복 필드가 있는 응답 메시지). 사각형이 넓은 영역을 포함하고 많은 수의 기능을 포함할 수 있기 때문입니다.
이 RPC에 적합한 유형은 서버 측 스트리밍 RPC입니다. 클라이언트는 서버에 요청을 보내고 일련의 메시지를 다시 읽는 스트림을 가져옵니다. 클라이언트는 메시지가 더 이상 없을 때까지 반환된 스트림을 읽습니다. 예에서 볼 수 있듯이 응답 유형 앞에 stream
키워드를 배치하여 서버 측 스트리밍 메서드를 지정합니다.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
이동 중인 경로에서 Point
스트림을 수락하고 이동이 완료되면 RouteSummary
를 반환합니다.
이 경우 클라이언트 측 스트리밍 RPC가 적합해 보입니다. 클라이언트는 일련의 메시지를 작성한 후 제공된 스트림을 사용하여 서버에 이를 보냅니다. 클라이언트는 메시지 작성을 완료한 후 서버가 메시지를 모두 읽고 응답을 반환할 때까지 대기합니다. 요청 유형 앞에 stream
키워드를 배치하여 클라이언트 측 스트리밍 메서드를 지정합니다.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
경로가 이동되는 동안 전송된 RouteNote
스트림을 수락하고 다른 RouteNote
(예: 다른 사용자로부터)를 수신합니다.
이것이 바로 양방향 스트리밍의 사용 사례입니다. 양방향 스트리밍 RPC에서는 양쪽에서 읽기-쓰기 스트림을 사용하여 일련의 메시지를 보냅니다. 두 스트림은 독립적으로 작동하므로 클라이언트와 서버는 원하는 순서로 읽고 쓸 수 있습니다.
예를 들어 서버는 응답을 작성하기 전에 모든 클라이언트 메시지를 수신할 때까지 기다리거나, 메시지를 읽은 다음 메시지를 작성하거나, 읽기와 쓰기의 다른 조합을 사용할 수 있습니다.
각 스트림의 메시지 순서는 유지됩니다. 요청과 응답 앞에 모두 stream
키워드를 배치하여 이러한 유형의 메서드를 지정합니다.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 클라이언트 및 서버 코드 생성
생성된 디렉터리의 .proto
파일에서 생성된 코드는 이미 제공되었습니다.
.proto
파일에서 코드를 직접 생성하거나 .proto
파일을 변경하고 테스트하는 방법을 알아보려면 이 안내를 참고하세요.
생성된 코드에는 다음이 포함됩니다.
- 메시지 유형
Point
,Feature
,Rectangle
,RouteNote
,RouteSummary
의 구조체 정의 - 구현해야 하는 서비스 트레이트:
route_guide_server::RouteGuide
- 서버를 호출하는 데 사용할 클라이언트 유형:
route_guide_client::RouteGuideClient<T>
다음으로 클라이언트가 요청을 보내면 서버가 답변으로 응답할 수 있도록 서버 측에서 메서드를 구현합니다.
5. 서비스 구현
먼저 RouteGuide
서버를 만드는 방법을 살펴보겠습니다. RouteGuide
서비스가 작업을 수행하도록 하는 방법에는 두 가지가 있습니다.
- 서비스 정의에서 생성된 서비스 인터페이스 구현: 서비스의 실제 '작업' 실행
- 클라이언트의 요청을 수신하고 올바른 메서드 구현으로 디스패치하는 gRPC 서버를 실행합니다.
src/server/server.rs
에서 gRPC의 include_generated_proto!
매크로를 통해 생성된 코드를 범위로 가져오고 RouteGuide
트레이트와 Point
를 가져올 수 있습니다.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
서비스를 나타내는 구조체를 정의하는 것부터 시작할 수 있습니다. 지금은 src/server/server.rs
에서 이 작업을 할 수 있습니다.
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
이제 생성된 코드에서 route_guide_server::RouteGuide
트레이트를 구현해야 합니다.
RouteGuide 구현
생성된 RouteGuide
인터페이스를 구현해야 합니다. 구현은 다음과 같습니다. 이 내용은 이미 템플릿에 포함되어 있습니다.
#[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<ListFeaturesStream>, Status> { ... } async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { ... } async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<RouteChatStream>, Status> { ... } }
각 RPC 구현을 자세히 살펴보겠습니다.
서버 측 스트리밍 RPC
ListFeatures
부터 시작해 보겠습니다. 이는 서버 측 스트리밍 RPC이므로 클라이언트에 여러 Feature
를 다시 보내야 합니다.
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
요청 객체 (클라이언트가 Features
을 찾으려는 Rectangle
)가 표시됩니다. 이번에는 값 스트림을 반환해야 합니다. 채널을 만들고 제약 조건을 충족하는 기능을 채널로 전송하는 조회를 실행하는 새로운 비동기 작업을 생성합니다. 채널의 스트림 절반이 tonic::Response
로 래핑되어 호출자에게 반환됩니다.
클라이언트 측 스트리밍 RPC
이제 좀 더 복잡한 클라이언트 측 스트리밍 메서드 RecordRoute
를 살펴보겠습니다. 여기서는 클라이언트에서 Points
스트림을 가져와 여행에 관한 정보가 포함된 단일 RouteSummary
를 반환합니다. 서버가 메시지를 읽고 쓰는 데 사용할 수 있는 스트림을 입력으로 가져옵니다. next()
메서드를 사용하여 클라이언트 메시지를 반복하고 단일 응답을 반환할 수 있습니다.
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
메서드 본문에서 스트림의 next()
메서드를 사용하여 더 이상 메시지가 없을 때까지 클라이언트의 요청을 요청 객체 (이 경우 Point
)에 반복적으로 읽어옵니다. None이면 스트림이 여전히 양호하며 계속 읽을 수 있습니다.
양방향 스트리밍 RPC
마지막으로 양방향 스트리밍 RPC RouteChat()
를 살펴보겠습니다.
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
이번에는 클라이언트 측 스트리밍 예와 마찬가지로 메시지를 읽고 쓰는 데 사용할 수 있는 스트림이 표시됩니다. 하지만 이번에는 클라이언트가 메시지 스트림에 메시지를 계속 쓰는 동안 메서드의 스트림을 통해 값을 반환합니다. 여기서 읽기 및 쓰기 구문은 서버가 RouteChatStream
를 반환한다는 점을 제외하면 클라이언트 스트리밍 메서드와 매우 유사합니다. 각 측면은 항상 작성된 순서대로 다른 측면의 메시지를 받지만 클라이언트와 서버는 어떤 순서로든 읽고 쓸 수 있습니다. 스트림은 완전히 독립적으로 작동합니다.
스트림이 오류를 반환할 수 있음을 나타내는 try_stream!
을 사용하여 output
스트림을 만듭니다.
서버 시작
이 메서드를 구현한 후에는 클라이언트가 실제로 서비스를 사용할 수 있도록 gRPC 서버도 시작해야 합니다. main()
을 입력합니다.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
main()
에서 이루어지는 작업은 다음과 같습니다.
- 클라이언트 요청을 수신하는 데 사용할 포트를 지정합니다.
- 기능이 로드된
RouteGuideService
만들기 - 생성한 서비스를 사용하여
RouteGuideServer::new()
를 사용하여 gRPC 서버의 인스턴스를 만듭니다. - gRPC 서버에 서비스 구현을 등록합니다.
- 프로세스가 종료될 때까지 차단 대기를 실행하기 위해 포트 세부정보와 함께 서버에서
serve()
를 호출합니다.
6. 클라이언트 만들기
이 섹션에서는 src/client/client.rs
에서 RouteGuide 서비스의 Rust 클라이언트를 만드는 방법을 살펴봅니다.
먼저 생성된 코드를 범위로 가져옵니다.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
서비스 메서드 호출
이제 서비스 메서드를 호출하는 방법을 살펴보겠습니다. gRPC-Rust에서 RPC는 차단/동기 모드로 작동합니다. 즉, RPC 호출은 서버가 응답할 때까지 기다리며 응답이나 오류를 반환합니다.
서버 측 스트리밍 RPC
여기에서 지리적 Feature
객체의 스트림을 반환하는 서버 측 스트리밍 메서드 ListFeatures
를 호출합니다.
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
메서드에 요청을 전달하고 ListFeaturesStream
인스턴스를 다시 가져옵니다. 클라이언트는 ListFeaturesStream
스트림을 사용하여 서버의 응답을 읽을 수 있습니다. ListFeaturesStream
의 message()
메서드를 사용하여 더 이상 메시지가 없을 때까지 응답 프로토콜 버퍼 객체 (이 경우 Feature
)에 대한 서버의 응답을 반복적으로 읽습니다.
클라이언트 측 스트리밍 RPC
여기서는 record_route
의 경우 점 벡터를 스트림으로 변환합니다. 그런 다음 이 스트림을 record_route()
에 요청으로 전달하고 서버에서 스트림을 완전히 처리한 후 단일 RouteSummary
응답을 가져옵니다.
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
양방향 스트리밍 RPC
마지막으로 양방향 스트리밍 RPC RouteChat()
를 살펴보겠습니다. 메서드에 쓰기 작업을 수행하는 스트림 요청을 전달하고 메시지를 읽을 수 있는 스트림을 다시 가져옵니다. 이번에는 서버가 메시지 스트림에 메시지를 쓰는 동안 메서드의 스트림을 통해 값을 반환합니다.
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
각 측면은 항상 작성된 순서대로 다른 측면의 메시지를 받지만 클라이언트와 서버는 어떤 순서로든 읽고 쓸 수 있습니다. 스트림은 완전히 독립적으로 작동합니다.
도우미 메서드 호출
서비스 메서드를 호출하려면 먼저 서버와 통신할 채널을 만들어야 합니다. 엔드포인트를 먼저 만들고 해당 엔드포인트에 연결한 다음 연결 시 생성된 채널을 RouteGuideClient::new()
에 전달하여 이를 만듭니다.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
main()
에서 방금 만든 메서드를 실행합니다.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. 사용해 보기
먼저 클라이언트와 서버를 실행하기 위해 크레이트에 바이너리 타겟으로 추가해 보겠습니다. Cargo.toml을 다음과 같이 수정해야 합니다.
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
모든 프로젝트와 마찬가지로 코드가 작동하는 데 필요한 종속 항목도 고려해야 합니다. Rust 프로젝트의 경우 종속 항목이 Cargo.toml
에 있습니다. 필요한 종속 항목은 이미 Cargo.toml
파일에 나열되어 있습니다.
그런 다음 작업 디렉터리에서 다음 명령어를 실행합니다.
- 한 터미널에서 서버를 실행합니다.
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- 다른 터미널에서 클라이언트를 실행합니다.
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
다음과 같은 출력이 표시됩니다.
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"