開始使用 gRPC-Python - 串流

1. 簡介

在本程式碼研究室中,您將使用 gRPC-Python 建立用戶端和伺服器,做為以 Python 編寫的路線對應應用程式基礎。

完成本教學課程後,您將擁有一個用戶端,可使用 gRPC 連線至遠端伺服器,取得用戶端路線上的功能資訊、建立用戶端路線摘要,以及與伺服器和其他用戶端交換路線資訊 (例如交通資訊更新)。

服務定義於 Protocol Buffers 檔案中,用於產生用戶端和伺服器的樣板程式碼,以便彼此通訊,節省您實作該功能的時間和精力。

這段產生的程式碼不僅會處理伺服器與用戶端之間複雜的通訊,也會處理資料序列化和還原序列化。

課程內容

  • 如何使用通訊協定緩衝區定義服務 API。
  • 如何使用自動程式碼生成功能,從通訊協定緩衝區定義建構以 gRPC 為基礎的用戶端和伺服器。
  • 瞭解 gRPC 的用戶端與伺服器串流通訊。

這個程式碼研究室適用於剛接觸 gRPC 或想複習 gRPC 的 Python 開發人員,以及有興趣建構分散式系統的任何人。不需要有 gRPC 相關經驗。

2. 事前準備

軟硬體需求

  • Python 3.9 以上版本。建議使用 Python 3.13。如需特定平台的安裝說明,請參閱「Python 設定和使用方式」。或者,您也可以使用 uv pyenv 等工具,安裝非系統 Python。
  • pip 安裝 Python 套件。
  • venv 建立 Python 虛擬環境。

ensurepipvenv 套件是 Python 標準程式庫的一部分,通常預設會提供。

不過,部分以 Debian 為基礎的發行版本 (包括 Ubuntu) 會選擇在重新發布 Python 時排除這些檔案。如要安裝套件,請執行:

sudo apt install python3-pip python3-venv

取得程式碼

為簡化學習過程,本程式碼研究室提供預先建構的原始碼架構,協助您踏出第一步。下列步驟將引導您完成應用程式,包括使用 grpc_tools.protoc Protocol Buffer 編譯器外掛程式產生 gRPC 程式碼。

grpc-codelabs

本程式碼研究室的架構原始碼位於 codelabs/grpc-python-streaming/start_here 目錄。如果您不想自行導入程式碼,完成的原始碼會放在 completed 目錄中。

首先,請建立程式碼研究室工作目錄,然後 cd 到該目錄:

mkdir grpc-python-streaming && cd grpc-python-streaming

下載並擷取程式碼研究室:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

或者,您也可以下載只包含 Codelab 目錄的 .zip 檔案,然後手動解壓縮。

3. 定義訊息和服務

首先,請使用通訊協定緩衝區定義應用程式的 gRPC 服務、RPC 方法,以及要求和回應訊息類型。你的服務將提供:

  • 伺服器實作且用戶端呼叫的 RPC 方法:ListFeaturesRecordRouteRouteChat
  • 訊息類型 PointFeatureRectangleRouteNoteRouteSummary,這些是呼叫 RPC 方法時,用戶端和伺服器之間交換的資料結構。

這些 RPC 方法及其訊息類型都會在提供的原始碼 protos/route_guide.proto 檔案中定義。

通訊協定緩衝區通常稱為 protobuf。如要進一步瞭解 gRPC 術語,請參閱 gRPC 的「核心概念、架構和生命週期」。

定義訊息類型

在原始碼的 protos/route_guide.proto 檔案中,請先定義 Point 訊息型別。Point 代表地圖上的經緯度座標組合。在本程式碼研究室中,請使用整數做為座標:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

數字 12message 結構中每個欄位的專屬 ID 號碼。

接著,定義 Feature 訊息類型。Feature 會使用 string 欄位,指定 Point 所指定位置的名稱或郵寄地址:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

如要將區域內的多個點串流至用戶端,您需要 Rectangle 訊息,代表以兩個對角點 lohi 表示的經緯度矩形:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

此外,RouteNote 訊息代表在特定時間點傳送的訊息:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

最後,您需要 RouteSummary 訊息。這則訊息是針對 RecordRoute RPC 收到的回應,下一節將說明這項 RPC。其中包含收到的個別點數、偵測到的特徵數量,以及每個點之間距離的累計總和,也就是涵蓋的總距離。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

定義服務方法

如要定義服務,請在 .proto 檔案中指定具名服務。route_guide.proto 檔案具有名為 RouteGuideservice 結構,可定義應用程式服務提供的一或多個方法。

在服務定義中定義 RPC 方法時,請指定要求和回應類型。在本程式碼研究室的這一節中,我們將定義:

ListFeatures

取得指定 Rectangle 中可用的 Feature 物件。由於矩形可能涵蓋大範圍區域,並包含大量特徵,因此系統會串流傳回結果,而非一次傳回。

在這個應用程式中,您將使用伺服器端串流 RPC:用戶端會將要求傳送至伺服器,並取得串流來讀取一系列訊息。用戶端會從傳回的串流讀取訊息,直到沒有更多訊息為止。如範例所示,您可以在回應型別前加上 stream 關鍵字,指定伺服器端串流方法。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

接受所遍歷路徑上的 Points 串流,並在遍歷完成時傳回 RouteSummary

在這種情況下,用戶端串流 RPC 是合適的選擇:用戶端會寫入一連串訊息並傳送至伺服器,同樣是使用提供的串流。用戶端寫完訊息後,會等待伺服器讀取所有訊息並傳回回應。如要指定用戶端串流方法,請在要求類型前放置串流關鍵字。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

接受在路線遍歷期間傳送的 RouteNotes 串流,同時接收其他 RouteNotes (例如來自其他使用者)。

這正是雙向串流的適用情境。雙向串流 RPC,雙方都會使用讀寫串流傳送一連串訊息。這兩個串流會獨立運作,因此用戶端和伺服器可以任意順序讀取和寫入資料。舉例來說,伺服器可以等待接收所有用戶端訊息,再寫入回應;也可以交替讀取和寫入訊息;或是以其他方式組合讀取和寫入作業。每個串流中的訊息順序都會保留。如要指定這類方法,請在要求和回應前加上串流關鍵字。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. 產生用戶端和伺服器程式碼

接著,使用通訊協定緩衝區編譯器,從 .proto 檔案產生用戶端和伺服器的樣板 gRPC 程式碼。

我們建立了 grpcio-tools,用於生成 gRPC Python 程式碼。It includes:

  1. 可從 message 定義產生 Python 程式碼的標準 protoc 編譯器。
  2. gRPC protobuf 外掛程式,可從 service 定義產生 Python 程式碼 (用戶端和伺服器存根)。

我們將使用 pip 安裝 grpcio-tools Python 套件。讓我們建立新的 Python 虛擬環境 (venv),將專案的依附元件與系統套件區隔開來:

python3 -m venv --upgrade-deps .venv

如要在 bash/zsh 殼層中啟動虛擬環境,請執行下列指令:

source .venv/bin/activate

如為 Windows 和非標準殼層,請參閱 https://docs.python.org/3/library/venv.html#how-venvs-work 中的表格。

接著,請安裝 grpcio-tools (這也會安裝 grpcio 套件):

pip install grpcio-tools

使用下列指令產生 Python 樣板程式碼:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

這會為我們在 route_guide.proto 中定義的介面產生下列檔案:

  1. route_guide_pb2.py 包含動態建立類別的程式碼,這些類別是從 message 定義產生。
  2. route_guide_pb2.pyi 是從 message 定義產生的「存根檔案」或「型別提示檔案」。其中只包含沒有實作的簽章。IDE 可使用 Stub 檔案,提供更完善的自動完成和錯誤偵測功能。
  3. route_guide_pb2_grpc.py 是從 service 定義產生,包含 gRPC 專屬的類別和函式。

gRPC 專屬程式碼包含:

  1. RouteGuideStubgRPC 用戶端可使用此檔案叫用 RouteGuide RPC。
  2. RouteGuideServicer,定義 RouteGuide 服務實作的介面。
  3. add_RouteGuideServicer_to_server 函式,用於向 gRPC 伺服器註冊 RouteGuideServicer

5. 建立伺服器

首先,我們來看看如何建立 RouteGuide 伺服器。建立及執行 RouteGuide 伺服器可分為兩項工作:

  • 實作從服務定義產生的服務介面,並使用可執行服務實際「工作」的函式。
  • 執行 gRPC 伺服器,監聽來自用戶端的要求並傳輸回應。

讓我們看看route_guide_server.py

實作 RouteGuide

route_guide_server.py 具有 RouteGuideServicer 類別,可將產生的類別 route_guide_pb2_grpc.RouteGuideServicer 子類別化:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer 會實作所有 RouteGuide 服務方法。

伺服器端串流 RPC

ListFeatures 是回應串流 RPC,會將多個 Feature 傳送給用戶端:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

在這裡,要求訊息是 route_guide_pb2.Rectangle,用戶端要在其中尋找 Feature。這個方法不會傳回單一回應,而是產生零或多個回應。

用戶端串流 RPC

要求串流方法 RecordRoute 會使用要求值的 疊代器,並傳回單一回應值。

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

雙向串流 RPC

最後,我們來看看雙向串流 RPC RouteChat()

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

這個方法的語意結合了要求串流方法和回應串流方法的語意。這個函式會傳遞要求值的疊代器,本身也是回應值的疊代器。

啟動伺服器

實作所有 RouteGuide 方法後,下一步是啟動 gRPC 伺服器,讓用戶端實際使用您的服務:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

伺服器 start() 方法不會封鎖。系統會例項化新執行緒來處理要求。呼叫 server.start() 的執行緒通常不會有其他工作要做。在這種情況下,您可以呼叫 server.wait_for_termination(),乾淨地封鎖呼叫執行緒,直到伺服器終止為止。

6. 建立用戶端

讓我們看看route_guide_client.py

建立存根

如要呼叫服務方法,我們必須先建立 stub

我們從 .proto. run() 方法產生的 route_guide_pb2_grpc 模組中,例項化 RouteGuideStub 類別:

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

請注意,這裡的 channel 用做內容管理工具,解譯器離開 with 區塊後就會自動關閉。

呼叫服務方法

對於傳回單一回應的 RPC 方法 (「response-unary」方法),gRPC Python 同步 (封鎖) 和非同步 (非封鎖) 控制流程語意都支援。如果是回應串流 RPC 方法,呼叫會立即傳回回應值的疊代器。對該疊代器的 next() 方法進行的呼叫會遭到封鎖,直到可從疊代器產生回應為止。

伺服器端串流 RPC

呼叫回應串流 ListFeatures 的方式與處理序列型別類似:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

用戶端串流 RPC

呼叫要求串流 RecordRoute 的方式,與將迭代器傳遞至本機方法類似。如同上述也會傳回單一回應的簡單 RPC,這項 RPC 可以同步呼叫:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

雙向串流 RPC

呼叫雙向串流 RouteChat 時,會結合要求串流和回應串流語意 (服務端也是如此)。

產生要求訊息,然後使用 yield 逐一傳送。

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

接收及處理伺服器回應:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

呼叫輔助方法

在執行階段中,執行我們剛建立的方法,並傳遞 stub

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. 立即試用

執行伺服器:

python route_guide_server.py

在其他終端機中再次啟動虛擬環境 (source .venv/bin/activate)),然後執行用戶端:

python route_guide_client.py

讓我們看看輸出內容。

ListFeatures

首先,你會看到功能清單。伺服器會串流傳送每個特徵 (伺服器端串流 RPC),因為伺服器會發現這些特徵位於要求的矩形內:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

其次,RecordRoute 會顯示從用戶端串流至伺服器的隨機造訪點清單(用戶端串流 RPC)

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

用戶端完成所有造訪點的串流後,會收到伺服器傳送的非串流回應 (一元 RPC)。這項回應會包含對用戶完整路線執行的計算摘要。

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

最後,RouteChat 輸出內容會示範雙向串流。當用戶端「拜訪」homework 點時,會將 RouteNote 傳送至伺服器,記錄該點的附註。如果某個點已造訪過,伺服器會串流傳回該點的所有先前附註。

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. 後續步驟