1. はじめに

最終更新日: 2020 年 2 月 28 日
この Codelab では、CSV 形式の医療データを BigQuery にリアルタイムで取り込むデータ取り込みパターンを示します。このラボでは、Cloud Data Fusion リアルタイム データ パイプラインを使用します。現実的な医療テストデータが生成され、Google Cloud Storage バケット(gs://hcls_testing_data_fhir_10_patients/csv/)で利用できるようになっています。
この Codelab では、次のことを学びます。
- Cloud Data Fusion を使用して Pub/Sub から BigQuery に CSV データ(リアルタイム読み込み)を取り込む方法。
- Cloud Data Fusion でデータ統合パイプラインを視覚的に構築して、医療データをリアルタイムで読み込み、変換、マスクする方法。
このデモを実行するには何が必要ですか?
- GCP プロジェクトへのアクセス権が必要です。
- GCP プロジェクトにオーナーロールが割り当てられている必要があります。
- ヘッダーを含む CSV 形式の医療データ。
GCP プロジェクトがない場合は、こちらの手順に沿って新しい GCP プロジェクトを作成します。
CSV 形式の医療データは、gs://hcls_testing_data_fhir_10_patients/csv/ の GCS バケットにプリロードされています。各 CSV リソース ファイルには固有のスキーマ構造があります。たとえば、Patients.csv のスキーマは Providers.csv のスキーマとは異なります。事前に読み込まれたスキーマ ファイルは、gs://hcls_testing_data_fhir_10_patients/csv_schemas にあります。
新しいデータセットが必要な場合は、SyntheaTM を使用していつでも生成できます。次に、[入力データをコピー] ステップでバケットからコピーするのではなく、GCS にアップロードします。
2. GCP プロジェクトの設定
環境のシェル変数を初期化します。
PROJECT_ID を見つけるには、プロジェクトの識別をご覧ください。
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
gsutil ツールを使用して、入力データとエラーログを保存する GCS バケットを作成します。
gsutil mb -l us gs://$BUCKET_NAME
合成データセットにアクセスします。
- Cloud コンソールへのログインに使用しているメールアドレスから、hcls-solutions-external+subscribe@google.com に参加リクエストを送信します。
- 操作を確認する手順が記載されたメールが届きます。
- メールに返信してグループに参加するオプションを使用します。[
] ボタンはクリックしないでください。 - 確認メールが届いたら、Codelab の次のステップに進みます。
入力データをコピーします。
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
BigQuery データセットを作成します。
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Google Cloud SDK をインストールして初期化し、Pub/Sub のトピックとサブスクリプションを作成します。
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Cloud Data Fusion 環境の設定
Cloud Data Fusion API を有効にして、必要な権限を付与する手順は次のとおりです。
API を有効にする
- GCP Console API ライブラリに移動します。
- プロジェクト リストからプロジェクトを選択します。
- API ライブラリで、有効にする API(Cloud Data Fusion API、Cloud Pub/Sub API)を選択します。API を検索する場合は検索フィールドやフィルタを使用します。
- [API] ページで、[有効にする] をクリックします。
Cloud Data Fusion インスタンスを作成する。
- GCP Console で、ProjectID を選択します。
- 左側のメニューから [Data Fusion] を選択し、ページの中央にある [インスタンスを作成] ボタン(初回作成)をクリックするか、上部のメニューにある [インスタンスを作成] ボタン(追加作成)をクリックします。


- インスタンス名を指定します。[Enterprise] を選択します。

- [作成] ボタンをクリックします。
インスタンスの権限を設定します。
インスタンスを作成したら、次の手順で、インスタンスに関連付けられているサービス アカウントにプロジェクトの権限を付与します。
- インスタンス名をクリックして、インスタンスの詳細ページに移動します。

- サービス アカウントをコピーします。

- プロジェクトの IAM ページに移動します。
- IAM 権限ページで、[追加] ボタンをクリックして、サービス アカウントに Cloud Data Fusion API サービス エージェントのロールを付与します。[新しいメンバー] フィールドに「サービス アカウント」を貼り付け、[サービス管理] -> [Cloud Data Fusion API サーバー エージェント] ロールを選択します。

- [+ 別のロールを追加](または [Cloud Data Fusion API サービス エージェントを編集])をクリックして、Pub/Sub サブスクライバー ロールを追加します。

- [保存] をクリックします。
これらの手順が完了したら、Cloud Data Fusion インスタンス ページかインスタンスの詳細ページにある [インスタンスを表示] リンクをクリックして、Cloud Data Fusion の使用を開始できます。
ファイアウォール ルールを設定します。
- GCP Console -> [VPC ネットワーク] -> [ファイアウォール ルール] に移動して、default-allow-ssh ルールが存在するかどうかを確認します。

- そうでない場合は、すべての上り(内向き)SSH トラフィックをデフォルト ネットワークに許可するファイアウォール ルールを追加します。
コマンドラインを使用する場合:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
UI を使用する場合: [ファイアウォール ルールを作成] をクリックして、情報を入力します。


4. パイプラインのビルドノード
GCP に Cloud Data Fusion 環境が用意できたので、次の手順に沿って Cloud Data Fusion でデータ パイプラインの構築を開始しましょう。
- Cloud Data Fusion ウィンドウで、[操作] 列の [インスタンスを表示] リンクをクリックします。別のページにリダイレクトされます。表示された URL をクリックして、Cloud Data Fusion インスタンスを開きます。ようこそポップアップで [ツアーを開始] ボタンまたは [いいえ、結構です] ボタンをクリックしたかどうか。
- ハンバーガー メニューを開き、[パイプライン] -> [リスト] を選択します。

- 右上にある緑色の + ボタンをクリックし、[Create Pipeline] を選択します。または、[Create] をクリックしてパイプライン リンクを作成します。


- パイプライン スタジオが表示されたら、左上のプルダウンから [Data Pipeline - Realtime] を選択します。

- Data Pipelines UI の左側のパネルには、[Filter]、[Source]、[Transform]、[Analytics]、[Sink]、[Error Handlers]、[Alerts] などのさまざまなセクションが表示されます。ここで、パイプラインのノードを選択できます。

ソース ノードを選択します。
- 左側のプラグイン パレットの [Source] セクションで、Data Pipelines UI に表示される Google Cloud PubSub ノードをダブルクリックします。
- PubSub ソースノードにカーソルを合わせ、[Properties] をクリックします。

- 必須項目を入力します。次のフィールドを設定します。
- Label = {任意のテキスト}
- 参照名 = {任意のテキスト}
- プロジェクト ID = 自動検出
- Subscription = Pub/Sub トピックの作成セクションで作成したサブスクリプション(例: your-sub)
- Topic = Pub/Sub トピックの作成セクションで作成したトピック(例: your-topic)
- 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

- [Pub/Sub Properties] を閉じるには、[X] ボタンをクリックします。
変換 ノードを選択します。
- 左側のプラグイン パレットの [変換] セクションで、Data Pipelines UI に表示される [Projection] ノードをダブルクリックします。Pub/Sub ソースノードを Projection 変換ノードに接続します。
- Projection ノードにカーソルを合わせ、[Properties] をクリックします。

- 必須項目を入力します。次のフィールドを設定します。
- Convert = message をバイト型から文字列型に変換します。
- 削除するフィールド = {任意のフィールド}
- 保持するフィールド = {message、timestamp、attributes}(例: Pub/Sub から送信された属性: key=‘filename':value=‘patients')
- 名前を変更するフィールド = {message, timestamp}
- 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

- 左側のプラグイン パレットの [変換] セクションで、Data Pipelines UI に表示される [Wrangler] ノードをダブルクリックします。Projection 変換ノードを Wrangler 変換ノードに接続します。Wrangler ノードにカーソルを合わせ、[プロパティ] をクリックします。

- [アクション] プルダウンをクリックして [インポート] を選択し、保存したスキーマ(例: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)をインポートします。
- 最後のフィールドの横にある + ボタンをクリックして、出力スキーマに TIMESTAMP フィールドを追加し(存在しない場合)、[Null] チェックボックスをオンにします。
- 必須項目を入力します。次のフィールドを設定します。
- Label = {任意のテキスト}
- 入力フィールド名 = {*}
- 事前条件 = {attributes.get("filename") != "patients"}。PubSub ソースノードから送信された各タイプのレコードまたはメッセージ(患者、プロバイダ、アレルギーなど)を区別します。
- 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

- 列名を任意の順序で設定し、不要なフィールドを削除します。次のコードスニペットをコピーして、レシピ ボックスに貼り付けます。
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####

- データ マスキングと匿名化については、Batch-Codelab - CSV to BigQuery via CDF をご覧ください。または、レシピ ボックスに mask-number SSN xxxxxxx#### というコード スニペットを追加します。
- [変換のプロパティ] ウィンドウを閉じるには、[X] ボタンをクリックします。
Sink ノードを選択します。
- 左側のプラグイン パレットの [Sink] セクションで、[BigQuery] ノードをダブルクリックします。このノードは、データ パイプライン UI に表示されます。Wrangler 変換ノードを BigQuery シンクノードに接続します。
- BigQuery シンクノードにカーソルを合わせ、[プロパティ] をクリックします。

- 必須フィールドに入力します。
- Label = {任意のテキスト}
- 参照名 = {任意のテキスト}
- プロジェクト ID = 自動検出
- データセット = 現在のプロジェクトで使用されている BigQuery データセット(DATASET_ID など)
- Table = {テーブル名}
- 詳細については、ドキュメントをクリックしてください。[検証] ボタンをクリックして、入力したすべての情報を検証します。緑色の「エラーは見つかりませんでした」は成功を示します。

- [BigQuery Properties] を閉じるには、[X] ボタンをクリックします。
5. リアルタイム データ パイプラインを構築する
前のセクションでは、Cloud Data Fusion でデータ パイプラインを構築するために必要なノードを作成しました。このセクションでは、ノードを接続して実際のパイプラインを構築します。
パイプライン内のすべてのノードを接続する
- ソースノードの右端から接続矢印 > をドラッグして宛先ノードの左端でドロップします。
- パイプラインには、同じ PubSub ソースノードからパブリッシュされたメッセージを取得する複数のブランチを設定できます。

- パイプラインに名前を付けます。
以上です。これで、デプロイして実行する最初のリアルタイム データ パイプラインが作成されました。
Cloud Pub/Sub を介してメッセージを送信する
Pub/Sub UI を使用する場合:
- GCP コンソール -> Pub/Sub -> トピックに移動し、your-topic を選択して、上部のメニューで [メッセージをパブリッシュ] をクリックします。

- Message フィールドには一度に 1 つのレコード行のみを配置します。[+ 属性を追加] ボタンをクリックします。Key = filename、Value = <type of record>(例: patients、providers、allergies など)を指定します。
- [公開] ボタンをクリックしてメッセージを送信します。
gcloud コマンドを使用します。
- メッセージを手動で指定します。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- cat と sed の Unix コマンドを使用して、メッセージを半自動的に提供します。このコマンドは、異なるパラメータを指定して繰り返し実行できます。
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. パイプラインの構成、デプロイ、実行
データ パイプラインを開発したので、Cloud Data Fusion にデプロイして実行できます。

- [構成] のデフォルトをそのまま使用します。
- [プレビュー] をクリックして、データをプレビューします。[**プレビュー**] をもう一度クリックすると、前のウィンドウに戻ります。[**実行**] をクリックして、パイプラインをプレビュー モードで実行することもできます。

- [ログ] をクリックしてログを表示します。
- [保存] をクリックして、すべての変更を保存します。
- 新しいパイプラインを構築するときに、保存したパイプライン構成をインポートするには、[インポート] をクリックします。
- [エクスポート] をクリックして、パイプライン構成をエクスポートします。
- [デプロイ] をクリックして、パイプラインをデプロイします。
- デプロイしたら [Run] をクリックし、パイプラインの実行が完了するのを待ちます。

- [停止] をクリックすると、いつでもパイプラインの実行を停止できます。
- パイプラインを複製するには、[アクション] ボタンで [複製] を選択します。
- パイプライン構成をエクスポートするには、[アクション] ボタンで [エクスポート] を選択します。

- [概要] をクリックすると、実行履歴、レコード、エラーログ、警告のグラフが表示されます。
7. 検証
このセクションでは、データ パイプラインの実行を検証します。
- パイプラインが正常に実行され、継続的に実行されていることを確認します。

- TIMESTAMP に基づいて BigQuery テーブルに更新されたレコードが読み込まれていることを確認します。この例では、2 つの患者レコードまたはメッセージと 1 つのアレルギー レコードまたはメッセージが 2019 年 6 月 25 日に Pub/Sub トピックにパブリッシュされました。
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- <your-topic> にパブリッシュされたメッセージが <your-sub> サブスクライバーによって受信されたことを確認します。
gcloud pubsub subscriptions pull --auto-ack <your-sub>

結果の表示
リアルタイム パイプラインの実行中にメッセージが Pub/Sub トピックにパブリッシュされた後の結果を表示するには:
- BigQuery UI でテーブルに対してクエリを実行します。BigQuery UI に移動
- 次のクエリを、実際のプロジェクト名、データセット、テーブルに更新します。

8. クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。
チュートリアルが終了したら、GCP で作成したリソースをクリーンアップして、割り当てを使い果たしたり、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。
BigQuery データセットの削除
このチュートリアルで作成した BigQuery データセットを削除するには、次の手順を行います。
GCS バケットの削除
このチュートリアルで作成した GCS バケットを削除する手順は次のとおりです。
Cloud Data Fusion インスタンスの削除
手順に沿って Cloud Data Fusion インスタンスを削除します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには、次の操作を行います。
- GCP Console で [プロジェクト] ページに移動します。プロジェクト ページに移動
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
9. 完了
お疲れさまでした。Cloud Data Fusion を使用して BigQuery に医療データを取り込むコードラボを完了しました。
CSV データを Pub/Sub トピックにパブリッシュし、BigQuery に読み込みました。
ヘルスケア データの読み込み、変換、マスキングを行うデータ統合パイプラインをリアルタイムで視覚的に構築しました。
これで、Google Cloud Platform の BigQuery でヘルスケア データ分析を開始するために必要な主な手順を理解できました。