Cloud Data Fusion を使用して CSV データを BigQuery に取り込む - バッチ取り込み

1. はじめに

12fb66cc134b50ef.png

最終更新日: 2020 年 2 月 28 日

この Codelab では、CSV 形式の医療データを BigQuery に一括で取り込むためのデータ取り込みパターンについて説明します。このラボでは、Cloud Data Fusion の Batch Data パイプラインを使用します。現実的な医療検査データが生成され、Google Cloud Storage バケット(gs://hcls_testing_data_fhir_10_patients/csv/)で利用できるようになります。

この Codelab では、以下について学びます。

  • Cloud Data Fusion を使用して GCS から BigQuery に CSV データを取り込む(バッチ スケジュール読み込み)方法。
  • 医療データを一括で読み込み、変換、マスキングするためのデータ統合パイプラインを Cloud Data Fusion で視覚的に構築する方法。

この Codelab を実行するには何が必要ですか?

  • GCP プロジェクトへのアクセス権が必要です。
  • GCP プロジェクトのオーナーのロールが割り当てられている必要があります。
  • ヘッダーを含む CSV 形式の医療データ。

GCP プロジェクトが存在しない場合は、こちらの手順に沿って新しい GCP プロジェクトを作成します。

CSV 形式の医療データは、gs://hcls_testing_data_fhir_10_patients/csv/ で GCS バケットにプリロードされています。各リソースの CSV ファイルには、固有のスキーマ構造があります。たとえば、Patients.csv と Providers.csv のスキーマは異なります。プリロードされたスキーマ ファイルは gs://hcls_testing_data_fhir_10_patients/csv_schemas にあります。

新しいデータセットが必要な場合は、SyntheaTM を使用していつでも生成できます。入力データのコピーのステップでバケットからコピーするのではなく、GCS にアップロードします。

2. GCP プロジェクトの設定

環境のシェル変数を初期化します。

PROJECT_ID を確認するには、プロジェクトの識別をご覧ください。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil ツールを使用して、入力データとエラーログを保存する GCS バケットを作成します。

gsutil mb -l us gs://$BUCKET_NAME

合成データセットへのアクセス権を取得します。

  1. Cloud コンソールへのログインに使用しているメールアドレスから、hcls-solutions-external+subscribe@google.com 宛てにメールを送信し、参加をリクエストします。
  2. 操作の確認方法が記載されたメールが届きます。525a0fa752e0acae.png
  3. メールに返信するオプションを使用してグループに参加します。このボタンはクリックしないでください。
  4. 確認メールが届いたら、Codelab の次のステップに進むことができます。

入力データをコピーする。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery データセットを作成します。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 環境の設定

次の手順に沿って Cloud Data Fusion API を有効化し、必要な権限を付与します。

API を有効にします

  1. GCP Console API ライブラリに移動します。
  2. プロジェクト リストからプロジェクトを選択します。
  3. API ライブラリで、有効にする API を選択します。API を検索する場合は、検索フィールドやフィルタを使用します。
  4. [API] ページで、[有効にする] をクリックします。

Cloud Data Fusion インスタンスを作成します

  1. GCP コンソールで、プロジェクト ID を選択します。
  2. 左側のメニューから [Data Fusion] を選択し、ページの中央にある [インスタンスを作成] ボタンをクリックする(最初の作成)か、上部のメニューにある [インスタンスを作成] ボタンをクリックします(追加作成)。

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. インスタンス名を入力します。[Enterprise] を選択します。

5af91e46917260ff.png

  1. [作成] ボタンをクリックします。

インスタンスの権限を設定する。

インスタンスを作成したら、次の手順に沿って、インスタンスに関連付けられたサービス アカウントにプロジェクトの権限を付与します。

  1. インスタンス名をクリックして、インスタンスの詳細ページに移動します。

76ad691f795e1ab3.png

  1. サービス アカウントをコピーします。

6c91836afb72209d.png

  1. プロジェクトの IAM ページに移動します。
  2. IAM の権限のページで、サービス アカウントを新しいメンバーとして追加し、Cloud Data Fusion API サービス エージェントのロールを付与します。[追加] ボタンをクリックし、「サービス アカウント」を貼り付けます。[新しいメンバー] フィールドで [サービス管理] ->Cloud Data Fusion API サーバー エージェントのロール。
  3. ea68b28d917a24b1.png
  4. [保存] をクリックします。

これらの手順を完了したら、Cloud Data Fusion インスタンス ページまたはインスタンスの詳細ページで [インスタンスを表示] リンクをクリックして、Cloud Data Fusion の使用を開始できます。

ファイアウォール ルールを設定します。

  1. GCP コンソール ->VPC ネットワーク ->default-allow-ssh ルールが存在するかどうかを確認するファイアウォール ルール。

102adef44bbe3a45.png

  1. そうでない場合は、デフォルト ネットワークへのすべての上り(内向き)SSH トラフィックを許可するファイアウォール ルールを追加します。

コマンドラインを使用する場合:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI を使用する場合: [ファイアウォール ルールを作成] をクリックして、情報を入力します。

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 変換用のスキーマを構築する

GCP に Cloud Fusion 環境ができたので、スキーマを作成しましょう。このスキーマは CSV データを変換するために必要です。

  1. Cloud Data Fusion ウィンドウで、[アクション] 列の [インスタンスを表示] リンクをクリックします。別のページにリダイレクトされます。表示された [url] をクリックして、Cloud Data Fusion インスタンスを開きます。[ガイドを見る] をクリックするかどうかの選択または「利用しない」[ようこそ]ポップアップが表示されます
  2. 「ハンバーガー」セクションを展開します[パイプライン] ->スタジオ

6561b13f30e36c3a.png

  1. 左側のプラグイン パレットの [Transform] セクションで、[Wrangler] ノードをダブルクリックします。このノードが Data Pipelines UI に表示されます。

aa44a4db5fe6623a.png

  1. Wrangler ノードにカーソルを合わせ、[Properties] をクリックします。[Wrangle] ボタンをクリックし、.csv ソースファイル(例: pattals.csv)を選択します。目的のスキーマを構築するには、すべてのデータ フィールドが必要です。
  2. 各列名(body など)の横にある下矢印(列の変換)をクリックします。802edca8a97da18.png
  3. デフォルトでは、最初のインポートでは、データファイル内に列が 1 つしかないと想定されます。CSV として解析するには、[Parse] → [CSV] の順に選択し、区切り文字を選択して [Set first row as header] チェックボックスをオンにします。選択します。[適用] ボタンをクリックします。
  4. [Body] フィールドの横の下矢印をクリックし、[Delete Column] を選択して [Body] フィールドを削除します。さらに、列の削除、一部の列のデータ型の変更(デフォルトは「string」型)、列の分割、列名の設定など、他の変換を試すこともできます。

e6d2cda51ff298e7.png

  1. 「列」ご覧くださいタブには、出力スキーマと Wrangler のレシピが表示されています。右上にある [適用] をクリックします。[検証] ボタンをクリックします。緑色の [No errors found]成功を示します。

1add853c43f2abee.png

  1. [Wrangler Properties] で [Actions] プルダウンをクリックし、必要に応じて今後 [Import] を選択できるように、目的のスキーマをローカル ストレージにエクスポートします。
  2. 今後使用するために Wrangler レシピを保存します。
parse-as-csv :body ',' true
drop body
  1. [X] ボタンをクリックして [Wrangler Properties] ウィンドウを閉じます。

5. パイプライン用のノードをビルドする

このセクションでは、パイプライン コンポーネントを構築します。

  1. Data Pipelines UI の左上で、パイプライン タイプとして [Data Pipeline - Batch] が選択されていることを確認します。

af67c42ce3d98529.png

  1. 左側のパネルには、[フィルタ]、[ソース]、[変換]、[分析]、[シンク]、[条件とアクション]、[エラーハンドラ]、[アラート] などのさまざまなセクションがあり、パイプラインのノードを選択できます。

c4438f7682f8b19b.png

ソースノード

  1. ソースノードを選択します。
  2. 左側のプラグイン パレットの [Source] セクションで、Data Pipelines UI に表示されている [Google Cloud Storage] ノードをダブルクリックします。
  3. GCS ソースノードにカーソルを合わせて、[Properties] をクリックします。

87e51a3e8dae8b3f.png

  1. 必須項目に入力します。次のフィールドを設定します。
  • ラベル = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • パス = 現在のプロジェクト内のバケットへの GCS URL。例: gs://$BUCKET_NAME/csv/
  • Format = テキスト
  • パス フィールド = ファイル名
  • Path Filename Only = true
  • Read Files Recursively = true
  1. フィールド「filename」を追加GCS Output Schema に追加する場合は、[+] ボタンをクリックします。
  2. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックします。緑色の [No errors found]成功を示します。
  3. GCS プロパティを閉じるには、[X] ボタンをクリックします。

node を変換する

  1. 変換ノードを選択します。
  2. 左側のプラグイン パレットの [Transform] セクションで、Data Pipelines UI に表示される [Wrangler] ノードをダブルクリックします。GCS ソースノードを Wrangler 変換ノードに接続します。
  3. Wrangler ノードにカーソルを合わせ、[Properties] をクリックします。
  4. [Actions] プルダウンをクリックして [Import] を選択して、保存したスキーマ(例: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)をインポートし、前のセクションの保存したレシピを貼り付けます。
  5. または、変換用のスキーマを構築するセクションの Wrangler ノードを再利用します。
  6. 必須項目に入力します。次のフィールドを設定します。
  • ラベル = {任意のテキスト}
  • 入力フィールド名 = {*}
  • 前提条件 = {filename != "patients.csv"} は、各入力ファイル(例: patient.csv、providers.csv、allergies.csv)とソースノードを区別します。

2426f8f0a6c4c670.png

  1. レコードをさらに変換するユーザー提供の JavaScript を実行する JavaScript ノードを追加します。この Codelab では、JavaScript ノードを使用してレコードが更新されるたびにタイムスタンプを取得します。Wrangler 変換ノードを JavaScript 変換ノードに接続します。JavaScript の [プロパティ] を開き、次の関数を追加します。

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. + 記号をクリックして、TIMESTAMP という名前のフィールドを出力スキーマに追加します(存在しない場合)。データ型としてタイムスタンプを選択します。

4227389b57661135.png

  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。
  2. [Transform Properties] ウィンドウを閉じるには、[X] ボタンをクリックします。

データのマスキングと匿名化

  1. 列の下矢印をクリックし、要件に応じて [マスクデータの選択] でマスキング ルールを適用することで、個々のデータ列を選択できます(例: SSN 列)。

bb1eb067dd6e0946.png

  1. Wrangler ノードの [Recipe] ウィンドウでディレクティブを追加できます。たとえば、匿名化を目的として、次の構文に従うハッシュ アルゴリズムでハッシュ ディレクティブを使用します。
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

シンクノード

  1. シンクノードを選択します。
  2. 左側のプラグイン パレットの [Sink] セクションで BigQuery ノードをダブルクリックすると、データ パイプライン UI に表示されます。
  3. BigQuery シンクノードにカーソルを合わせて [プロパティ] をクリックします。

1be711152c92c692.png

  1. 必須項目に入力します。次のフィールドを設定します。
  • ラベル = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • データセット = 現在のプロジェクトで使用されている BigQuery データセット(DATASET_ID)
  • テーブル = {テーブル名}
  1. 詳しい説明については、[Documentation] をクリックしてください。[検証] ボタンをクリックして、すべての入力情報を検証します。緑色: エラーは見つかりませんでした成功を示します。

c5585747da2ef341.png

  1. BigQuery のプロパティを閉じるには、[X] ボタンをクリックします。

6. バッチ データ パイプラインを構築する

パイプラインのすべてのノードを接続する

  1. 接続用の矢印をドラッグ >をドロップします。
  2. パイプラインには、同じ GCS ソースノードから入力ファイルを取得する複数のブランチを設定できます。

67510ab46bd44d36.png

  1. パイプラインに名前を付けます。

以上です。最初の Batch データ パイプラインを作成できました。パイプラインをデプロイして実行できます。

パイプラインのアラートをメールで送信する(省略可)

Pipeline Alert SendEmail 機能を利用するには、仮想マシン インスタンスからメールを送信するようにメールサーバーを設定する必要があります。詳しくは、以下のリファレンス リンクをご覧ください。

インスタンスからのメールの送信 |Compute Engine ドキュメント

この Codelab では、次の手順で Mailgun を介したメールリレー サービスを設定します。

  1. Mailgun でメールを送信する |Compute Engine のドキュメントを参照して、Mailgun でアカウントを設定し、メールリレー サービスを構成します。その他の変更は以下のとおりです。
  2. すべての受信者を追加'メールアドレスを Mailgun の承認済みリストに追加します。このリストは、左側のパネルの [Mailgun] > [send] > [Overview] オプションで確認できます。

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

受信者が [同意する]をクリックしたらsupport@mailgun.net から送信されたメールで、これらのユーザーのメールアドレスが承認済みリストに保存され、パイプライン アラートのメールを受信します。

72847c97fd5fce0f.png

  1. 「始める前に」のステップ 3セクション - 次のようにファイアウォール ルールを作成します。

75b063c165091912.png

  1. 「Postfix でメールリレーとして Mailgun を構成する」のステップ 3手順に沿って [Local Only] ではなく、[Internet Site] または [Internet with smarthost] を選択します。

8fd8474a4ef18f16.png

  1. 「Postfix でメールリレーとして Mailgun を構成する」のステップ 4vi /etc/postfix/main.cf を編集して、mynetworks の最後に 10.128.0.0/9 を追加します。

249fbf3edeff1ce8.png

  1. vi /etc/postfix/master.cf を編集して、デフォルトの smtp(25)をポート 587 に変更します。

86c82cf48c687e72.png

  1. Data Fusion Studio の右上にある [構成] をクリックします。[Pipeline alert] をクリックし、[+] ボタンをクリックして [Alerts] ウィンドウを開きます。[SendEmail] を選択します。

dc079a91f1b0da68.png

  1. [メール] 設定フォームに入力します。各アラートの種類の [Run Condition] プルダウンから、[Complete]、[success]、または [failure] を選択します。[Include Workflow Token] = false の場合は、[Message] フィールドの情報のみが送信されます。[Include Workflow Token] = true の場合、[Message] フィールドの情報とワークフロー トークンの詳細情報が送信されます。[プロトコル] には小文字を使用する必要があります。架空」を使用する[Sender] の会社以外のメールアドレス。

1fa619b6ce28f5e5.png

7. パイプラインの構成、デプロイ、実行/スケジュール

db612e62a1c7ab7e.png

  1. Data Fusion Studio の右上にある [構成] をクリックします。[エンジン構成] で [Spark] を選択します。[構成] ウィンドウで [保存] をクリックします。

8ecf7c243c125882.png

  1. [プレビュー] をクリックしてデータをプレビューし、もう一度 [**プレビュー**] をクリックして前のウィンドウに戻ります。プレビュー モードでパイプラインを [**実行**] することもできます。

b3c891e5e1aa20ae.png

  1. [ログ] をクリックしてログを表示します。
  2. [保存] をクリックしてすべての変更を保存します。
  3. 新しいパイプラインの構築時に保存したパイプライン構成をインポートするには、[インポート] をクリックします。
  4. [エクスポート] をクリックして、パイプライン構成をエクスポートします。
  5. [デプロイ] をクリックしてパイプラインをデプロイします。
  6. デプロイしたら [Run] をクリックし、パイプラインの実行が完了するのを待ちます。

bb06001d46a293db.png

  1. [アクション] ボタンの下にある [複製] を選択すると、パイプラインを複製できます。
  2. パイプライン構成をエクスポートするには、[Actions] ボタンで [Export] を選択します。
  3. 必要に応じてパイプライン トリガーを設定するには、Studio ウィンドウの左端または右端で [インバウンド トリガー] または [アウトバウンド トリガー] をクリックします。
  4. [Schedule] をクリックして、パイプラインの実行とデータの読み込みをスケジュールします。

4167fa67550a49d5.png

  1. [サマリー] には、実行履歴、レコード、エラーログ、警告のグラフが表示されます。

8. 確認事項

  1. 検証パイプラインが正常に実行されました。

7dee6e662c323f14.png

  1. BigQuery データセットにすべてのテーブルがあるかどうかを確認します。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. アラートメールを受信する(構成されている場合)。

結果の表示

パイプラインの実行後、次の手順で結果を確認します。

  1. BigQuery UI でテーブルにクエリを実行します。BigQuery UI に移動
  2. 以下のクエリを、実際のプロジェクト名、データセット、テーブルに変更します。

e32bfd5d965a117f.png

9. クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

チュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

BigQuery データセットの削除

手順に沿って、このチュートリアルで作成した BigQuery データセットを削除します。

GCS バケットの削除

手順に沿って、このチュートリアルで作成した GCS バケットを削除します。

Cloud Data Fusion インスタンスの削除

Cloud Data Fusion インスタンスの削除の手順に沿って操作します。

プロジェクトの削除

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには、次の操作を行います。

  1. GCP Console でプロジェクト ページに移動します。プロジェクト ページに移動
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

10.完了

これで、Cloud Data Fusion を使用して BigQuery に医療データを取り込む Codelab が完了しました。

CSV データを Google Cloud Storage から BigQuery にインポートしました。

医療データを一括で読み込み、変換、マスキングするためのデータ統合パイプラインを視覚的に構築しました。

ここでは、Google Cloud Platform で BigQuery を使用して医療データ分析の取り組みを開始するために必要な主な手順を学習しました。