Google Cloud Dataflow を使用して最初の SQL ステートメントを実行する

1. はじめに

Cloud-Dataflow.png

Google Cloud Dataflow

最終更新日: 2020 年 5 月 26 日

Dataflow とは

Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。このサイトのドキュメントでは、Dataflow を使用してバッチおよびストリーミングのデータ処理パイプラインをデプロイする方法と、サービス機能の使用方法について説明しています。

Apache Beam SDK はオープンソースのプログラミング モデルであり、バッチとストリーミングの両方のパイプラインの開発に使用できます。Apache Beam プログラムでパイプラインを作成し、Dataflow サービスで実行します。Apache Beam のドキュメントには、詳細なコンセプト情報と Apache Beam のプログラミング モデル、SDK、その他のランナーのリファレンス情報が記載されています。

高速なストリーミング データ分析

Dataflow を使用すると、データ レイテンシを低く抑えながら、ストリーミング データ パイプラインを迅速かつ簡単に開発できます。

運用と管理の簡素化

Dataflow のサーバーレス アプローチにより、データ エンジニアリングのワークロードから運用上のオーバーヘッドが取り除かれ、チームはサーバー クラスタの管理ではなくプログラミングに集中できます。

総所有コストを削減

リソースの自動スケーリングとコスト最適化されたバッチ処理機能を組み合わせることにより、Dataflow で実質無制限の容量を利用できます。過剰な費用をかけずに、時季変動したり急変動したりするワークロードを管理できます。

主な機能

自動化されたリソース管理と動的作業再調整

Dataflow は、処理リソースのプロビジョニングと管理を自動化してレイテンシを最小限に抑え、使用率を最大化します。そのため、手動でインスタンスをスピンアップしたり予約したりする必要はありません。作業のパーティショニングも自動化、最適化され、遅れている作業を動的に再調整します。「ホットキー」を追跡する必要がない入力データを前処理することもできます。

水平自動スケーリング

ワーカー リソースの水平自動スケーリングによってスループットが最適化されるため、全体的なコスト パフォーマンスが向上します。

バッチ処理向けの柔軟なリソース スケジューリング料金

夜間のジョブなど、ジョブのスケジュールに柔軟性を持たせた処理には、Flexible Resource Scheduling(FlexRS)の方が低価格でバッチ処理できます。このようなフレキシブル ジョブはキューに入り、6 時間以内に確実にキューから取り出されて実行されます。

ワークショップの一環として実施する

この Codelab では、Dataflow SQL UI から SQL ステートメントを送信して Dataflow SQL の使用を開始します。次に、Dataflow Monitoring UI を使用して、実行中のパイプラインを確認します。

学習内容

  • Dataflow SQL UI で SQL ステートメントを Dataflow ジョブとして送信する方法。
  • Dataflow パイプラインに移動する方法
  • SQL ステートメントで作成された Dataflow グラフを調べる。
  • グラフで提供されるモニタリング情報を確認する。

必要なもの

  • 課金が有効になっている Google Cloud Platform プロジェクト。
  • Google Cloud Dataflow と Google Cloud PubSub が有効になっている。

2. 設定方法

Dataflow API と Cloud Pub/Sub API が有効になっていることを確認します。これを確認するには、API のサービスページ。

3. Dataflow SQL UI にアクセスする

Dataflow SQL UI は、Dataflow SQL ジョブを作成するための BigQuery ウェブ UI の設定です。Dataflow SQL UI には BigQuery ウェブ UI からアクセスできます。

  1. BigQuery ウェブ UI に移動します。

  1. Cloud Dataflow エンジンに切り替えます。
  • [展開] プルダウン メニューを開いて、[クエリの設定] を選択します。

BigQuery ウェブ UI の [展開] プルダウン メニュー。クエリ設定オプションが選択されています。

  • [クエリの設定] メニューで [Dataflow エンジン] を選択します。
  • Dataflow API と Data Catalog API が有効になっていない場合に表示されるプロンプトで、[API を有効にする] をクリックします。

[API を有効にする] プロンプトが表示された [クエリの設定] メニュー

  • [保存] をクリックします。[API を有効にする] プロンプトが表示された [クエリの設定] メニュー。[Dataflow エンジン] ラジオボタンが選択されています。

Dataflow SQL UI には Dataflow モニタリング インターフェースからもアクセスできます。

  • Dataflow モニタリング インターフェースに移動します。

  • [SQL からジョブを作成] をクリックします。

4. SQL クエリを使用して Dataflow ジョブを実行する

Dataflow SQL クエリの作成

Dataflow SQL クエリは、Dataflow SQL クエリ構文を使用します。Dataflow SQL のクエリ構文は BigQuery 標準 SQL に似ています。Dataflow SQL ストリーミング拡張機能を使用すると、Pub/Sub などの Dataflow ソースを継続的に更新するデータを集約できます。たとえば、次のクエリは、タクシー乗車の Pub/Sub ストリームで 1 分ごとに乗客をカウントします。

SELECT
  TUMBLE_START('INTERVAL 1 MINUTE') as period_start,
  SUM(passenger_count) AS pickup_count
FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
WHERE
  ride_status = "pickup"
GROUP BY
  TUMBLE(event_timestamp, 'INTERVAL 1 MINUTE')

Dataflow SQL クエリの実行

Dataflow SQL クエリを実行すると、Dataflow はクエリを Apache Beam パイプラインに変換して実行します。

Dataflow SQL クエリは、Cloud コンソールまたは gcloud コマンドライン ツールを使用して実行できます。

Dataflow SQL クエリを実行するには、Dataflow SQL UI を使用します。

  • Dataflow SQL UI に移動します。
  • 上記の Dataflow SQL クエリをクエリエディタに入力します。
  • [Cloud Dataflow ジョブを作成] をクリックして、ジョブ オプションのパネルを開きます。
  • (省略可)[オプション パラメータを表示] をクリックしてリストを参照します。
  • パネルの [送信先] セクションで、出力タイプとして [BigQuery] を選択します。

1155e94529ff58fe.png

  • データセット ID を選択し、「passengers_per_min」というテーブル名で作成します
  • [作成] をクリックします。

データのクエリと Dataflow SQL クエリ結果の書き込みの詳細については、Dataflow SQL を使用するをご覧ください。

5. Dataflow Monitoring UI でのジョブの探索

Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用して、そのジョブや他のジョブを表示できます。この監視インターフェースを使用すると、Dataflow ジョブを表示して操作することができます。

Dataflow モニタリング インターフェースにアクセスするには、Google Cloud コンソールを使用します。このモニタリング インターフェースには、次の項目が表示されます。

  • 現在実行中の Dataflow ジョブと過去 30 日以内に実行されたすべての Dataflow ジョブのリスト。
  • 各パイプラインの図
  • ジョブのステータス、実行、および SDK のバージョンに関する詳細
  • Compute Engine や Cloud Storage など、パイプラインを実行している Google Cloud サービスに関する情報へのリンク。
  • ジョブ中に発生したエラーまたは警告

ジョブ モニタリング グラフは、Dataflow モニタリング インターフェース内で表示できます。これらのグラフには、パイプライン ジョブの期間中の指標が表示され、次の情報が含まれます。

  • パイプラインの遅れを引き起こしている可能性のあるステップを特定するためのステップレベルの可視性。
  • 異常な動作を表面化させることのできる統計情報。
  • 参照元やシンク内のボトルネックの特定に役立つ I/O の指標。

Dataflow モニタリング インターフェースへのアクセス

Dataflow モニタリング インターフェースにアクセスする方法は次のとおりです。

  • Cloud コンソールに移動します。

  • Google Cloud プロジェクトを選択します。
  • 左上隅のメニューをクリックします。
  • [ビッグデータ] セクションに移動し、[Dataflow] をクリックします。

Dataflow ジョブのリストとそれぞれのステータスが表示されます。

58f6402cc923d6bb.png

ジョブの状態が実行中、失敗、成功である Cloud コンソールの Dataflow ジョブのリスト。

ジョブのステータスは以下のとおりです。

  • -: Monitoring UI が Dataflow サービスからまだステータスを受信していません。
  • 実行中: ジョブは現在実行中です。
  • 開始前: ジョブは作成されていますが、システムが起動する前に準備する時間が必要です。
  • キューに格納済み: FlexRS ジョブがキューに格納されています。
  • キャンセルしています...: ジョブはキャンセル中です
  • キャンセルされました: ユーザーがジョブをキャンセルしました。
  • ドレイン中...: ジョブはドレインされています
  • ドレインされました: ユーザーがジョブをドレインしました。
  • 更新中...: ジョブは更新中です。
  • 更新されました: ユーザーがジョブを更新しました。
  • 完了しました: ジョブは正常に終了しました。
  • 失敗しました: ジョブは完了しませんでした。

dfsql でジョブを検索する名前をクリックして名前をクリックします。

e3bce9b1929564c9.png

[ジョブの詳細] ページ。次の情報が表示されます。

  • ジョブグラフ: パイプラインの視覚的な表現
  • ジョブの指標: ジョブの実行に関する指標
  • [ジョブ情報] パネル: パイプラインに関する説明
  • ジョブのログ: Dataflow サービスによってジョブレベルで生成されたログ
  • ワーカーログ: Dataflow サービスによってワーカーレベルで生成されたログ
  • ジョブのエラーレポート: 選択したタイムラインでエラーが発生した場所と、記録されたすべてのエラーの数がグラフで表示されます。
  • 期間選択ツール: 指標の期間を調整できるツール

[ジョブの詳細] ページの [ジョブ] グラフと [ジョブの指標] タブを使用して、ジョブビューを切り替えることができます。

  • [ジョブの指標] タブをクリックして、グラフを確認します。

7076f8a8911066ab.png

6. Dataflow SQL ジョブの停止

Dataflow SQL ジョブを停止するには、キャンセル コマンドを使用します。ドレインで Dataflow SQL ジョブを停止することはできません。

1229697a873e1cf9.png