1. 簡介
在本程式碼研究室中,您將使用 gRPC-Python 建立用戶端和伺服器,做為以 Python 編寫的路線對應應用程式基礎。
完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。
服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。
這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。
課程內容
- 如何使用通訊協定緩衝區定義服務 API。
- 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
- 瞭解 gRPC 的用戶端與伺服器串流通訊。
這個程式碼研究室適用於剛接觸 gRPC 或想複習 gRPC 的 Python 開發人員,以及有興趣建構分散式系統的任何人。不需要有 gRPC 相關經驗。
2. 事前準備
軟硬體需求
- Python 3.9 以上版本。建議使用 Python 3.13。如需特定平台的安裝說明,請參閱「Python 設定和使用方式」。或者,您也可以使用 uv 或 pyenv 等工具,安裝非系統 Python。
- pip 安裝 Python 套件。
- venv 建立 Python 虛擬環境。
ensurepip
和 venv
套件是 Python 標準程式庫的一部分,通常預設會提供。
不過,部分以 Debian 為基礎的發行版本 (包括 Ubuntu) 會選擇在重新發布 Python 時排除這些檔案。如要安裝套件,請執行:
sudo apt install python3-pip python3-venv
取得程式碼
為簡化學習過程,本程式碼研究室提供預先建構的原始碼架構,協助您踏出第一步。下列步驟將引導您完成應用程式,包括使用 grpc_tools.protoc
Protocol Buffer 編譯器外掛程式產生 gRPC 程式碼。
grpc-codelabs
本程式碼研究室的架構原始碼位於 codelabs/grpc-python-streaming/start_here 目錄。如果您不想自行導入程式碼,完成的原始碼會放在 completed
目錄中。
首先,請建立程式碼研究室工作目錄,然後 cd 到該目錄:
mkdir grpc-python-streaming && cd grpc-python-streaming
下載並擷取程式碼研究室:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-python-streaming/start_here
或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。
3. 定義訊息和服務
首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:
- 伺服器實作且用戶端呼叫的 RPC 方法:
ListFeatures
、RecordRoute
和RouteChat
。 - 訊息類型
Point
、Feature
、Rectangle
、RouteNote
和RouteSummary
,這些是呼叫 RPC 方法時,用戶端和伺服器之間交換的資料結構。
這些 RPC 方法及其訊息類型都會在提供的原始碼 protos/route_guide.proto
檔案中定義。
通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。
定義訊息類型
在原始碼的 protos/route_guide.proto
檔案中,請先定義 Point
訊息型別。Point
代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
數字 1
和 2
是 message
結構中每個欄位的專屬 ID 號碼。
接著,定義 Feature
訊息類型。Feature
會使用 string
欄位,指定 Point
所指定位置的名稱或郵寄地址:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
如要將區域內的多個點串流至用戶端,您需要 Rectangle
訊息,代表以兩個對角點 lo
和 hi
表示的經緯度矩形:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
此外,RouteNote
訊息代表在特定時間點傳送的訊息:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
最後,您需要 RouteSummary
訊息。這則訊息是針對 RecordRoute
RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
定義服務方法
如要定義服務,請在 .proto
檔案中指定具名服務。route_guide.proto
檔案具有名為 RouteGuide
的 service
結構,可定義應用程式服務提供的一或多個方法。
在服務定義中定義 RPC
方法時,請指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:
ListFeatures
取得指定 Rectangle
中可用的 Feature
物件。由於矩形可能涵蓋大範圍區域,並包含大量特徵,因此系統會串流傳回結果,而非一次傳回。
在這個應用程式中,您將使用伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream 關鍵字,指定伺服器端串流方法。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
接受所遍歷路徑上的 Points 串流,並在遍歷完成時傳回 RouteSummary
。
在這種情況下,用戶端串流 RPC 是合適的選擇:用戶端會寫入一連串訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求類型前放置串流關鍵字。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
接受在路線遍歷期間傳送的 RouteNotes
串流,同時接收其他 RouteNotes
(例如來自其他使用者)。
這正是雙向串流的適用情境。雙向串流 RPC,雙方都會使用讀寫串流傳送一連串訊息。這兩個串流會獨立運作,因此用戶端和伺服器可以任意順序讀取和寫入資料。舉例來說,伺服器可以等待接收所有用戶端訊息,再寫入回應;也可以交替讀取和寫入訊息;或是以其他方式組合讀取和寫入作業。每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上串流關鍵字。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. 產生用戶端和伺服器程式碼
接著,使用通訊協定緩衝區編譯器,從 .proto
檔案產生用戶端和伺服器的樣板 gRPC 程式碼。
我們建立了 grpcio-tools,用於生成 gRPC Python 程式碼。It includes:
- 可從
message
定義產生 Python 程式碼的標準 protoc 編譯器。 - gRPC protobuf 外掛程式,可從
service
定義產生 Python 程式碼 (用戶端和伺服器存根)。
我們將使用 pip 安裝 grpcio-tools
Python 套件。讓我們建立新的 Python 虛擬環境 (venv),將專案的依附元件與系統套件區隔開來:
python3 -m venv --upgrade-deps .venv
如要在 bash/zsh 殼層中啟動虛擬環境,請執行下列指令:
source .venv/bin/activate
如為 Windows 和非標準殼層,請參閱 https://docs.python.org/3/library/venv.html#how-venvs-work 中的表格。
接著,請安裝 grpcio-tools (這也會安裝 grpcio 套件):
pip install grpcio-tools
使用下列指令產生 Python 樣板程式碼:
python -m grpc_tools.protoc --proto_path=./protos \
--python_out=. --pyi_out=. --grpc_python_out=. \
./protos/route_guide.proto
這會為我們在 route_guide.proto
中定義的介面產生下列檔案:
route_guide_pb2.py
包含動態建立類別的程式碼,這些類別是從message
定義產生。route_guide_pb2.pyi
是從message
定義產生的「存根檔案」或「型別提示檔案」。其中只包含沒有實作的簽章。IDE 可使用 Stub 檔案,提供更完善的自動完成和錯誤偵測功能。route_guide_pb2_grpc.py
是從service
定義產生,包含 gRPC 專屬的類別和函式。
gRPC 專屬程式碼包含:
5. 建立伺服器
首先,我們來看看如何建立 RouteGuide
伺服器。建立及執行 RouteGuide
伺服器可分為兩項工作:
- 實作從服務定義產生的服務介面,並使用可執行服務實際「工作」的函式。
- 執行 gRPC 伺服器,監聽來自用戶端的要求並傳輸回應。
讓我們看看route_guide_server.py
。
實作 RouteGuide
route_guide_server.py
具有 RouteGuideServicer
類別,可將產生的類別 route_guide_pb2_grpc.RouteGuideServicer
子類別化:
# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
RouteGuideServicer
會實作所有 RouteGuide
服務方法。
伺服器端串流 RPC
ListFeatures
是回應串流 RPC,會將多個 Feature
傳送給用戶端:
def ListFeatures(self, request, context):
"""List all features contained within the given Rectangle."""
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
lat, lng = feature.location.latitude, feature.location.longitude
if left <= lng <= right and bottom <= lat <= top:
yield feature
在這裡,要求訊息是 route_guide_pb2.Rectangle
,用戶端要在其中尋找 Feature
。這個方法不會傳回單一回應,而是產生零或多個回應。
用戶端串流 RPC
要求串流方法 RecordRoute
會使用要求值的 疊代器,並傳回單一回應值。
def RecordRoute(self, request_iterator, context):
"""Calculate statistics about the trip composed of Points."""
point_count = 0
feature_count = 0
distance = 0.0
prev_point = None
start_time = time.time()
for point in request_iterator:
point_count += 1
if get_feature(self.db, point):
feature_count += 1
if prev_point:
distance += get_distance(prev_point, point)
prev_point = point
elapsed_time = time.time() - start_time
return route_guide_pb2.RouteSummary(
point_count=point_count,
feature_count=feature_count,
distance=int(distance),
elapsed_time=int(elapsed_time),
)
雙向串流 RPC
最後,我們來看看雙向串流 RPC RouteChat()
:
def RouteChat(self, request_iterator, context):
"""
Receive a stream of message/location pairs, and responds with
a stream of all previous messages for the given location.
"""
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
這個方法的語意結合了要求串流方法和回應串流方法的語意。這個函式會傳遞要求值的疊代器,本身也是回應值的疊代器。
啟動伺服器
實作所有 RouteGuide
方法後,下一步是啟動 gRPC 伺服器,讓用戶端實際使用您的服務:
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(),
server,
)
listen_addr = "localhost:50051"
server.add_insecure_port(listen_addr)
print(f"Starting server on {listen_addr}")
server.start()
server.wait_for_termination()
伺服器 start()
方法不會封鎖。系統會例項化新執行緒來處理要求。呼叫 server.start()
的執行緒通常不會有其他工作要做。在這種情況下,您可以呼叫 server.wait_for_termination()
,乾淨地封鎖呼叫執行緒,直到伺服器終止為止。
6. 建立用戶端
讓我們看看route_guide_client.py
。
建立存根
如要呼叫服務方法,我們必須先建立 stub。
我們從 .proto.
run()
方法產生的 route_guide_pb2_grpc
模組中,例項化 RouteGuideStub
類別:
with grpc.insecure_channel("localhost:50051") as channel:
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
請注意,這裡的 channel
用做內容管理工具,解譯器離開 with
區塊後就會自動關閉。
呼叫服務方法
對於傳回單一回應的 RPC 方法 (「response-unary」方法),gRPC Python 同步 (封鎖) 和非同步 (非封鎖) 控制流程語意都支援。如果是回應串流 RPC 方法,呼叫會立即傳回回應值的疊代器。對該疊代器的 next()
方法進行的呼叫會遭到封鎖,直到可從疊代器產生回應為止。
伺服器端串流 RPC
呼叫回應串流 ListFeatures
的方式與處理序列型別類似:
def guide_list_features(stub):
_lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
_hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
rectangle = route_guide_pb2.Rectangle(
lo=_lo,
hi=_hi,
)
print("Looking for features between 40, -75 and 42, -73")
features = stub.ListFeatures(rectangle)
for feature in features:
print(
f"Feature called '{feature.name}'"
f" at {format_point(feature.location)}"
)
用戶端串流 RPC
呼叫要求串流 RecordRoute
的方式,與將迭代器傳遞至本機方法類似。如同上述也會傳回單一回應的簡單 RPC,這項 RPC 可以同步呼叫:
def guide_record_route(stub):
feature_list = route_guide_resources.read_route_guide_database()
route_iterator = generate_route(feature_list)
route_summary = stub.RecordRoute(route_iterator)
print(f"Finished trip with {route_summary.point_count} points")
print(f"Passed {route_summary.feature_count} features")
print(f"Traveled {route_summary.distance} meters")
print(f"It took {route_summary.elapsed_time} seconds")
雙向串流 RPC
呼叫雙向串流 RouteChat
時,會結合要求串流和回應串流語意 (服務端也是如此)。
產生要求訊息,然後使用 yield
逐一傳送。
def generate_notes():
home = route_guide_pb2.Point(latitude=1, longitude=1)
work = route_guide_pb2.Point(latitude=2, longitude=2)
notes = [
make_route_note("Departing from home", home),
make_route_note("Arrived at work", work),
make_route_note("Having lunch at work", work),
make_route_note("Departing from work", work),
make_route_note("Arrived home", home),
]
for note in notes:
print(
f"Sending RouteNote for {format_point(note.location)}:"
f" {note.message}"
)
yield note
# Sleep to simulate moving from one point to another.
# Only for demonstrating the order of the messages.
time.sleep(0.1)
接收及處理伺服器回應:
def guide_route_chat(stub):
responses = stub.RouteChat(generate_notes())
for response in responses:
print(
"< Found previous note at"
f" {format_point(response.location)}: {response.message}"
)
呼叫輔助方法
在執行階段中,執行我們剛建立的方法,並傳遞 stub
。
print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)
7. 立即試用
執行伺服器:
python route_guide_server.py
在其他終端機中再次啟動虛擬環境 (source .venv/bin/activate)
),然後執行用戶端:
python route_guide_client.py
讓我們看看輸出內容。
ListFeatures
首先,你會看到功能清單。伺服器會串流傳送每個特徵 (伺服器端串流 RPC),因為伺服器會發現這些特徵位於要求的矩形內:
-------------- ListFeatures -------------- Looking for features between 40, -75 and 42, -73 Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763) Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179) Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468) Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136) ...
RecordRoute
其次,RecordRoute
會顯示從用戶端串流至伺服器的隨機造訪點清單(用戶端串流 RPC):
-------------- RecordRoute -------------- Visiting point (lat=410395868, lng=-744972325) Visiting point (lat=404310607, lng=-740282632) Visiting point (lat=403966326, lng=-748519297) Visiting point (lat=407586880, lng=-741670168) Visiting point (lat=406589790, lng=-743560121) Visiting point (lat=410322033, lng=-747871659) Visiting point (lat=415464475, lng=-747175374) Visiting point (lat=407586880, lng=-741670168) Visiting point (lat=402647019, lng=-747071791) Visiting point (lat=414638017, lng=-745957854)
用戶端完成所有造訪點的串流後,會收到伺服器傳送的非串流回應 (一元 RPC)。這項回應會包含對用戶完整路線執行的計算摘要。
Finished trip with 10 points Passed 10 features Traveled 654743 meters It took 0 seconds
RouteChat
最後,RouteChat
輸出內容會示範雙向串流。當用戶端「拜訪」home
或 work
點時,會將 RouteNote 傳送至伺服器,記錄該點的附註。如果某個點已造訪過,伺服器會串流傳回該點的所有先前附註。
-------------- RouteChat -------------- Sending RouteNote for (lat=1, lng=1): Departing from home Sending RouteNote for (lat=2, lng=2): Arrived at work Sending RouteNote for (lat=2, lng=2): Having lunch at work < Found previous note at (lat=2, lng=2): Arrived at work Sending RouteNote for (lat=2, lng=2): Departing from work < Found previous note at (lat=2, lng=2): Arrived at work < Found previous note at (lat=2, lng=2): Having lunch at work Sending RouteNote for (lat=1, lng=1): Arrived home < Found previous note at (lat=1, lng=1): Departing from home
8. 後續步驟
- 請參閱「gRPC 簡介」和「核心概念」,瞭解 gRPC 的運作方式
- 逐步完成基礎教學課程
- 探索 Python API 參考資料