Cloud Data Fusion を使用して CSV(カンマ区切り値)データを BigQuery に取り込む - リアルタイムの取り込み

1. はじめに

509db33558ae025.png

最終更新日: 2020 年 2 月 28 日

この Codelab では、CSV 形式の医療データを BigQuery にリアルタイムで取り込むデータ取り込みパターンを示します。このラボでは、Cloud Data Fusion リアルタイム データ パイプラインを使用します。現実的な医療テストデータが生成され、Google Cloud Storage バケット(gs://hcls_testing_data_fhir_10_patients/csv/)で利用できるようになっています。

この Codelab では、次のことを学びます。

  • Cloud Data Fusion を使用して Pub/Sub から BigQuery に CSV データ(リアルタイム読み込み)を取り込む方法。
  • Cloud Data Fusion でデータ統合パイプラインを視覚的に構築して、医療データをリアルタイムで読み込み、変換、マスクする方法。

このデモを実行するには何が必要ですか?

  • GCP プロジェクトへのアクセス権が必要です。
  • GCP プロジェクトにオーナーロールが割り当てられている必要があります。
  • ヘッダーを含む CSV 形式の医療データ。

GCP プロジェクトがない場合は、こちらの手順に沿って新しい GCP プロジェクトを作成します。

CSV 形式の医療データは、gs://hcls_testing_data_fhir_10_patients/csv/ の GCS バケットにプリロードされています。各 CSV リソース ファイルには固有のスキーマ構造があります。たとえば、Patients.csv のスキーマは Providers.csv のスキーマとは異なります。事前に読み込まれたスキーマ ファイルは、gs://hcls_testing_data_fhir_10_patients/csv_schemas にあります。

新しいデータセットが必要な場合は、SyntheaTM を使用していつでも生成できます。次に、[入力データをコピー] ステップでバケットからコピーするのではなく、GCS にアップロードします。

2. GCP プロジェクトの設定

環境のシェル変数を初期化します。

PROJECT_ID を見つけるには、プロジェクトの識別をご覧ください。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil ツールを使用して、入力データとエラーログを保存する GCS バケットを作成します。

gsutil mb -l us gs://$BUCKET_NAME

合成データセットにアクセスします。

  1. Cloud コンソールへのログインに使用しているメールアドレスから、hcls-solutions-external+subscribe@google.com に参加リクエストを送信します。
  2. 操作を確認する手順が記載されたメールが届きます。
  3. メールに返信してグループに参加するオプションを使用します。[525a0fa752e0acae.png] ボタンはクリックしないでください。
  4. 確認メールが届いたら、Codelab の次のステップに進みます。

入力データをコピーします。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery データセットを作成します。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Google Cloud SDK をインストールして初期化し、Pub/Sub のトピックとサブスクリプションを作成します。

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 環境の設定

Cloud Data Fusion API を有効にして、必要な権限を付与する手順は次のとおりです。

API を有効にする

  1. GCP Console API ライブラリに移動します。
  2. プロジェクト リストからプロジェクトを選択します。
  3. API ライブラリで、有効にする API(Cloud Data Fusion API、Cloud Pub/Sub API)を選択します。API を検索する場合は検索フィールドやフィルタを使用します。
  4. [API] ページで、[有効にする] をクリックします。

Cloud Data Fusion インスタンスを作成する

  1. GCP Console で、ProjectID を選択します。
  2. 左側のメニューから [Data Fusion] を選択し、ページの中央にある [インスタンスを作成] ボタン(初回作成)をクリックするか、上部のメニューにある [インスタンスを作成] ボタン(追加作成)をクリックします。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. インスタンス名を指定します。[Enterprise] を選択します。

5af91e46917260ff.png

  1. [作成] ボタンをクリックします。

インスタンスの権限を設定します。

インスタンスを作成したら、次の手順で、インスタンスに関連付けられているサービス アカウントにプロジェクトの権限を付与します。

  1. インスタンス名をクリックして、インスタンスの詳細ページに移動します。

76ad691f795e1ab3.png

  1. サービス アカウントをコピーします。

6c91836afb72209d.png

  1. プロジェクトの IAM ページに移動します。
  2. IAM 権限ページで、[追加] ボタンをクリックして、サービス アカウントに Cloud Data Fusion API サービス エージェントのロールを付与します。[新しいメンバー] フィールドに「サービス アカウント」を貼り付け、[サービス管理] -> [Cloud Data Fusion API サーバー エージェント] ロールを選択します。

36f03d11c2a4ce0.png

  1. [+ 別のロールを追加](または [Cloud Data Fusion API サービス エージェントを編集])をクリックして、Pub/Sub サブスクライバー ロールを追加します。

b4bf5500b8cbe5f9.png

  1. [保存] をクリックします。

これらの手順が完了したら、Cloud Data Fusion インスタンス ページかインスタンスの詳細ページにある [インスタンスを表示] リンクをクリックして、Cloud Data Fusion の使用を開始できます。

ファイアウォール ルールを設定します。

  1. GCP Console -> [VPC ネットワーク] -> [ファイアウォール ルール] に移動して、default-allow-ssh ルールが存在するかどうかを確認します。

102adef44bbe3a45.png

  1. そうでない場合は、すべての上り(内向き)SSH トラフィックをデフォルト ネットワークに許可するファイアウォール ルールを追加します。

コマンドラインを使用する場合:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI を使用する場合: [ファイアウォール ルールを作成] をクリックして、情報を入力します。

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. パイプラインのビルドノード

GCP に Cloud Data Fusion 環境が用意できたので、次の手順に沿って Cloud Data Fusion でデータ パイプラインの構築を開始しましょう。

  1. Cloud Data Fusion ウィンドウで、[操作] 列の [インスタンスを表示] リンクをクリックします。別のページにリダイレクトされます。表示された URL をクリックして、Cloud Data Fusion インスタンスを開きます。ようこそポップアップで [ツアーを開始] ボタンまたは [いいえ、結構です] ボタンをクリックしたかどうか。
  2. ハンバーガー メニューを開き、[パイプライン] -> [リスト] を選択します。

317820def934a00a.png

  1. 右上にある緑色の + ボタンをクリックし、[Create Pipeline] を選択します。または、[Create] をクリックしてパイプライン リンクを作成します。

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. パイプライン スタジオが表示されたら、左上のプルダウンから [Data Pipeline - Realtime] を選択します。

372a889a81da5e66.png

  1. Data Pipelines UI の左側のパネルには、[Filter]、[Source]、[Transform]、[Analytics]、[Sink]、[Error Handlers]、[Alerts] などのさまざまなセクションが表示されます。ここで、パイプラインのノードを選択できます。

c63de071d4580f2f.png

ソース ノードを選択します。

  1. 左側のプラグイン パレットの [Source] セクションで、Data Pipelines UI に表示される Google Cloud PubSub ノードをダブルクリックします。
  2. PubSub ソースノードにカーソルを合わせ、[Properties] をクリックします。

ed857a5134148d7b.png

  1. 必須項目を入力します。次のフィールドを設定します。
  • Label = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • Subscription = Pub/Sub トピックの作成セクションで作成したサブスクリプション(例: your-sub
  • Topic = Pub/Sub トピックの作成セクションで作成したトピック(例: your-topic
  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

5c2774338b66bebe.png

  1. [Pub/Sub Properties] を閉じるには、[X] ボタンをクリックします。

変換 ノードを選択します。

  1. 左側のプラグイン パレットの [変換] セクションで、Data Pipelines UI に表示される [Projection] ノードをダブルクリックします。Pub/Sub ソースノードを Projection 変換ノードに接続します。
  2. Projection ノードにカーソルを合わせ、[Properties] をクリックします。

b3a9a3878879bfd7.png

  1. 必須項目を入力します。次のフィールドを設定します。
  • Convert = message をバイト型から文字列型に変換します。
  • 削除するフィールド = {任意のフィールド}
  • 保持するフィールド = {messagetimestampattributes}(例: Pub/Sub から送信された属性: key=‘filename':value=‘patients')
  • 名前を変更するフィールド = {message, timestamp}
  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

b8c2f8efe18234ff.png

  1. 左側のプラグイン パレットの [変換] セクションで、Data Pipelines UI に表示される [Wrangler] ノードをダブルクリックします。Projection 変換ノードを Wrangler 変換ノードに接続します。Wrangler ノードにカーソルを合わせ、[プロパティ] をクリックします。

aa44a4db5fe6623a.png

  1. [アクション] プルダウンをクリックして [インポート] を選択し、保存したスキーマ(例: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)をインポートします。
  2. 最後のフィールドの横にある + ボタンをクリックして、出力スキーマに TIMESTAMP フィールドを追加し(存在しない場合)、[Null] チェックボックスをオンにします。
  3. 必須項目を入力します。次のフィールドを設定します。
  • Label = {任意のテキスト}
  • 入力フィールド名 = {*}
  • 事前条件 = {attributes.get("filename") != "patients"}。PubSub ソースノードから送信された各タイプのレコードまたはメッセージ(患者、プロバイダ、アレルギーなど)を区別します。
  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

3b8e552cd2e3442c.png

  1. 列名を任意の順序で設定し、不要なフィールドを削除します。次のコードスニペットをコピーして、レシピ ボックスに貼り付けます。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. データ マスキングと匿名化については、Batch-Codelab - CSV to BigQuery via CDF をご覧ください。または、レシピ ボックスに mask-number SSN xxxxxxx#### というコード スニペットを追加します。
  2. [変換のプロパティ] ウィンドウを閉じるには、[X] ボタンをクリックします。

Sink ノードを選択します。

  1. 左側のプラグイン パレットの [Sink] セクションで、[BigQuery] ノードをダブルクリックします。このノードは、データ パイプライン UI に表示されます。Wrangler 変換ノードを BigQuery シンクノードに接続します。
  2. BigQuery シンクノードにカーソルを合わせ、[プロパティ] をクリックします。

1be711152c92c692.png

  1. 必須フィールドに入力します。
  • Label = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • データセット = 現在のプロジェクトで使用されている BigQuery データセット(DATASET_ID など)
  • Table = {テーブル名}
  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

bba71de9f31e842a.png

  1. [BigQuery Properties] を閉じるには、[X] ボタンをクリックします。

5. リアルタイム データ パイプラインを構築する

前のセクションでは、Cloud Data Fusion でデータ パイプラインを構築するために必要なノードを作成しました。このセクションでは、ノードを接続して実際のパイプラインを構築します。

パイプライン内のすべてのノードを接続する

  1. ソースノードの右端から接続矢印 > をドラッグして宛先ノードの左端でドロップします。
  2. パイプラインには、同じ PubSub ソースノードからパブリッシュされたメッセージを取得する複数のブランチを設定できます。

b22908cc35364cdd.png

  1. パイプラインに名前を付けます。

以上です。これで、デプロイして実行する最初のリアルタイム データ パイプラインが作成されました。

Cloud Pub/Sub を介してメッセージを送信する

Pub/Sub UI を使用する場合:

  1. GCP コンソール -> Pub/Sub -> トピックに移動し、your-topic を選択して、上部のメニューで [メッセージをパブリッシュ] をクリックします。

d65b2a6af1668ecd.png

  1. Message フィールドには一度に 1 つのレコード行のみを配置します。[+ 属性を追加] ボタンをクリックします。Key = filename、Value = <type of record>(例: patients、providers、allergies など)を指定します。
  2. [公開] ボタンをクリックしてメッセージを送信します。

gcloud コマンドを使用します。

  1. メッセージを手動で指定します。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. catsed の Unix コマンドを使用して、メッセージを半自動的に提供します。このコマンドは、異なるパラメータを指定して繰り返し実行できます。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. パイプラインの構成、デプロイ、実行

データ パイプラインを開発したので、Cloud Data Fusion にデプロイして実行できます。

1bb5b0b8e2953ffa.png

  1. [構成] のデフォルトをそのまま使用します。
  2. [プレビュー] をクリックして、データをプレビューします。[**プレビュー**] をもう一度クリックすると、前のウィンドウに戻ります。[**実行**] をクリックして、パイプラインをプレビュー モードで実行することもできます。

b3c891e5e1aa20ae.png

  1. [ログ] をクリックしてログを表示します。
  2. [保存] をクリックして、すべての変更を保存します。
  3. 新しいパイプラインを構築するときに、保存したパイプライン構成をインポートするには、[インポート] をクリックします。
  4. [エクスポート] をクリックして、パイプライン構成をエクスポートします。
  5. [デプロイ] をクリックして、パイプラインをデプロイします。
  6. デプロイしたら [Run] をクリックし、パイプラインの実行が完了するのを待ちます。

f01ba6b746ba53a.png

  1. [停止] をクリックすると、いつでもパイプラインの実行を停止できます。
  2. パイプラインを複製するには、[アクション] ボタンで [複製] を選択します。
  3. パイプライン構成をエクスポートするには、[アクション] ボタンで [エクスポート] を選択します。

28ea4fc79445fad2.png

  1. [概要] をクリックすると、実行履歴、レコード、エラーログ、警告のグラフが表示されます。

7. 検証

このセクションでは、データ パイプラインの実行を検証します。

  1. パイプラインが正常に実行され、継続的に実行されていることを確認します。

1644dfac4a2d819d.png

  1. TIMESTAMP に基づいて BigQuery テーブルに更新されたレコードが読み込まれていることを確認します。この例では、2 つの患者レコードまたはメッセージと 1 つのアレルギー レコードまたはメッセージが 2019 年 6 月 25 日に Pub/Sub トピックにパブリッシュされました。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. <your-topic> にパブリッシュされたメッセージが <your-sub> サブスクライバーによって受信されたことを確認します。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

結果の表示

リアルタイム パイプラインの実行中にメッセージが Pub/Sub トピックにパブリッシュされた後の結果を表示するには:

  1. BigQuery UI でテーブルに対してクエリを実行します。BigQuery UI に移動
  2. 次のクエリを、実際のプロジェクト名、データセット、テーブルに更新します。

6a1fb85bd868abc9.png

8. クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

チュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、割り当てを使い果たしたり、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

BigQuery データセットの削除

このチュートリアルで作成した BigQuery データセットを削除するには、次の手順を行います。

GCS バケットの削除

このチュートリアルで作成した GCS バケットを削除する手順は次のとおりです。

Cloud Data Fusion インスタンスの削除

手順に沿って Cloud Data Fusion インスタンスを削除します

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには、次の操作を行います。

  1. GCP Console で [プロジェクト] ページに移動します。プロジェクト ページに移動
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

9. 完了

お疲れさまでした。Cloud Data Fusion を使用して BigQuery に医療データを取り込むコードラボを完了しました。

CSV データを Pub/Sub トピックにパブリッシュし、BigQuery に読み込みました。

ヘルスケア データの読み込み、変換、マスキングを行うデータ統合パイプラインをリアルタイムで視覚的に構築しました。

これで、Google Cloud Platform の BigQuery でヘルスケア データ分析を開始するために必要な主な手順を理解できました。