gRPC-Python スタートガイド - ストリーミング

1. はじめに

この Codelab では、gRPC-Python を使用して、Python で記述されたルート マッピング アプリケーションの基盤となるクライアントとサーバーを作成します。

このチュートリアルを完了すると、gRPC を使用してリモート サーバーに接続し、クライアントのルート上の機能に関する情報を取得し、クライアントのルートの概要を作成し、サーバーや他のクライアントとトラフィックの更新などのルート情報を交換するクライアントが作成されます。

サービスは Protocol Buffers ファイルで定義されます。このファイルは、クライアントとサーバーが相互に通信できるように、クライアントとサーバーのボイラープレート コードを生成するために使用されます。これにより、この機能を実装する時間と労力を節約できます。

この生成されたコードは、サーバーとクライアント間の通信の複雑さだけでなく、データのシリアル化と逆シリアル化も処理します。

学習内容

  • プロトコル バッファを使用してサービス API を定義する方法。
  • 自動コード生成を使用して、プロトコル バッファ定義から gRPC ベースのクライアントとサーバーを構築する方法。
  • gRPC を使用したクライアント サーバー ストリーミング通信の理解。

この Codelab は、gRPC を初めて使用する Python デベロッパー、gRPC の復習を希望するデベロッパー、分散システムの構築に関心のある方を対象としています。gRPC の経験は必要ありません。

2. 始める前に

必要なもの

  • Python 3.9 以降。Python 3.13 をおすすめします。プラットフォーム固有のインストール手順については、Python の設定と使用をご覧ください。または、uv pyenv などのツールを使用して、システム以外の Python をインストールします。
  • Python パッケージをインストールする pip
  • Python 仮想環境を作成する venv

ensurepip パッケージと venv パッケージは Python 標準ライブラリの一部であり、通常はデフォルトで使用できます。

ただし、Debian ベースのディストリビューション(Ubuntu など)では、Python を再配布する際にこれらを除外することがあります。パッケージをインストールするには、次のコマンドを実行します。

sudo apt install python3-pip python3-venv

コードを取得する

学習を効率化するため、この Codelab では、すぐに始められるように、あらかじめ作成されたソースコード スキャフォールドが用意されています。次の手順では、grpc_tools.protoc Protocol Buffer コンパイラ プラグインを使用した gRPC コード生成など、アプリケーションの完成までの手順について説明します。

grpc-codelabs

この Codelab のスキャフォールディング ソースコードは、codelabs/grpc-python-streaming/start_here ディレクトリにあります。コードを自分で実装しない場合は、完成したソースコードが completed ディレクトリにあります。

まず、Codelab の作業ディレクトリを作成し、そのディレクトリに移動します。

mkdir grpc-python-streaming && cd grpc-python-streaming

Codelab をダウンロードして解凍します。

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

または、Codelab ディレクトリのみを含む .zip ファイルをダウンロードして、手動で解凍することもできます。

3. メッセージとサービスを定義する

まず、プロトコル バッファを使用して、アプリケーションの gRPC サービス、RPC メソッド、リクエストとレスポンスのメッセージ タイプを定義します。サービスでは以下を提供します。

  • サーバーが実装し、クライアントが呼び出す RPC メソッド ListFeaturesRecordRouteRouteChat
  • RPC メソッドを呼び出すときにクライアントとサーバー間で交換されるデータ構造であるメッセージ タイプ PointFeatureRectangleRouteNoteRouteSummary

これらの RPC メソッドとそのメッセージ型はすべて、提供されたソースコードの protos/route_guide.proto ファイルで定義されます。

Protocol Buffers は一般に protobuf と呼ばれます。gRPC の用語の詳細については、gRPC の コア コンセプト、アーキテクチャ、ライフサイクルをご覧ください。

メッセージ タイプを定義する

ソースコードの protos/route_guide.proto ファイルで、まず Point メッセージ タイプを定義します。Point は、地図上の緯度と経度の座標ペアを表します。この Codelab では、座標に整数を使用します。

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数値 12 は、message 構造内の各フィールドの一意の ID 番号です。

次に、Feature メッセージ タイプを定義します。Feature は、Point で指定された場所にあるものの名前または郵便番号に string フィールドを使用します。

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

領域内の複数のポイントをクライアントにストリーミングできるようにするには、緯度経度の長方形を表す Rectangle メッセージが必要です。これは、対角線上の 2 つのポイント lohi で表されます。

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

また、特定の時点で送信されたメッセージを表す RouteNote メッセージは次のようになります。

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

最後に、RouteSummary メッセージが必要です。このメッセージは、次のセクションで説明する RecordRoute RPC のレスポンスとして受信されます。受信した個々のポイントの数、検出された特徴の数、各ポイント間の距離の累積合計としてカバーされた合計距離が含まれます。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

サービス メソッドを定義する

サービスを定義するには、.proto ファイルで名前付きサービスを指定します。route_guide.proto ファイルには、アプリケーションのサービスによって提供される 1 つ以上のメソッドを定義する RouteGuide という名前の service 構造があります。

サービス定義内で RPC メソッドを定義するときは、リクエストとレスポンスの型を指定します。この Codelab のこのセクションでは、次のことを定義します。

ListFeatures

指定された Rectangle 内で使用可能な Feature オブジェクトを取得します。長方形が広い範囲をカバーし、多数の対象物を含む可能性があるため、結果は一度に返されるのではなく、ストリーミングされます。

このアプリケーションでは、サーバーサイド ストリーミング RPC を使用します。クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るストリームを取得します。クライアントは、受信したストリームをメッセージがなくなるまで読み取ります。例に示すように、レスポンス型の前に stream キーワードを配置して、サーバーサイド ストリーミング メソッドを指定します。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

通過中のルート上のポイントのストリームを受け取り、通過が完了すると RouteSummary を返します。

この場合は、クライアントサイド ストリーミング RPC が適しています。クライアントは、提供されたストリームを使用して、一連のメッセージを書き込み、サーバーに送信します。クライアントは、メッセージの書き込みを終了すると、サーバーがすべてのメッセージを読み取ってレスポンスを返すのを待ちます。クライアントサイド ストリーミング メソッドを指定するには、リクエスト タイプの前に stream キーワードを配置します。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

ルートの移動中に送信された RouteNotes のストリームを受け入れ、他の RouteNotes(他のユーザーからのものなど)を受信します。

これはまさに双方向ストリーミングのユースケースです。双方向ストリーミング RPC。両側が読み取り / 書き込みストリームを使用して一連のメッセージを送信します。2 つのストリームは独立して動作するため、クライアントとサーバーは任意の順序で読み取りと書き込みを行うことができます。たとえば、サーバーはすべてのクライアント メッセージを受信してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことを交互に行うこともできます。各ストリーム内のメッセージの順序は保持されます。このタイプのメソッドを指定するには、リクエストとレスポンスの両方の前に stream キーワードを配置します。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. クライアント コードとサーバーコードを生成する

次に、プロトコル バッファ コンパイラを使用して、.proto ファイルからクライアントとサーバーの両方のボイラープレート gRPC コードを生成します。

gRPC Python コード生成用に、grpcio-tools を作成しました。次の内容が含まれます。

  1. message 定義から Python コードを生成する通常の protoc コンパイラ。
  2. service 定義から Python コード(クライアントとサーバースタブ)を生成する gRPC protobuf プラグイン。

pip を使用して grpcio-tools Python パッケージをインストールします。プロジェクトの依存関係をシステム パッケージから分離するために、新しい Python 仮想環境(venv)を作成しましょう。

python3 -m venv --upgrade-deps .venv

bash/zsh シェルで仮想環境を有効にするには:

source .venv/bin/activate

Windows と標準以外のシェルについては、https://docs.python.org/3/library/venv.html#how-venvs-work の表をご覧ください。

次に、grpcio-tools をインストールします(これにより、grpcio パッケージもインストールされます)。

pip install grpcio-tools

次のコマンドを使用して、Python ボイラープレート コードを生成します。

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

これにより、route_guide.proto で定義したインターフェースに対して次のファイルが生成されます。

  1. route_guide_pb2.py には、message 定義から生成されたクラスを動的に作成するコードが含まれています。
  2. route_guide_pb2.pyi は、message 定義から生成された「スタブファイル」または「型ヒントファイル」です。実装のないシグネチャのみが含まれています。スタブファイルは、IDE でより優れたオートコンプリートとエラー検出を提供するために使用できます。
  3. route_guide_pb2_grpc.pyservice 定義から生成され、gRPC 固有のクラスと関数が含まれています。

gRPC 固有のコードには次のものが含まれます。

  1. RouteGuideStub。これは、gRPC クライアントが RouteGuide RPC を呼び出すために使用できます。
  2. RouteGuideServicer。これは、RouteGuide サービスの実装のインターフェースを定義します。
  3. RouteGuideServicergRPC サーバーに登録するために使用される add_RouteGuideServicer_to_server 関数。

5. サーバーを作成する

まず、RouteGuide サーバーの作成方法を見てみましょう。RouteGuide サーバーの作成と実行は、次の 2 つの作業項目に分かれます。

  • サービス定義から生成されたサービサー インターフェースを、サービスの実際の「作業」を行う関数で実装します。
  • クライアントからのリクエストをリッスンしてレスポンスを送信する gRPC サーバーを実行する。

route_guide_server.py を見てみましょう。

RouteGuide を実装する

route_guide_server.py には、生成されたクラス route_guide_pb2_grpc.RouteGuideServicer をサブクラス化する RouteGuideServicer クラスがあります。

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer は、すべての RouteGuide サービス メソッドを実装します。

サーバーサイド ストリーミング RPC

ListFeatures は、複数の Feature をクライアントに送信するレスポンス ストリーミング RPC です。

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

ここで、リクエスト メッセージは route_guide_pb2.Rectangle であり、クライアントは Feature を検索しようとしています。メソッドは単一のレスポンスを返すのではなく、ゼロ以上のレスポンスを生成します。

クライアントサイド ストリーミング RPC

リクエスト ストリーミング メソッド RecordRoute は、リクエスト値の イテレータを使用し、単一のレスポンス値を返します。

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

双方向ストリーミング RPC

最後に、双方向ストリーミング RPC RouteChat() を見てみましょう。

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

このメソッドのセマンティクスは、リクエスト ストリーミング メソッドとレスポンス ストリーミング メソッドのセマンティクスの組み合わせです。リクエスト値のイテレータが渡され、レスポンス値のイテレータになります。

サーバーを起動する

RouteGuide メソッドをすべて実装したら、次のステップとして、クライアントが実際にサービスを使用できるように gRPC サーバーを起動します。

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

サーバーの start() メソッドはブロックされません。リクエストを処理するために新しいスレッドがインスタンス化されます。server.start() を呼び出すスレッドは、その間に行う他の処理がないことがよくあります。この場合、server.wait_for_termination() を呼び出して、サーバーが終了するまで呼び出しスレッドをクリーンにブロックできます。

6. クライアントを作成する

route_guide_client.py を見てみましょう。

スタブを作成する

サービス メソッドを呼び出すには、まずスタブを作成する必要があります。

run() メソッドで .proto. から生成された route_guide_pb2_grpc モジュールの RouteGuideStub クラスをインスタンス化します。

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

ここで channel はコンテキスト マネージャーとして使用され、インタープリタが with ブロックを離れると自動的に閉じます。

サービス メソッドを呼び出す

単一のレスポンスを返す RPC メソッド(「response-unary」メソッド)の場合、gRPC Python は同期(ブロッキング)と非同期(ノンブロッキング)の両方の制御フロー セマンティクスをサポートします。レスポンス ストリーミング RPC メソッドの場合、呼び出しはレスポンス値のイテレータをすぐに返します。そのイテレータの next() メソッドの呼び出しは、イテレータから生成されるレスポンスが利用可能になるまでブロックされます。

サーバーサイド ストリーミング RPC

レスポンス ストリーミング ListFeatures の呼び出しは、シーケンス型を扱うのと似ています。

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

クライアントサイド ストリーミング RPC

リクエスト ストリーミング RecordRoute の呼び出しは、ローカル メソッドにイテレータを渡すのと同様です。上記の単一のレスポンスを返すシンプルな RPC と同様に、同期的に呼び出すことができます。

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

双方向ストリーミング RPC

双方向ストリーミング RouteChat の呼び出しには、(サービス側の場合と同様に)リクエスト ストリーミングとレスポンス ストリーミングのセマンティクスの組み合わせがあります。

リクエスト メッセージを生成し、yield を使用して 1 つずつ送信します。

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

サーバー レスポンスを受信して処理します。

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

ヘルパー メソッドを呼び出す

run で、作成したメソッドを実行し、stub を渡します。

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. 試してみる

サーバーを実行します。

python route_guide_server.py

別のターミナルから、仮想環境を再度有効にして(source .venv/bin/activate))、クライアントを実行します。

python route_guide_client.py

出力を見てみましょう。

ListFeatures

まず、機能のリストが表示されます。各フィーチャーは、リクエストされた長方形内にあることが検出されると、サーバーからストリーミングされます(サーバーサイド ストリーミング RPC)。

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

次に、RecordRoute は、クライアントからサーバーにストリーミングされるランダムに訪問したポイントのリストを示しています(クライアントサイド ストリーミング RPC)

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

クライアントがすべての訪問ポイントのストリーミングを完了すると、サーバーから非ストリーミング レスポンス(単項 RPC)が返されます。このレスポンスには、クライアントの全ルートに対して実行された計算の概要が含まれます。

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

最後に、RouteChat 出力は双方向ストリーミングを示しています。クライアントが home または work ポイントを「訪問」している場合、RouteNote をサーバーに送信して、ポイントのメモを記録します。ポイントがすでにアクセスされている場合、サーバーはこのポイントの以前のすべてのメモをストリーミングで返します。

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. 次のステップ