gRPC-Go スタートガイド - ストリーミング

1. はじめに

この Codelab では、gRPC-Go を使用して、Go で記述されたルート マッピング アプリケーションの基盤となるクライアントとサーバーを作成します。

このチュートリアルを完了すると、gRPC を使用してリモート サーバーに接続し、クライアントのルート上の機能に関する情報を取得し、クライアントのルートの概要を作成し、サーバーや他のクライアントとトラフィックの更新などのルート情報を交換するクライアントが作成されます。

サービスは Protocol Buffers ファイルで定義されます。このファイルは、クライアントとサーバーが相互に通信できるように、クライアントとサーバーのボイラープレート コードを生成するために使用されます。これにより、この機能を実装する時間と労力を節約できます。

この生成されたコードは、サーバーとクライアント間の通信の複雑さだけでなく、データのシリアル化と逆シリアル化も処理します。

学習内容

  • プロトコル バッファを使用してサービス API を定義する方法。
  • 自動コード生成を使用して、プロトコル バッファ定義から gRPC ベースのクライアントとサーバーを構築する方法。
  • gRPC を使用したクライアント サーバー ストリーミング通信の理解。

この Codelab は、gRPC を初めて使用する Go デベロッパー、gRPC の復習を希望するデベロッパー、分散システムの構築に関心のある方を対象としています。gRPC の経験は必要ありません。

2. 始める前に

前提条件

次のものがインストールされていることを確認します。

  • Go ツールチェーン バージョン 1.24.5 以降。インストール手順については、Go のスタートガイドをご覧ください。
  • プロトコル バッファ コンパイラ protoc バージョン 3.27.1 以降。インストール手順については、コンパイラのインストール ガイドをご覧ください。
  • Go と gRPC のプロトコル バッファ コンパイラ プラグイン。これらのプラグインをインストールするには、次のコマンドを実行します。
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

プロトコル バッファ コンパイラがプラグインを見つけられるように、PATH 変数を更新します。

export PATH="$PATH:$(go env GOPATH)/bin"

コードを取得する

この Codelab では、完全にゼロから始める必要がないように、アプリケーションのソースコードのスケルトンが用意されています。次の手順では、プロトコル バッファ コンパイラ プラグインを使用してボイラープレート gRPC コードを生成するなど、アプリケーションを完成させる方法について説明します。

まず、Codelab の作業ディレクトリを作成し、そのディレクトリに cd します。

mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started

Codelab をダウンロードして解凍します。

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-go-streaming/start_here

または、Codelab ディレクトリのみを含む .zip ファイルをダウンロードして、手動で解凍することもできます。

実装の入力をスキップする場合は、完成したソースコードを GitHub で入手できます。

3. メッセージとサービスを定義する

まず、プロトコル バッファを使用して、アプリケーションの gRPC サービス、RPC メソッド、リクエストとレスポンスのメッセージ タイプを定義します。サービスでは以下を提供します。

  • サーバーが実装し、クライアントが呼び出す RPC メソッド ListFeaturesRecordRouteRouteChat
  • 上記のメソッドを呼び出すときにクライアントとサーバー間で交換されるデータ構造であるメッセージ タイプ PointFeatureRectangleRouteNoteRouteSummary

これらの RPC メソッドとそのメッセージ型はすべて、提供されたソースコードの routeguide/route_guide.proto ファイルで定義されます。

Protocol Buffers は一般に protobuf と呼ばれます。gRPC の用語の詳細については、gRPC の コア コンセプト、アーキテクチャ、ライフサイクルをご覧ください。

メッセージのタイプを定義する

ソースコードの routeguide/route_guide.proto ファイルで、まず Point メッセージ タイプを定義します。Point は、地図上の緯度と経度の座標ペアを表します。この Codelab では、座標に整数を使用します。

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

数値 12 は、message 構造内の各フィールドの一意の ID 番号です。

次に、Feature メッセージ タイプを定義します。Feature は、Point で指定された場所にあるものの名前または郵便番号に string フィールドを使用します。

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

次に、緯度と経度の長方形を表す Rectangle メッセージ。これは、対角線上の 2 つの反対側の点「lo」と「hi」として表されます。

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

また、特定の時点で送信されたメッセージを表す RouteNote メッセージ。

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

また、RouteSummary メッセージも必要です。このメッセージは、次のセクションで説明する RecordRoute RPC のレスポンスとして受信されます。受信した個々のポイントの数、検出された特徴の数、各ポイント間の距離の累積合計としてカバーされた合計距離が含まれます。

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

サービス メソッドを定義する

サービスを定義するには、.proto ファイルで名前付きサービスを指定します。route_guide.proto ファイルには、アプリケーションのサービスによって提供される 1 つ以上のメソッドを定義する RouteGuide という名前の service 構造があります。

サービス定義内で RPC メソッドを定義し、リクエストとレスポンスの型を指定します。この Codelab のこのセクションでは、次のことを定義します。

ListFeatures

指定された Rectangle 内で使用可能な Feature を取得します。長方形が広い領域をカバーし、多数のフィーチャーを含む可能性があるため、結果は一度に返されるのではなく(繰り返しフィールドを含むレスポンス メッセージなど)、ストリーミングされます。

この RPC に適したタイプは、サーバーサイド ストリーミング RPC です。クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るストリームを取得します。クライアントは、受信したストリームをメッセージがなくなるまで読み取ります。例に示すように、レスポンス型の前に stream キーワードを配置して、サーバーサイド ストリーミング メソッドを指定します。

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

トラバースされるルート上の Point のストリームを受け取り、トラバースが完了すると RouteSummary を返します。

この場合は、クライアントサイド ストリーミング RPC が適切です。クライアントは、提供されたストリームを使用して、一連のメッセージを書き込み、サーバーに送信します。クライアントは、メッセージの書き込みを終了すると、サーバーがすべてのメッセージを読み取ってレスポンスを返すのを待ちます。クライアントサイド ストリーミング メソッドを指定するには、リクエスト タイプの前に stream キーワードを配置します。

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

ルートの移動中に送信された RouteNote のストリームを受け取り、他の RouteNote(他のユーザーからのものなど)も受け取ります。

これは、まさに双方向ストリーミングのユースケースです。双方向ストリーミング RPC では、両側が読み取り / 書き込みストリームを使用して一連のメッセージを送信します。2 つのストリームは独立して動作するため、クライアントとサーバーは任意の順序で読み取りと書き込みを行うことができます。

たとえば、サーバーはすべてのクライアント メッセージを受信してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことを交互に行うことも、読み取りと書き込みの他の組み合わせを行うこともできます。

各ストリーム内のメッセージの順序は保持されます。このタイプのメソッドを指定するには、リクエストとレスポンスの両方の前に stream キーワードを配置します。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. クライアントとサーバーのコードを生成する

次に、プロトコル バッファ コンパイラを使用して、.proto ファイルからクライアントとサーバーの両方のボイラープレート gRPC コードを生成します。routeguide ディレクトリで、次のコマンドを実行します。

protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       route_guide.proto

このコマンドは、次のファイルを生成します。

  • route_guide.pb.go。アプリケーションのメッセージ タイプを作成し、そのデータとメッセージを表すタイプの定義にアクセスする関数が含まれています。
  • route_guide_grpc.pb.go: クライアントがサービスの gRPC リモート メソッドを呼び出すために使用する関数と、サーバーがそのリモート サービスを提供するために使用する関数が含まれています。

次に、クライアントがリクエストを送信したときにサーバーが回答を返せるように、サーバーサイドでメソッドを実装します。

5. サービスを実装する

まず、RouteGuide サーバーの作成方法を見てみましょう。RouteGuide サービスが機能するには、次の 2 つの要素が必要です。

  • サービス定義から生成されたサービス インターフェースを実装する: サービスの実際の「作業」を行う。
  • gRPC サーバーを実行して、クライアントからのリクエストをリッスンし、適切なサービス実装にディスパッチします。

server/server.go で RouteGuide を実装しましょう。

RouteGuide を実装する

生成された RouteGuideService インターフェースを実装する必要があります。実装は次のようになります。

type routeGuideServer struct {
        ...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}

各 RPC 実装について詳しく見ていきましょう。

サーバーサイド ストリーミング RPC

ストリーミング RPC のいずれかから始めます。ListFeatures はサーバーサイド ストリーミング RPC であるため、複数の Feature をクライアントに送り返す必要があります。

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

ご覧のとおり、メソッド パラメータで単純なリクエスト オブジェクトとレスポンス オブジェクトを取得するのではなく、今回はリクエスト オブジェクト(クライアントが Features を見つけたい Rectangle)と、レスポンスを書き込むための特別な RouteGuide_ListFeaturesServer オブジェクトを取得します。このメソッドでは、返す必要がある数の Feature オブジェクトを生成し、Send() メソッドを使用して RouteGuide_ListFeaturesServer に書き込みます。最後に、単純な RPC と同様に、nil エラーを返して、レスポンスの書き込みが完了したことを gRPC に伝えます。この呼び出しでエラーが発生した場合は、nil 以外のエラーを返します。gRPC レイヤは、このエラーを適切な RPC ステータスに変換して、ワイヤで送信します。

クライアントサイド ストリーミング RPC

次に、もう少し複雑なクライアントサイド ストリーミング メソッド RecordRoute を見てみましょう。ここでは、クライアントから Points のストリームを取得し、旅行に関する情報を含む単一の RouteSummary を返します。ご覧のとおり、今回はメソッドにリクエスト パラメータがまったくありません。代わりに、サーバーがメッセージの読み取りと書き込みの両方に使用できる RouteGuide_RecordRouteServer ストリームを取得します。サーバーは Recv() メソッドを使用してクライアント メッセージを受信し、SendAndClose() メソッドを使用して単一のレスポンスを返すことができます。

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

メソッドの本文では、RouteGuide_RecordRouteServerRecv() メソッドを使用して、メッセージがなくなるまでクライアントのリクエストをリクエスト オブジェクト(この場合は Point)に繰り返し読み込みます。サーバーは、各呼び出しの後に Recv() から返されたエラーを確認する必要があります。nil の場合、ストリームはまだ有効で、読み取りを続行できます。io.EOF の場合、メッセージ ストリームは終了しており、サーバーは RouteSummary を返すことができます。それ以外の値の場合は、gRPC レイヤで RPC ステータスに変換されるように、エラーを「そのまま」返します。

双方向ストリーミング RPC

最後に、双方向ストリーミング RPC RouteChat() を見てみましょう。

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)

    s.mu.Lock()
    s.routeNotes[key] = append(s.routeNotes[key], in)
    // Note: this copy prevents blocking other clients while serving this one.
    // We don't need to do a deep copy, because elements in the slice are
    // insert-only and never modified.
    rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
    copy(rn, s.routeNotes[key])
    s.mu.Unlock()

    for _, note := range rn {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

今回は、クライアントサイド ストリーミングの例と同様に、メッセージの読み取りと書き込みに使用できる RouteGuide_RouteChatServer ストリームを取得します。ただし、今回はクライアントがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。ここでの読み取りと書き込みの構文は、クライアント ストリーミング メソッドと非常によく似ています。ただし、サーバーは複数のレスポンスを書き込むため、ストリームの SendAndClose() メソッドではなく send() メソッドを使用します。各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。

サーバーを起動する

すべてのメソッドを実装したら、クライアントが実際にサービスを使用できるように、gRPC サーバーを起動する必要があります。次のスニペットは、RouteGuide サービスでこれを行う方法を示しています。

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()

s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)

main() で起こっていることをステップごとに説明します。

  1. lis, err := net.Listen(...) を使用して、リモート クライアント リクエストをリッスンするために使用する TCP ポートを指定します。デフォルトでは、アプリケーションは変数 port で指定された TCP ポート 50051 を使用します。または、サーバーの実行時にコマンドラインで --port スイッチを渡すことで指定します。TCP ポートを開けない場合、アプリケーションは致命的なエラーで終了します。
  2. grpc.NewServer(...) を使用して gRPC サーバーのインスタンスを作成し、このインスタンスに grpcServer という名前を付けます。
  3. アプリケーションの API サービスを表す構造体 routeGuideServer へのポインタを作成し、ポインタに s. という名前を付けます。
  4. s.loadFeatures() を使用して、配列 s.savedFeatures に値を設定します。
  5. サービス実装を gRPC サーバーに登録します。
  6. サーバーでポートの詳細を指定して Serve() を呼び出し、クライアント リクエストのブロック待機を行います。この処理は、プロセスが強制終了されるか Stop() が呼び出されるまで続きます。

関数 loadFeatures() は、座標と位置のマッピングを server/testdata.go から取得します。

6. クライアントを作成する

次に、クライアント コードを実装する client/client.go を編集します。

リモート サービスのメソッドを呼び出すには、まずサーバーと通信するための gRPC チャネルを作成する必要があります。これは、クライアントの main() 関数で、サーバーのターゲット URI 文字列(この場合はアドレスとポート番号)を grpc.NewClient() に渡すことで作成します。

// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
  log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()

変数 serverAddr で定義されるサーバーのアドレスは、デフォルトでは localhost:50051 です。クライアントの実行時にコマンドラインで --addr スイッチを使用すると、このアドレスをオーバーライドできます。

クライアントが TLS や JWT 認証情報などの認証情報を必要とするサービスに接続する必要がある場合、クライアントは必要な認証情報を含む DialOptions オブジェクトを grpc.NewClient のパラメータとして渡すことができます。RouteGuide サービスには認証情報は必要ありません。

gRPC チャネルを設定したら、Go 関数呼び出しで RPC を実行するためのクライアント スタブが必要です。このスタブは、アプリの .proto ファイルから生成された route_guide_grpc.pb.go ファイルが提供する NewRouteGuideClient メソッドを使用して取得します。

import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")

client := pb.NewRouteGuideClient(conn)

サービス メソッドを呼び出す

次に、サービス メソッドを呼び出す方法を見てみましょう。gRPC-Go では、RPC はブロッキング/同期モードで動作します。つまり、RPC 呼び出しはサーバーからの応答を待機し、応答またはエラーを返します。

サーバーサイド ストリーミング RPC

ここでは、サーバーサイド ストリーミング メソッド ListFeatures を呼び出します。このメソッドは、地理的な Feature オブジェクトのストリームを返します。

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
  // For server-to-client streaming RPCs, you call stream.Recv() until it
  // returns io.EOF.
  feature, err := stream.Recv()
  if err == io.EOF {
    break
  }
  if err != nil {
    log.Fatalf("client.ListFeatures failed: %v", err)
  }
  log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
    feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}

単純な RPC と同様に、メソッドにコンテキストとリクエストを渡します。ただし、レスポンス オブジェクトではなく、RouteGuide_ListFeaturesClient のインスタンスが返されます。クライアントは RouteGuide_ListFeaturesClient ストリームを使用して、サーバーのレスポンスを読み取ることができます。RouteGuide_ListFeaturesClientRecv() メソッドを使用して、メッセージがなくなるまで、サーバーのレスポンスをレスポンス プロトコル バッファ オブジェクト(この場合は Feature)に繰り返し読み込みます。クライアントは、各呼び出しの後に Recv() から返されたエラー err を確認する必要があります。nil の場合は、ストリームはまだ有効で、読み取りを続行できます。io.EOF の場合は、メッセージ ストリームが終了しています。それ以外の場合は、RPC エラーが発生しているため、err を介して渡す必要があります。

クライアントサイド ストリーミング RPC

クライアントサイドのストリーミング メソッド RecordRoute は、サーバーサイドのメソッドと似ています。ただし、メソッドにコンテキストを渡して RouteGuide_RecordRouteClient ストリームを取得するだけです。このストリームは、メッセージの書き込み読み取りの両方に使用できます。

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
  if err := c2sStream.Send(point); err != nil {
    log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
  }
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)

RouteGuide_RecordRouteClient には、サーバーにリクエストを送信するために使用できる Send() メソッドがあります。Send() を使用してクライアントのリクエストをストリームに書き込んだら、ストリームで CloseAndRecv() を呼び出して、書き込みが完了し、レスポンスの受信を待機していることを gRPC に通知する必要があります。RPC ステータスは、CloseAndRecv() から返された err から取得します。ステータスが nil の場合、CloseAndRecv() からの最初の戻り値は有効なサーバー レスポンスになります。

双方向ストリーミング RPC

最後に、双方向ストリーミング RPC RouteChat() を見てみましょう。RecordRoute の場合と同様に、メソッドにコンテキスト オブジェクトを渡すだけで、メッセージの書き込みと読み取りの両方に使用できるストリームが返されます。ただし、今回は、サーバーがメッセージ ストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を返します。

biDiStream, err := client.RouteChat(context.Background())
if err != nil {
  log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
  for {
    in, err := biDiStream.Recv()
    if err == io.EOF {
      // read done.
      close(recvDoneCh)
      return
    }
    if err != nil {
      log.Fatalf("client.RouteChat failed: %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
// send messages simultaneously.
for _, note := range notes {
  if err := biDiStream.Send(note); err != nil {
    log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
  }
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh

ここでの読み取りと書き込みの構文は、クライアントサイドのストリーミング メソッドとよく似ています。ただし、呼び出しが完了したら、ストリームの CloseSend() メソッドを 1 回使用します。各サイドは常に相手のメッセージを書き込まれた順に取得しますが、クライアントとサーバーの両方が任意の順序で読み取りと書き込みを行うことができます。ストリームは完全に独立して動作します。

7. 試してみる

アプリケーションの作業ディレクトリで次のコマンドを実行して、サーバーとクライアントが正しく連携していることを確認します。

  1. 1 つのターミナルでサーバーを実行します。
cd server
go run .
  1. 別のターミナルからクライアントを実行します。
cd client
go run .

次のような出力が表示されます(わかりやすくするため、タイムスタンプは省略されています)。

Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

8. 次のステップ