Cloud Data Fusion を使用して CSV(カンマ区切り値)データを BigQuery に取り込む - リアルタイムの取り込み

1. はじめに

509db33558ae025.png

最終更新日: 2020 年 2 月 28 日

この Codelab では、CSV 形式の医療データをリアルタイムで BigQuery に取り込むデータ取り込みパターンについて説明します。このラボでは、Cloud Data Fusion リアルタイム データ パイプラインを使用します。現実的な医療検査データが生成され、Google Cloud Storage バケット(gs://hcls_testing_data_fhir_10_patients/csv/)で利用可能です。

この Codelab では、以下について学びます。

  • Cloud Data Fusion を使用して Pub/Sub から BigQuery に CSV データを取り込む(リアルタイム読み込み)方法。
  • 医療データをリアルタイムで読み込み、変換、マスキングするためのデータ統合パイプラインを Cloud Data Fusion で視覚的に構築する方法。

このデモを実行するには何が必要ですか?

  • GCP プロジェクトへのアクセス権が必要です。
  • GCP プロジェクトのオーナーのロールが割り当てられている必要があります。
  • ヘッダーを含む CSV 形式の医療データ。

GCP プロジェクトがない場合は、こちらの手順に沿って新しい GCP プロジェクトを作成します。

CSV 形式の医療データは、gs://hcls_testing_data_fhir_10_patients/csv/ の GCS バケットにプリロードされています。CSV リソース ファイルには、それぞれ固有のスキーマ構造があります。たとえば、Patients.csv と Providers.csv のスキーマは異なります。プリロードされたスキーマ ファイルは gs://hcls_testing_data_fhir_10_patients/csv_schemas にあります。

新しいデータセットが必要な場合は、SyntheaTM を使用していつでも生成できます。入力データのコピーのステップでバケットからコピーするのではなく、GCS にアップロードします。

2. GCP プロジェクトの設定

環境のシェル変数を初期化します。

PROJECT_ID を確認するには、プロジェクトの識別をご覧ください。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil ツールを使用して、入力データとエラーログを保存する GCS バケットを作成します。

gsutil mb -l us gs://$BUCKET_NAME

合成データセットへのアクセス権を取得します。

  1. Cloud コンソールへのログインに使用しているメールアドレスから、hcls-solutions-external+subscribe@google.com 宛てにメールを送信し、参加をリクエストします。
  2. 操作の確認方法が記載されたメールが届きます。
  3. メールに返信するオプションを使用してグループに参加します。525a0fa752e0acae.png ボタンはクリックしないでください。
  4. 確認メールが届いたら、Codelab の次のステップに進むことができます。

入力データをコピーする。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery データセットを作成します。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Google Cloud SDK をインストールして初期化し、Pub/Sub のトピックとサブスクリプションを作成します。

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 環境の設定

次の手順に沿って Cloud Data Fusion API を有効化し、必要な権限を付与します。

API を有効にする

  1. GCP Console API ライブラリに移動します。
  2. プロジェクト リストからプロジェクトを選択します。
  3. API ライブラリで、有効にする API(Cloud Data Fusion API、Cloud Pub/Sub API)を選択します。API を検索する場合は、検索フィールドとフィルタを使用します。
  4. [API] ページで、[有効にする] をクリックします。

Cloud Data Fusion インスタンスを作成する

  1. GCP コンソールで、プロジェクト ID を選択します。
  2. 左側のメニューから [Data Fusion] を選択し、ページの中央にある [インスタンスを作成] ボタンをクリックする(最初の作成)か、上部のメニューにある [インスタンスを作成] ボタンをクリックします(追加作成)。

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. インスタンス名を入力します。[Enterprise] を選択します。

5af91e46917260ff.png

  1. [作成] ボタンをクリックします。

インスタンスの権限を設定する。

インスタンスを作成したら、次の手順に沿って、インスタンスに関連付けられたサービス アカウントにプロジェクトの権限を付与します。

  1. インスタンス名をクリックして、インスタンスの詳細ページに移動します。

76ad691f795e1ab3.png

  1. サービス アカウントをコピーします。

6c91836afb72209d.png

  1. プロジェクトの IAM ページに移動します。
  2. IAM の権限ページで [追加] ボタンをクリックして、サービス アカウントに Cloud Data Fusion API サービス エージェントのロールを付与します。[サービス アカウント] を[新しいメンバー] フィールドで [サービス管理] ->Cloud Data Fusion API サーバー エージェントのロール。

36f03d11c2a4ce0.png

  1. [+ 別のロールを追加](または [Cloud Data Fusion API サービス エージェントを編集])をクリックして、Pub/Sub サブスクライバーのロールを追加します。

b4bf5500b8cbe5f9.png

  1. [保存] をクリックします。

これらの手順を完了したら、Cloud Data Fusion インスタンス ページまたはインスタンスの詳細ページで [インスタンスを表示] リンクをクリックして、Cloud Data Fusion の使用を開始できます。

ファイアウォール ルールを設定します。

  1. GCP コンソール ->VPC ネットワーク ->default-allow-ssh ルールが存在するかどうかを確認するファイアウォール ルール。

102adef44bbe3a45.png

  1. そうでない場合は、デフォルト ネットワークへのすべての上り(内向き)SSH トラフィックを許可するファイアウォール ルールを追加します。

コマンドラインを使用する場合:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI を使用する場合: [ファイアウォール ルールを作成] をクリックして、情報を入力します。

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. パイプライン用のノードをビルドする

GCP に Cloud Data Fusion 環境が用意できたので、Cloud Data Fusion でデータ パイプラインの構築を始めましょう。手順は次のとおりです。

  1. Cloud Data Fusion ウィンドウで、[アクション] 列の [インスタンスを表示] リンクをクリックします。別のページにリダイレクトされます。表示された [url] をクリックして、Cloud Data Fusion インスタンスを開きます。[ガイドを見る] をクリックするかどうかの選択または「利用しない」[ようこそ]ポップアップが表示されます
  2. 「ハンバーガー」セクションを展開します[パイプライン] ->リスト

317820def934a00a.png

  1. 右上にある緑色の [+] ボタンをクリックし、[Create Pipeline] を選択します。または [作成] をクリックしますパイプラインへのリンクがあります

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. パイプライン スタジオが表示されたら、左上のプルダウンから [Data Pipeline - Realtime] を選択します。

372a889a81da5e66.png

  1. Data Pipelines UI の左側のパネルには、フィルタ、ソース、変換、分析、シンク、エラーハンドラ、アラートなどのさまざまなセクションがあり、パイプラインのノードを選択できます。

c63de071d4580f2f.png

ソース ノードを選択します。

  1. 左側のプラグイン パレットの [Source] セクションで、[Google Cloud PubSub] ノードをダブルクリックします。このノードは Data Pipelines UI に表示されます。
  2. PubSub ソースノードにカーソルを合わせて、[Properties] をクリックします。

ed857a5134148d7b.png

  1. 必須項目に入力します。次のフィールドを設定します。
  • ラベル = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • Subscription = [Pub/Sub トピックの作成] セクションで作成されたサブスクリプション(your-sub など)
  • Topic = [Pub/Sub トピックの作成] セクションで作成されたトピック(your-topic など)
  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。

5c2774338b66bebe.png

  1. Pub/Sub プロパティを閉じるには、[X] ボタンをクリックします。

[変換 ] ノードを選択します。

  1. 左側のプラグイン パレットの [Transform] セクションで、Data Pipelines UI に表示される [Projection] ノードをダブルクリックします。Pub/Sub ソースノードを Projection 変換ノードに接続する。
  2. [投影] ノードにカーソルを合わせて、[プロパティ] をクリックします。

b3a9a3878879bfd7.png

  1. 必須項目に入力します。次のフィールドを設定します。
  • Convert = message をバイト型から文字列型に変換します。
  • 破棄するフィールド = {任意のフィールド}
  • 保持するフィールド = {メッセージタイムスタンプ属性}(たとえば、Attributes: key=‘filename':value=‘patients' sent from Pub/Sub)
  • 名前を変更するフィールド = {message, timestamp}
  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。

b8c2f8efe18234ff.png

  1. 左側のプラグイン パレットの [Transform] セクションで、Data Pipelines UI に表示される [Wrangler] ノードをダブルクリックします。Projection 変換ノードを Wrangler 変換ノードに接続します。Wrangler ノードにカーソルを合わせ、[Properties] をクリックします。

aa44a4db5fe6623a.png

  1. [Actions] プルダウンをクリックして [Import] を選択し、保存したスキーマ(例: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)をインポートします。
  2. 最後のフィールドの横にある [+] ボタンをクリックし、[Null] を選択して、出力スキーマに TIMESTAMP フィールドを追加します(存在しない場合)。のボックスを選択します。
  3. 必須項目に入力します。次のフィールドを設定します。
  • ラベル = {任意のテキスト}
  • 入力フィールド名 = {*}
  • Precondition = {attributes.get("filename") != "patients"} は、PubSub ソースノードから送信されたレコードやメッセージの各タイプ(たとえば、患者、医療機関、アレルギーなど)を区別します。
  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。

3b8e552cd2e3442c.png

  1. 適切な順序で列名を設定し、不要なフィールドを削除します。次のコード スニペットをコピーして、[Recipe] ボックスに貼り付けます。
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. データのマスキングと匿名化については、Batch-Codelab - CSV to BigQuery via CDF をご覧ください。または、レシピボックスにこのコード スニペット mask-number SSN xxxxxxx#### を追加します。
  2. [Transform Properties] ウィンドウを閉じるには、[X] ボタンをクリックします。

シンクノードを選択します。

  1. 左側のプラグイン パレットの [Sink] セクションで、データ パイプライン UI に表示される [BigQuery] ノードをダブルクリックします。Wrangler 変換ノードを BigQuery シンクノードに接続します。
  2. BigQuery シンクノードにカーソルを合わせて [プロパティ] をクリックします。

1be711152c92c692.png

  1. 以下の必須項目を入力します。
  • ラベル = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • データセット = 現在のプロジェクトで使用されている BigQuery データセット(DATASET_ID など)
  • テーブル = {テーブル名}
  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。

bba71de9f31e842a.png

  1. BigQuery のプロパティを閉じるには、[X] ボタンをクリックします。

5. リアルタイムのデータ パイプラインの構築

前のセクションでは、Cloud Data Fusion でデータ パイプラインを構築するために必要なノードを作成しました。このセクションでは、ノードを接続して実際のパイプラインを構築します。

パイプラインのすべてのノードを接続する

  1. 接続用の矢印をドラッグ >をドロップします。
  2. パイプラインは、同じ PubSub ソースノードからパブリッシュされたメッセージを取得する複数のブランチを持つことができます。

b22908cc35364cdd.png

  1. パイプラインに名前を付けます。

以上です。デプロイして実行する最初のリアルタイム データ パイプラインを作成しました。

Cloud Pub/Sub 経由でメッセージを送信する

Pub/Sub UI を使用する場合:

  1. GCP コンソール ->Pub/Sub ->トピックを選択し、トピックを選択して、上部のメニューで [メッセージをパブリッシュ] をクリックします。

d65b2a6af1668ecd.png

  1. [Message] フィールドには、一度に 1 つのレコード行のみを入力します。[+ 属性を追加] ボタンをクリックします。指定: キー = ファイル名、値 = <レコードのタイプ>(患者、医療機関、アレルギーなど)。
  2. [公開] ボタンをクリックしてメッセージを送信します。

gcloud コマンドを使用する場合:

  1. メッセージを手動で入力します。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. catsed の unix コマンドを使用して、メッセージを半自動的に提供します。このコマンドは、さまざまなパラメータを指定して繰り返し実行できます。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. パイプラインの構成、デプロイ、実行

データ パイプラインを開発したので、それを Cloud Data Fusion にデプロイして実行します。

1bb5b0b8e2953ffa.png

  1. [構成] はデフォルトのままにします。
  2. [プレビュー] をクリックしてデータをプレビューします**。**[**プレビュー**] をもう一度クリックすると、前のウィンドウに戻ります。[**実行**] をクリックして、パイプラインをプレビュー モードで実行することもできます。

b3c891e5e1aa20ae.png

  1. [ログ] をクリックしてログを表示します。
  2. [保存] をクリックしてすべての変更を保存します。
  3. [Import] をクリックして、新しいパイプラインの構築時に保存したパイプライン構成をインポートします。
  4. [エクスポート] をクリックして、パイプライン構成をエクスポートします。
  5. [デプロイ] をクリックしてパイプラインをデプロイします。
  6. デプロイしたら [Run] をクリックし、パイプラインの実行が完了するのを待ちます。

f01ba6b746ba53a.png

  1. [停止] をクリックすると、パイプラインの実行をいつでも停止できます。
  2. [アクション] ボタンの下にある [複製] を選択すると、パイプラインを複製できます。
  3. パイプライン構成をエクスポートするには、[Actions] ボタンで [Export] を選択します。

28ea4fc79445fad2.png

  1. [Summary] をクリックすると、実行履歴、レコード、エラーログ、警告のグラフが表示されます。

7. 確認事項

このセクションでは、データ パイプラインの実行を検証します。

  1. パイプラインが正常に実行され、継続的に実行されていることを確認します。

1644dfac4a2d819d.png

  1. BigQuery テーブルに、TIMESTAMP に基づいて更新されたレコードが読み込まれていることを検証します。この例では、2019 年 6 月 25 日に 2 つの患者レコードまたはメッセージと、1 つのアレルギー レコードまたはメッセージが Pub/Sub トピックにパブリッシュされています。
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. <your-topic> にパブリッシュされたメッセージを検証するが <your-sub> に受信されました。購読者限定になります
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

結果の表示

リアルタイム パイプラインの実行中に Pub/Sub トピックにメッセージがパブリッシュされた後に結果を表示するには:

  1. BigQuery UI でテーブルにクエリを実行します。BigQuery UI に移動
  2. 以下のクエリを、実際のプロジェクト名、データセット、テーブルに変更します。

6a1fb85bd868abc9.png

8. クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

チュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

BigQuery データセットの削除

手順に沿って、このチュートリアルで作成した BigQuery データセットを削除します。

GCS バケットの削除

手順に沿って、このチュートリアルで作成した GCS バケットを削除します。

Cloud Data Fusion インスタンスの削除

Cloud Data Fusion インスタンスの削除の手順に沿って操作します。

プロジェクトの削除

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには、次の操作を行います。

  1. GCP Console でプロジェクト ページに移動します。プロジェクト ページに移動
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

9. 完了

これで、Cloud Data Fusion を使用して BigQuery に医療データを取り込む Codelab が完了しました。

CSV データを Pub/Sub トピックに公開してから、BigQuery に読み込みました。

医療データをリアルタイムで読み込み、変換、マスキングするためのデータ統合パイプラインを視覚的に構築しました。

ここでは、Google Cloud Platform で BigQuery を使用して医療データ分析の取り組みを開始するために必要な主な手順を学習しました。