Cloud Data Fusion を使用して CSV データを BigQuery に取り込む - バッチ取り込み

1. はじめに

12fb66cc134b50ef.png

最終更新日: 2020 年 2 月 28 日

この Codelab では、CSV 形式の医療データを BigQuery に一括で取り込むデータ取り込みパターンを示します。このラボでは、Cloud Data Fusion バッチデータ パイプラインを使用します。現実的な医療テストデータが生成され、Google Cloud Storage バケット(gs://hcls_testing_data_fhir_10_patients/csv/)で利用できるようになっています。

この Codelab では、次のことを学びます。

  • Cloud Data Fusion を使用して、GCS から BigQuery に CSV データ(バッチ スケジュールされた読み込み)を取り込む方法。
  • Cloud Data Fusion でデータ統合パイプラインを視覚的に構築して、ヘルスケア データの読み込み、変換、マスキングを一括で行う方法。

この Codelab を実行するには何が必要ですか?

  • GCP プロジェクトへのアクセス権が必要です。
  • GCP プロジェクトのオーナーロールが割り当てられている必要があります。
  • ヘッダーを含む CSV 形式の医療データ。

GCP プロジェクトがない場合は、こちらの手順に沿って新しい GCP プロジェクトを作成します。

CSV 形式の医療データは、gs://hcls_testing_data_fhir_10_patients/csv/ の GCS バケットにプリロードされています。各リソース CSV ファイルには、固有のスキーマ構造があります。たとえば、Patients.csv のスキーマは Providers.csv のスキーマとは異なります。事前に読み込まれたスキーマ ファイルは、gs://hcls_testing_data_fhir_10_patients/csv_schemas にあります。

新しいデータセットが必要な場合は、SyntheaTM を使用していつでも生成できます。次に、[入力データをコピー] ステップでバケットからコピーするのではなく、GCS にアップロードします。

2. GCP プロジェクトの設定

環境のシェル変数を初期化します。

PROJECT_ID を見つけるには、プロジェクトの識別をご覧ください。

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil ツールを使用して、入力データとエラーログを保存する GCS バケットを作成します。

gsutil mb -l us gs://$BUCKET_NAME

合成データセットにアクセスします。

  1. Cloud コンソールへのログインに使用しているメールアドレスから、hcls-solutions-external+subscribe@google.com に参加リクエストを送信します。
  2. 操作を確認する手順が記載されたメールが届きます。525a0fa752e0acae.png
  3. メールに返信してグループに参加するオプションを使用します。ボタンをクリックしないでください。
  4. 確認メールが届いたら、Codelab の次のステップに進みます。

入力データをコピーします。

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery データセットを作成します。

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 環境の設定

Cloud Data Fusion API を有効にして、必要な権限を付与する手順は次のとおりです。

API を有効にします

  1. GCP Console API ライブラリに移動します。
  2. プロジェクト リストからプロジェクトを選択します。
  3. API ライブラリで、有効にする API を選択します。API を検索する場合は検索フィールドやフィルタを使用します。
  4. [API] ページで、[有効にする] をクリックします。

Cloud Data Fusion インスタンスを作成する

  1. GCP Console で、ProjectID を選択します。
  2. 左側のメニューから [Data Fusion] を選択し、ページの中央にある [インスタンスを作成] ボタン(初回作成)をクリックするか、上部のメニューにある [インスタンスを作成] ボタン(追加作成)をクリックします。

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. インスタンス名を指定します。[Enterprise] を選択します。

5af91e46917260ff.png

  1. [作成] ボタンをクリックします。

インスタンスの権限を設定します。

インスタンスを作成したら、次の手順で、インスタンスに関連付けられているサービス アカウントにプロジェクトの権限を付与します。

  1. インスタンス名をクリックして、インスタンスの詳細ページに移動します。

76ad691f795e1ab3.png

  1. サービス アカウントをコピーします。

6c91836afb72209d.png

  1. プロジェクトの IAM ページに移動します。
  2. IAM 権限ページで、サービス アカウントを新しいメンバーとして追加し、Cloud Data Fusion API サービス エージェントのロールを付与します。[追加] ボタンをクリックし、[新しいメンバー] フィールドに「サービス アカウント」を貼り付けて、[サービス管理] -> [Cloud Data Fusion API サーバー エージェント] ロールを選択します。
  3. ea68b28d917a24b1.png
  4. [保存] をクリックします。

これらの手順が完了したら、Cloud Data Fusion インスタンス ページかインスタンスの詳細ページにある [インスタンスを表示] リンクをクリックして、Cloud Data Fusion の使用を開始できます。

ファイアウォール ルールを設定します。

  1. GCP Console -> [VPC ネットワーク] -> [ファイアウォール ルール] に移動して、default-allow-ssh ルールが存在するかどうかを確認します。

102adef44bbe3a45.png

  1. そうでない場合は、すべての上り(内向き)SSH トラフィックをデフォルト ネットワークに許可するファイアウォール ルールを追加します。

コマンドラインを使用する場合:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI を使用する場合: [ファイアウォール ルールを作成] をクリックして、情報を入力します。

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 変換用のスキーマを構築する

GCP に Cloud Fusion 環境ができたので、スキーマを構築しましょう。このスキーマは CSV データの変換に必要です。

  1. Cloud Data Fusion ウィンドウで、[操作] 列の [インスタンスを表示] リンクをクリックします。別のページにリダイレクトされます。表示された URL をクリックして、Cloud Data Fusion インスタンスを開きます。ようこそポップアップで [ツアーを開始] ボタンまたは [いいえ、結構です] ボタンをクリックしたかどうか。
  2. ハンバーガー メニューを開き、[パイプライン] -> [Studio] を選択します。

6561b13f30e36c3a.png

  1. 左側のプラグイン パレットの [変換] セクションで、Wrangler ノードをダブルクリックします。このノードは、データ パイプライン UI に表示されます。

aa44a4db5fe6623a.png

  1. Wrangler ノードにカーソルを合わせ、[プロパティ] をクリックします。[Wrangle] ボタンをクリックし、目的のスキーマを構築するためにすべてのデータ フィールドを含む .csv ソースファイル(patients.csv など)を選択します。
  2. 各列名(body など)の横にある下矢印(列の変換)をクリックします。802edca8a97da18.png
  3. デフォルトでは、最初のインポートではデータファイルに 1 つの列しかないと想定されます。CSV として解析するには、[解析] → [CSV] を選択し、区切り文字を選択して、必要に応じて [最初の行をヘッダーとして設定] チェックボックスをオンにします。[適用] ボタンをクリックします。
  4. [Body] フィールドの横にある下矢印をクリックし、[Delete Column] を選択して [Body] フィールドを削除します。また、列の削除、一部の列のデータ型の変更(デフォルトは「文字列」型)、列の分割、列名の設定など、他の変換を試すこともできます。

e6d2cda51ff298e7.png

  1. [Columns] タブと [Transformation steps] タブには、出力スキーマと Wrangler のレシピが表示されます。右上にある [適用] をクリックします。[検証] ボタンをクリックします。緑色の「エラーは見つかりませんでした」は成功を示します。

1add853c43f2abee.png

  1. [Wrangler Properties] で、[Actions] プルダウンをクリックして、必要に応じて後でインポートできるように、目的のスキーマをローカル ストレージにエクスポートします。
  2. Wrangler レシピを保存して、後で使用できるようにします。
parse-as-csv :body ',' true
drop body
  1. [Wrangler Properties] ウィンドウを閉じるには、[X] ボタンをクリックします。

5. パイプラインのビルドノード

このセクションでは、パイプライン コンポーネントを構築します。

  1. Data Pipelines UI の左上に、パイプライン タイプとして [Data Pipeline - Batch] が選択されていることが表示されます。

af67c42ce3d98529.png

  1. 左側のパネルには、[フィルタ]、[ソース]、[変換]、[分析]、[シンク]、[条件とアクション]、[エラーハンドラとアラート] などのさまざまなセクションがあり、パイプラインのノードを選択できます。

c4438f7682f8b19b.png

ソースノード

  1. ソースノードを選択します。
  2. 左側のプラグイン パレットの [Source] セクションで、Data Pipelines UI に表示される [Google Cloud Storage] ノードをダブルクリックします。
  3. GCS ソースノードにカーソルを合わせ、[Properties] をクリックします。

87e51a3e8dae8b3f.png

  1. 必須項目を入力します。次のフィールドを設定します。
  • Label = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • パス = 現在のプロジェクトのバケットの GCS URL。例: gs://$BUCKET_NAME/csv/
  • Format = text
  • Path Field = filename
  • Path Filename Only = true
  • Read Files Recursively = true
  1. [+] ボタンをクリックして、GCS 出力スキーマに「filename」フィールドを追加します。
  2. 詳細な説明については、[ドキュメント] をクリックしてください。[検証] ボタンをクリックします。緑色の「エラーは見つかりませんでした」は成功を示します。
  3. [GCS Properties] を閉じるには、[X] ボタンをクリックします。

ノードを変換する

  1. 変換ノードを選択します。
  2. 左側のプラグイン パレットの [変換] セクションで、Data Pipelines UI に表示される [Wrangler] ノードをダブルクリックします。GCS ソースノードを Wrangler 変換ノードに接続します。
  3. Wrangler ノードにカーソルを合わせ、[プロパティ] をクリックします。
  4. [アクション] プルダウンをクリックして [インポート] を選択し、保存したスキーマ(例: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)をインポートします。前のセクションで保存したレシピを貼り付けます。
  5. または、セクション 変換のスキーマを構築するWrangler ノードを再利用します。
  6. 必須項目を入力します。次のフィールドを設定します。
  • Label = {任意のテキスト}
  • 入力フィールド名 = {*}
  • 前提条件 = {filename != "patients.csv"}。各入力ファイル(例: patients.csv、providers.csv、allergies.csv など)を Source ノードと区別します。

2426f8f0a6c4c670.png

  1. JavaScript ノードを追加して、ユーザー提供の JavaScript を実行し、レコードをさらに変換します。この Codelab では、JavaScript ノードを使用して、各レコード更新のタイムスタンプを取得します。Wrangler 変換ノードを JavaScript 変換ノードに接続します。JavaScript の [プロパティ] を開き、次の関数を追加します。

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. [+] 記号をクリックして、TIMESTAMP という名前のフィールドを出力スキーマに追加します(存在しない場合)。データ型としてタイムスタンプを選択します。

4227389b57661135.png

  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。
  2. [変換のプロパティ] ウィンドウを閉じるには、[X] ボタンをクリックします。

データのマスキングと匿名化

  1. 列のプルダウンをクリックし、[データをマスク] で要件に応じてマスキング ルールを適用することで、個々のデータ列を選択できます(例: SSN 列)。

bb1eb067dd6e0946.png

  1. Wrangler ノードの [レシピ] ウィンドウで、ディレクティブを追加できます。たとえば、次の構文に従ってハッシュ ディレクティブとハッシュ アルゴリズムを使用して、匿名化を行います。
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

シンクノード

  1. シンクノードを選択します。
  2. 左側のプラグイン パレットの [Sink] セクションで、BigQuery ノードをダブルクリックします。このノードがデータ パイプライン UI に表示されます。
  3. BigQuery シンクノードにカーソルを合わせ、[プロパティ] をクリックします。

1be711152c92c692.png

  1. 必須項目に入力します。次のフィールドを設定します。
  • Label = {任意のテキスト}
  • 参照名 = {任意のテキスト}
  • プロジェクト ID = 自動検出
  • データセット = 現在のプロジェクトで使用されている BigQuery データセット(DATASET_ID など)
  • Table = {テーブル名}
  1. 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

c5585747da2ef341.png

  1. [BigQuery Properties] を閉じるには、[X] ボタンをクリックします。

6. バッチデータ パイプラインを構築する

パイプライン内のすべてのノードを接続する

  1. ソースノードの右端から接続矢印 > をドラッグして宛先ノードの左端でドロップします。
  2. パイプラインには、同じ GCS ソースノードから入力ファイルを取得する複数のブランチを設定できます。

67510ab46bd44d36.png

  1. パイプラインに名前を付けます。

以上です。初めてのバッチデータ パイプラインを作成し、パイプラインのデプロイと実行ができる状態になりました。

メールでパイプライン アラートを送信する(省略可)

Pipeline Alert の SendEmail 機能を利用するには、仮想マシン インスタンスからメールを送信するようにメールサーバーを設定する必要があります。詳細については、以下のリファレンス リンクをご覧ください。

インスタンスからのメールの送信 | Compute Engine ドキュメント

この Codelab では、次の手順で Mailgun を介してメールリレー サービスを設定します。

  1. Mailgun を使用したメールの送信 | Compute Engine ドキュメントの手順に沿って、Mailgun でアカウントを設定し、メールリレー サービスを構成します。その他の変更は以下のとおりです。
  2. すべての受信者のメールアドレスを Mailgun の承認済みリストに追加します。このリストは、左側のパネルの [Mailgun] > [Sending] > [Overview] オプションで確認できます。

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

受信者が support@mailgun.net から送信されたメールで [同意する] をクリックすると、そのメールアドレスが承認済みリストに保存され、パイプライン アラート メールを受信できるようになります。

72847c97fd5fce0f.png

  1. 「始める前に」のセクションのステップ 3 - 次のようにファイアウォール ルールを作成します。

75b063c165091912.png

  1. 「Postfix でメールリレーとして Mailgun を構成する」の手順 3。手順に記載されている [Local Only] ではなく、[Internet Site] または [Internet with smarthost] を選択します。

8fd8474a4ef18f16.png

  1. 「Postfix でメールリレーとして Mailgun を構成する」の手順 4。vi /etc/postfix/main.cf を編集して、mynetworks の末尾に 10.128.0.0/9 を追加します。

249fbf3edeff1ce8.png

  1. vi /etc/postfix/master.cf を編集して、デフォルトの smtp(25)をポート 587 に変更します。

86c82cf48c687e72.png

  1. Data Fusion Studio の右上にある [構成] をクリックします。[パイプライン アラート] をクリックし、[+] ボタンをクリックして [アラート] ウィンドウを開きます。[SendEmail] を選択します。

dc079a91f1b0da68.png

  1. [Email] 構成フォームに入力します。アラートのタイプごとに、[実行条件] プルダウンから [完了、成功] または [失敗] を選択します。[ワークフロー トークンを含める] = false の場合、[メッセージ] フィールドの情報のみが送信されます。[Include Workflow Token] = true の場合、[Message] フィールドの情報とワークフロー トークンの詳細情報が送信されます。[プロトコル] には小文字を使用する必要があります。[送信者] に、会社のメールアドレス以外の「偽の」メールアドレスを使用する。

1fa619b6ce28f5e5.png

7. パイプラインの構成、デプロイ、実行/スケジューリング

db612e62a1c7ab7e.png

  1. Data Fusion Studio の右上にある [構成] をクリックします。[Engine Config] で [Spark] を選択します。[Configure] ウィンドウで [Save] をクリックします。

8ecf7c243c125882.png

  1. [プレビュー] をクリックしてデータをプレビューし、もう一度 [プレビュー] をクリックして前のウィンドウに戻ります。プレビュー モードでパイプラインを **実行** することもできます。

b3c891e5e1aa20ae.png

  1. [ログ] をクリックしてログを表示します。
  2. [保存] をクリックして、すべての変更を保存します。
  3. 新しいパイプラインを構築するときに保存したパイプライン構成をインポートするには、[インポート] をクリックします。
  4. [エクスポート] をクリックして、パイプライン構成をエクスポートします。
  5. [デプロイ] をクリックして、パイプラインをデプロイします。
  6. デプロイしたら [Run] をクリックし、パイプラインの実行が完了するのを待ちます。

bb06001d46a293db.png

  1. [アクション] ボタンの [複製] を選択して、パイプラインを複製できます。
  2. [アクション] ボタンの [エクスポート] を選択して、パイプライン構成をエクスポートできます。
  3. 必要に応じてパイプライン トリガーを設定するには、Studio ウィンドウの左端または右端にある [インバウンド トリガー] または [アウトバウンド トリガー] をクリックします。
  4. [スケジュール] をクリックして、パイプラインを定期的に実行してデータを読み込むようにスケジュールします。

4167fa67550a49d5.png

  1. [概要] には、実行履歴、レコード、エラーログ、警告のグラフが表示されます。

8. 検証

  1. Validate パイプラインが正常に実行されました。

7dee6e662c323f14.png

  1. BigQuery データセットにすべてのテーブルがあるかどうかを検証します。
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. アラート メールを受信します(設定されている場合)。

結果の表示

パイプラインの実行後、次の手順で結果を確認します。

  1. BigQuery UI でテーブルに対してクエリを実行します。BigQuery UI に移動
  2. 次のクエリを、実際のプロジェクト名、データセット、テーブルに更新します。

e32bfd5d965a117f.png

9. クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

チュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、割り当てを使い果たしたり、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。

BigQuery データセットの削除

このチュートリアルで作成した BigQuery データセットを削除するには、次の手順を行います。

GCS バケットの削除

このチュートリアルで作成した GCS バケットを削除する手順は次のとおりです。

Cloud Data Fusion インスタンスの削除

手順に沿って Cloud Data Fusion インスタンスを削除します

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには、次の操作を行います。

  1. GCP Console で [プロジェクト] ページに移動します。プロジェクト ページに移動
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

10. 完了

お疲れさまでした。Cloud Data Fusion を使用して BigQuery に医療データを取り込むコードラボを完了しました。

Google Cloud Storage から BigQuery に CSV データをインポートした。

ヘルスケア データの読み込み、変換、マスキングを一括で行うためのデータ統合パイプラインを視覚的に構築しました。

これで、Google Cloud Platform の BigQuery でヘルスケア データ分析を開始するために必要な主な手順を理解できました。