Ingest CSV data to BigQuery using Cloud Data Fusion - Batch ingestion

1. Introduction

12fb66cc134b50ef.png

Last Updated: 2020-02-28

This codelab demonstrates a data ingestion pattern for loading CSV-formatted healthcare data into BigQuery in bulk. We will use a Cloud Data Fusion batch data pipeline for this lab. Realistic healthcare test data has been generated and made available for you in the Google Cloud Storage bucket gs://hcls_testing_data_fhir_10_patients/csv/.

In this codelab you will learn:

  • How to ingest CSV data (batch-scheduled loading) from GCS to BigQuery using Cloud Data Fusion.
  • How to visually build a data integration pipeline in Cloud Data Fusion for loading, transforming and masking healthcare data in bulk.

What do you need to run this codelab?

  • You need access to a GCP Project.
  • You must be assigned an Owner role for the GCP Project.
  • Healthcare data in CSV format, including the header.

If you don't have a GCP Project, follow these steps to create a new GCP Project.

Healthcare data in CSV format has been pre-loaded into the GCS bucket at gs://hcls_testing_data_fhir_10_patients/csv/. Each resource CSV file has its own schema. For example, Patients.csv has a different schema than Providers.csv. Pre-loaded schema files can be found at gs://hcls_testing_data_fhir_10_patients/csv_schemas.

If you need a new dataset, you can always generate it using Synthea™. Then upload it to GCS instead of copying it from the bucket in the Copy input data step.

2. GCP Project Setup

Initialize shell variables for your environment.

To find the PROJECT_ID, refer to Identifying projects.

# CODELAB: Initialize shell variables
# Your current GCP Project ID
export PROJECT_ID=<PROJECT_ID>
# A new GCS bucket in your current project (INPUT)
export BUCKET_NAME=<BUCKET_NAME>
# A new BigQuery dataset ID (OUTPUT)
export DATASET_ID=<DATASET_ID>
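
Before creating resources, it can help to check that the names you picked are plausible. The following is a minimal Python sketch, assuming the commonly documented naming rules (bucket names of 3 to 63 lowercase letters, digits, dashes, underscores and dots, starting and ending with a letter or digit; dataset IDs of letters, digits and underscores). The function names are hypothetical and the checks are deliberately simplified, so treat a passing result as a hint rather than a guarantee.

```python
import re

# Simplified patterns based on the public naming rules; not exhaustive.
_BUCKET_PATTERN = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")
_DATASET_PATTERN = re.compile(r"[A-Za-z0-9_]{1,1024}")


def is_plausible_bucket_name(name: str) -> bool:
    """Return True if `name` looks like a valid GCS bucket name."""
    # Bucket names may not begin with the reserved "goog" prefix.
    if name.startswith("goog"):
        return False
    return _BUCKET_PATTERN.fullmatch(name) is not None


def is_plausible_dataset_id(dataset_id: str) -> bool:
    """Return True if `dataset_id` looks like a valid BigQuery dataset ID."""
    return _DATASET_PATTERN.fullmatch(dataset_id) is not None
```

For example, a value such as my-input-bucket passes the bucket check, while a dataset ID containing a dash does not pass the dataset check.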

Create a GCS bucket to store input data and error logs using the gsutil tool.

gsutil mb -l us gs://$BUCKET_NAME

Get access to the synthetic dataset.

  1. From the email address you use to log in to Cloud Console, send an email to hcls-solutions-external+subscribe@google.com requesting to join.
  2. You will receive an email with instructions on how to confirm the action. 525a0fa752e0acae.png
  3. Use the option to respond to the email to join the group. DO NOT click the button.
  4. Once you receive the confirmation email, you can proceed to the next step in the codelab.

Copy input data.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Create a BigQuery Dataset.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion Environment Setup

Follow these steps to enable the Cloud Data Fusion API and grant required permissions:

Enable APIs.

  1. Go to the GCP Console API Library.
  2. From the projects list, select your project.
  3. In the API Library, select the API you want to enable. If you need help finding the API, use the search field and/or the filters.
  4. On the API page, click ENABLE.

Create a Cloud Data Fusion instance.

  1. In GCP Console, select your ProjectID.
  2. Select Data Fusion from the left menu, then click the CREATE AN INSTANCE button in the middle of the page (1st creation), or click the CREATE INSTANCE button at the top menu (additional creation).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Provide the instance name. Select Enterprise.

5af91e46917260ff.png

  1. Click the CREATE button.

Set up instance permissions.

After creating an instance, use the following steps to grant the service account associated with the instance permissions on your project:

  1. Navigate to the instance detail page by clicking the instance name.

76ad691f795e1ab3.png

  1. Copy the service account.

6c91836afb72209d.png

  1. Navigate to the IAM Page of your project.
  2. On the IAM permissions page, add the service account as a new member and grant it the Cloud Data Fusion API Service Agent role: click the Add button, paste the service account into the New members field, and select Service Management -> Cloud Data Fusion API Service Agent.

ea68b28d917a24b1.png

  3. Click Save.

Once these steps are done, you can start using Cloud Data Fusion by clicking the View Instance link on the Cloud Data Fusion instances page, or the details page of an instance.

Set up the firewall rule.

  1. Navigate to GCP Console -> VPC Network -> Firewall rules to check whether the default-allow-ssh rule exists.

102adef44bbe3a45.png

  1. If not, add a firewall rule that allows all ingress SSH traffic to the default network.

Using command line:

gcloud beta compute --project=$PROJECT_ID firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Using UI: Click Create Firewall Rule and fill out the information:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Build a Schema for transformation

Now that we have the Cloud Data Fusion environment in GCP, let's build a schema. We need this schema to transform the CSV data.

  1. In the Cloud Data Fusion window, click the View Instance link in the Action column. You will be redirected to another page. Click the provided URL to open the Cloud Data Fusion instance. At the Welcome popup, click either "Start Tour" or "No, Thanks".
  2. Expand the "hamburger" menu and select Pipeline -> Studio.

6561b13f30e36c3a.png

  1. Under the Transform section in the Plugin palette on the left, double-click on the Wrangler node, which will appear in the Data Pipelines UI.

aa44a4db5fe6623a.png

  1. Point to the Wrangler node and click Properties. Click the Wrangle button, then select a .csv source file (for example, patients.csv), which must have all data fields to build the desired schema.
  2. Click the Down arrow (Column Transformations) next to each column name (for example, body). 802edca8a97da18.png
  3. By default, the initial import will assume there is only one column in your data file. To parse it as a CSV, choose ParseCSV, then select the delimiter and check the "Set first row as header" box as appropriate. Click the Apply button.
  4. Click the down arrow next to the body column and select Delete Column to remove it. You can also try other transformations, such as removing columns, changing the data type of a column (the default is "string"), splitting columns, and setting column names.

e6d2cda51ff298e7.png

  1. The "Columns" and "Transformation steps" tabs show output schema and the Wrangler's recipe. Click Apply at the upper right corner. Click the Validate button. The green "No errors found" indicates success.

1add853c43f2abee.png

  1. In Wrangler Properties, click the Actions dropdown to Export the desired schema into your local storage for future Import if needed.
  2. Save the Wrangler Recipe for future usage.
parse-as-csv :body ',' true
drop body
  1. To close the Wrangler Properties window, click the X button.
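
To make the effect of the two-line recipe above more concrete, here is a rough Python sketch of what parse-as-csv with a header row followed by drop body does to the raw lines. It is only an illustration using Python's csv module, not Wrangler's implementation, and the function name and sample column names are hypothetical.

```python
import csv


def apply_recipe(bodies, delimiter=","):
    """Mimic `parse-as-csv :body ',' true` followed by `drop body`.

    `bodies` is the list of raw text lines (the single `body` column).
    The first line is treated as the header; the raw line itself is not
    kept in the output, which mirrors dropping the `body` column.
    """
    reader = csv.reader(bodies, delimiter=delimiter)
    header = next(reader, None)
    if header is None:
        return []  # no input lines at all
    return [dict(zip(header, values)) for values in reader]


if __name__ == "__main__":
    # Hypothetical sample lines, not taken from the codelab dataset.
    lines = ["Id,FIRST,LAST", "1,Ann,Lee", '2,"Bo, Jr.",Kim']
    for row in apply_recipe(lines):
        print(row)
```

Every parsed value is a string at this point, which matches the note above that the default column type is "string" until you change it.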

5. Build nodes for the pipeline

In this section we will build the pipeline components.

  1. In the Data Pipelines UI, in the upper left, you should see that Data Pipeline - Batch is selected as the pipeline type.

af67c42ce3d98529.png

  1. The left panel has different sections, such as Filter, Source, Transform, Analytics, Sink, Conditions and Actions, and Error Handlers and Alerts, where you can select a node or nodes for the pipeline.

c4438f7682f8b19b.png

Source node

  1. Select the Source node.
  2. Under the Source section in the Plugin palette on the left, double-click on the Google Cloud Storage node, which appears in the Data Pipelines UI.
  3. Point to the GCS source node and click Properties.

87e51a3e8dae8b3f.png

  1. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Path = GCS URL to bucket in your current project. For example, gs://$BUCKET_NAME/csv/
  • Format = text
  • Path Field = filename
  • Path Filename Only = true
  • Read Files Recursively = true
  1. Add the field 'filename' to the GCS Output Schema by clicking the + button.
  2. Click Documentation for detailed explanation. Click the Validate button. The green "No errors found" indicates success.
  3. To close the GCS Properties, click the X button.
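
As a rough mental model of the source settings above (Format = text, Path Field = filename, Path Filename Only = true), the sketch below shows the kind of records the next node receives: one record per line, tagged with the file it came from. This is a simplified Python illustration, not the plugin's code. The function name, the in-memory `files` mapping, and the sample bucket path are assumptions made for the example.

```python
import posixpath


def read_text_source(files, path_filename_only=True):
    """Return one record per line of each file.

    `files` maps an object path to its text content. Each record carries
    the raw line in `body` and the originating file in `filename`; with
    `path_filename_only` set, only the base name of the path is kept.
    """
    records = []
    for path, text in files.items():
        name = posixpath.basename(path) if path_filename_only else path
        for line in text.splitlines():
            records.append({"body": line, "filename": name})
    return records


if __name__ == "__main__":
    # Hypothetical bucket path and content.
    sample = {"gs://my-bucket/csv/patients.csv": "Id,NAME\n1,Ann\n"}
    for record in read_text_source(sample):
        print(record)
```

Keeping only the file name makes it easier to tell the input files apart in later nodes, regardless of which bucket or folder they were read from.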

Transform node

  1. Select the Transform node.
  2. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which appears in the Data Pipelines UI. Connect GCS source node to Wrangler transform node.
  3. Point to the Wrangler node and click Properties.
  4. Click Actions drop down and select Import to import a saved schema (for example: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json), and paste the saved recipe from previous section.
  5. Or, reuse the Wrangler node from the section: Build a schema for transformation.
  6. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Input field name = {*}
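  • Precondition = {filename != "patients.csv"} to distinguish each input file (for example, patients.csv, providers.csv, allergies.csv, etc.) from the Source node. Records for which the precondition is true are filtered out, so this Wrangler node only processes records from patients.csv.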

2426f8f0a6c4c670.png
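
The Precondition setting can be pictured as a per-record filter. The Python sketch below assumes that a record is skipped when the precondition evaluates to true, so filename != "patients.csv" lets only the patients.csv records through. The function name and sample records are hypothetical.

```python
def apply_precondition(records, wanted_filename):
    """Keep only records that came from `wanted_filename`.

    Mirrors a precondition of the form filename != "<wanted_filename>":
    when that expression is true, the record is filtered out.
    """
    kept = []
    for record in records:
        filter_out = record["filename"] != wanted_filename
        if not filter_out:
            kept.append(record)
    return kept


if __name__ == "__main__":
    # Hypothetical records as they might arrive from the source node.
    incoming = [
        {"body": "1,Ann", "filename": "patients.csv"},
        {"body": "7,Dr. Wu", "filename": "providers.csv"},
    ]
    print(apply_precondition(incoming, "patients.csv"))
```

With one Wrangler node per input file, each using its own file name in the precondition, the same source can feed several branches of the pipeline.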

  1. Add a JavaScript node to execute the user-provided JavaScript that further transforms the records. In this codelab, we utilize the JavaScript node to get a timestamp for each record update. Connect Wrangler transform node to JavaScript transform node. Open JavaScript Properties, and add the following function:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Add the field named TIMESTAMP to the Output Schema (if it doesn't exist) by clicking the + sign. Select the timestamp as the data type.

4227389b57661135.png

  1. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.
  2. To close the Transform Properties window, click the X button.
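
The JavaScript transform above multiplies getTime(), which returns milliseconds since the Unix epoch, by 1000. The Python sketch below shows the same arithmetic and how the resulting microsecond value maps back to a UTC date and time. It assumes the timestamp type in the output schema is stored as microseconds since the epoch, which is what the multiplication suggests. The function names are hypothetical.

```python
from datetime import datetime, timezone


def millis_to_micros(epoch_millis: int) -> int:
    """Convert milliseconds since the epoch to microseconds."""
    return epoch_millis * 1000


def micros_to_datetime(epoch_micros: int) -> datetime:
    """Convert microseconds since the epoch to a timezone-aware UTC datetime."""
    seconds, micros = divmod(epoch_micros, 1_000_000)
    whole = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return whole.replace(microsecond=micros)


if __name__ == "__main__":
    stamp = millis_to_micros(1582848000123)
    print(stamp, micros_to_datetime(stamp).isoformat())
```

Because the value is computed when the record passes through the node, it records the load time of the row rather than any time stored in the source data.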

Data masking and de-identification

  1. You can select individual data columns by clicking the down arrow in the column and applying masking rules under the Mask data selection as per your requirements (for example, SSN column).

bb1eb067dd6e0946.png

  1. You can add more directives in the Recipe window of the Wrangler node. For example, use the hash directive with a hashing algorithm, following this syntax, for de-identification purposes:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: hashing algorithm (for example, MD5 or SHA-1)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png
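
To see what a hex-encoded digest looks like, here is a small Python sketch that replaces one column with its hash using the standard hashlib module. It only illustrates the idea behind the hash directive with encoding enabled and is not Wrangler's implementation. The function name, the column name SSN, and the sample value are assumptions for the example.

```python
import hashlib


def hash_column(rows, column, algorithm="SHA-256"):
    """Return copies of `rows` with `column` replaced by its hex digest."""
    # hashlib uses names such as "sha256", so "SHA-256" is normalized first.
    digest_name = algorithm.replace("-", "").lower()
    hashed = []
    for row in rows:
        new_row = dict(row)
        data = row[column].encode("utf-8")
        new_row[column] = hashlib.new(digest_name, data).hexdigest()
        hashed.append(new_row)
    return hashed


if __name__ == "__main__":
    # Hypothetical row; "abc" stands in for a sensitive value.
    print(hash_column([{"Id": "1", "SSN": "abc"}], "SSN"))
```

The same input always produces the same digest, so hashed columns can still be used to join or group records without exposing the original value.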

Sink node

  1. Select the sink node.
  2. Under the Sink section in the Plugin palette on the left, double-click the BigQuery node, which will appear in the Data Pipelines UI.
  3. Point to the BigQuery sink node and click Properties.

1be711152c92c692.png

  1. Fill in the required fields. Set the following fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Dataset = BigQuery dataset used in current project (i.e. DATASET_ID)
  • Table = {table name}
  1. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.

c5585747da2ef341.png

  1. To close the BigQuery Properties, click the X button.

6. Build Batch data pipeline

Connecting all nodes in a pipeline

  1. Drag the connection arrow > from the right edge of the source node and drop it on the left edge of the destination node.
  2. A pipeline can have multiple branches that get input files from the same GCS Source node.

67510ab46bd44d36.png

  1. Name the pipeline.

That's it. You've just created your first Batch data pipeline and can deploy and run the pipeline.

Send pipeline alerts via email (optional)

To use the Pipeline Alert SendEmail feature, a mail server must be set up for sending mail from a virtual machine instance. See the reference link below for more information:

Sending email from an instance | Compute Engine Documentation

In this codelab, we set up a mail relay service through Mailgun using the following steps:

  1. Follow the instructions at Sending email with Mailgun | Compute Engine Documentation to set up an account with Mailgun and configure the email relay service. Additional modifications are below.
  2. Add all recipients' email addresses to Mailgun's authorized list. This list can be found in Mailgun>Sending>Overview option on the left panel.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Once the recipients click "I Agree" on the email sent from support@mailgun.net, their email addresses are saved in the authorized list to receive pipeline alert emails.

72847c97fd5fce0f.png

  1. Step 3 of the "Before you begin" section: create a firewall rule as follows:

75b063c165091912.png

  1. Step 3 of "Configuring Mailgun as a mail relay with Postfix". Select Internet Site or Internet with smarthost, instead of Local Only as mentioned in the instructions.

8fd8474a4ef18f16.png

  1. Step 4 of "Configuring Mailgun as a mail relay with Postfix". Edit /etc/postfix/main.cf (for example, with vi) to add 10.128.0.0/9 at the end of mynetworks.

249fbf3edeff1ce8.png

  1. Edit /etc/postfix/master.cf (for example, with vi) to change the default smtp port (25) to port 587.

86c82cf48c687e72.png

  1. At the upper-right corner of Data Fusion studio, click Configure. Click Pipeline alert and click + button to open the Alerts window. Select SendEmail.

dc079a91f1b0da68.png

  1. Fill out the Email configuration form. Select completion, success, or failure from the Run Condition dropdown for each alert type. If Include Workflow Token = false, only the information from the Message field is sent. If Include Workflow Token = true, the information from the Message field and the detailed Workflow Token information are sent. You must use lowercase for Protocol. Use any "fake" email other than your company email address for Sender.

1fa619b6ce28f5e5.png

7. Configure, Deploy, Run/Schedule Pipeline

db612e62a1c7ab7e.png

  1. In the upper-right corner of Data Fusion studio, click Configure. Select Spark for Engine Config. Click Save in Configure window.

8ecf7c243c125882.png

  1. Click Preview to preview data, and click Preview again to toggle back to the previous window. You can also Run the pipeline in Preview mode.

b3c891e5e1aa20ae.png

  1. Click Logs to view logs.
  2. Click Save to save all changes.
  3. Click Import to import a saved pipeline configuration when building a new pipeline.
  4. Click Export to export a pipeline configuration.
  5. Click Deploy to deploy the pipeline.
  6. Once deployed, click Run and wait for the pipeline to run to completion.

bb06001d46a293db.png

  1. You can Duplicate the pipeline by selecting Duplicate under the Actions button.
  2. You can Export Pipeline Configuration by selecting Export under the Actions button.
  3. Click Inbound triggers or Outbound triggers on the left or right edge of the Studio window to set pipeline triggers if desired.
  4. Click Schedule to schedule the pipeline to run and load data periodically.

4167fa67550a49d5.png

  1. Summary shows charts of Run history, records, error logs and warnings.

8. Validation

  1. Validate that the pipeline was executed successfully.

7dee6e662c323f14.png

  1. Validate if BigQuery Dataset has all tables.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Receive alert emails (if configured).
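
If you want to check the table list automatically rather than by eye, the sketch below parses text in the format of the bq ls output shown above and reports which expected tables are missing. It is a minimal Python illustration that works on the captured text only and does not call BigQuery. The function names are hypothetical, and the expected table names are taken from the listing above.

```python
EXPECTED_TABLES = {
    "Allergies", "Careplans", "Conditions", "Encounters",
    "Imaging_Studies", "Immunizations", "Medications", "Observations",
    "Organizations", "Patients", "Procedures", "Providers",
}


def parse_bq_ls(output: str) -> set:
    """Extract table IDs from the text printed by `bq ls`."""
    tables = set()
    for line in output.splitlines():
        parts = line.split()
        # Data rows look like "<tableId> TABLE ..."; the header and the
        # dashed separator line do not have "TABLE" in the second column.
        if len(parts) >= 2 and parts[1] == "TABLE":
            tables.add(parts[0])
    return tables


def missing_tables(output: str, expected=EXPECTED_TABLES) -> list:
    """Return the expected table IDs that do not appear in `output`, sorted."""
    return sorted(set(expected) - parse_bq_ls(output))
```

An empty list from missing_tables means every expected table was found in the dataset listing.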

Viewing the results

To view the results after the pipeline runs:

  1. Query the table in the BigQuery UI.
  2. Update the query below with your own project name, dataset, and table.

e32bfd5d965a117f.png
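
The query in the screenshot is not reproduced here. As a generic stand-in, the Python sketch below builds a simple preview query from your own project, dataset, and table names. The function name and the SELECT * ... LIMIT form are assumptions for illustration, not the query used by the codelab authors.

```python
def build_preview_query(project_id: str, dataset_id: str, table: str, limit: int = 10) -> str:
    """Build a standard SQL query that previews a few rows of one table."""
    # Backticks are needed because project IDs may contain dashes.
    return f"SELECT * FROM `{project_id}.{dataset_id}.{table}` LIMIT {int(limit)}"


if __name__ == "__main__":
    # Hypothetical identifiers; substitute your own values.
    print(build_preview_query("my-project", "my_dataset", "Patients"))
```

You can paste the printed statement into the BigQuery query editor and run it against any of the tables loaded by the pipeline.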

9. Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, clean up the resources you created on GCP after you finish, so they won't take up your quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the BigQuery dataset

Follow these instructions to delete the BigQuery dataset you created as part of this tutorial.

Deleting the GCS Bucket

Follow these instructions to delete the GCS bucket you created as part of this tutorial.

Deleting the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

10. Congratulations

Congratulations, you've successfully completed the codelab to ingest healthcare data into BigQuery using Cloud Data Fusion.

You imported CSV data from Google Cloud Storage into BigQuery.

You visually built the data integration pipeline for loading, transforming and masking healthcare data in bulk.

You now know the key steps required to start your Healthcare Data Analytics journey with BigQuery on Google Cloud Platform.