Last Updated: 2019-08-23

This codelab demonstrates a data ingestion pattern for ingesting CSV-formatted healthcare data into BigQuery in bulk. We will use a Cloud Data Fusion batch data pipeline for this lab. Realistic healthcare test data has been generated and made available for you in the Google Cloud Storage bucket (gs://hcls-public-data-fhir-subset/csv/).

In this codelab, you will learn:

What do you need to run this codelab?

If you don't have a GCP Project, follow these steps to create a new GCP Project.

Healthcare data in CSV format has been pre-loaded into a GCS bucket at gs://hcls-public-data-fhir-subset/csv/. Each resource CSV file has its own schema; for example, Patients.csv has a different schema than Providers.csv. Pre-loaded schema files can be found at gs://hcls-public-data-fhir-subset/csv_schemas.

If you need a new dataset, you can always generate it using Synthea. Then, upload it to GCS instead of copying it from the bucket in the Copy input data step.

Initialize shell variables for your environment.

To find the PROJECT_ID, refer to Identifying projects.

# CODELAB: Initialize shell variables
# Your current GCP Project ID
export PROJECT_ID=<PROJECT_ID>
# A new GCS bucket in your current project - INPUT
export BUCKET_NAME=<BUCKET_NAME>
# A new BigQuery dataset ID - OUTPUT
export DATASET_ID=<DATASET_ID>
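
For example, you might set (these values are only placeholders for illustration; substitute your own project ID, a globally unique bucket name, and a dataset ID of your choice):

export PROJECT_ID=my-healthcare-project
export BUCKET_NAME=my-healthcare-input-bucket
export DATASET_ID=hcls_csv_dataset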

Create a GCS bucket to store the input data and error logs, using the gsutil tool.

gsutil mb -l us gs://$BUCKET_NAME

Copy input data.

gsutil -m cp -r gs://hcls-public-data-fhir-subset/csv gs://$BUCKET_NAME/input
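
Optionally, verify that the copy completed by listing the destination folder (an extra check, not required by the lab):

gsutil ls -r gs://$BUCKET_NAME/input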

Create a BigQuery Dataset.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
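
Optionally, confirm that the dataset exists (an extra check, not required by the lab):

bq show $PROJECT_ID:$DATASET_ID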

Follow these steps to enable the Cloud Data Fusion API and grant required permissions:

Enable APIs.

  1. Go to the GCP Console API Library.
  2. From the projects list, select your project.
  3. In the API Library, select the API you want to enable. If you need help finding the API, use the search field and/or the filters.
  4. On the API page, click ENABLE.
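
If you prefer the command line, the Cloud Data Fusion API can also be enabled with gcloud as an equivalent alternative to the console steps above:

gcloud services enable datafusion.googleapis.com --project=$PROJECT_ID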

Create a Cloud Data Fusion instance.

  1. In the GCP Console, select your project ID.
  2. Select Data Fusion from the left menu, then click the CREATE AN INSTANCE button in the middle of the page (when creating your first instance), or click the CREATE INSTANCE button in the top menu (for additional instances).

  3. Provide the instance name and select the Basic edition.

  4. Click the CREATE button.

Set up instance permissions.

After creating an instance, use the following steps to grant the service account associated with the instance permissions on your project:

  1. Navigate to the instance detail page by clicking the instance name.

  2. Copy the service account.

  3. Navigate to the IAM page of your project.
  4. On the IAM permissions page, add the service account as a new member and grant it the Cloud Data Fusion API Service Agent role: click the Add button, paste the service account into the New members field, and select Service Management -> Cloud Data Fusion API Service Agent (an equivalent gcloud command is shown after these steps).

  5. Click Save.
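
If you prefer the command line, an equivalent grant can be made with gcloud. Replace <SERVICE_ACCOUNT> with the service account copied above; the role ID below is the one understood to correspond to the Cloud Data Fusion API Service Agent role:

gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:<SERVICE_ACCOUNT>" --role="roles/datafusion.serviceAgent"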

Once these steps are done, you can start using Cloud Data Fusion by clicking the View Instance link on the Cloud Data Fusion instances page, or the details page of an instance.

Set up the firewall rule.

  1. Navigate to GCP Console -> VPC Network -> Firewall rules to check whether the default-allow-ssh rule exists.

  2. If it does not, add a firewall rule that allows all ingress SSH traffic to the default network.

Using the command line:

gcloud beta compute --project=$PROJECT_ID firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Using the UI: click Create Firewall Rule and fill out the required information.

Now that we have the Cloud Data Fusion environment in GCP, let's build a schema. We need this schema to transform the CSV data.

  1. Navigate to your Data Fusion instance as described previously, then click the menu drop-down and open Studio.

  2. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which will appear in the Data Pipelines UI.

  3. Point to the Wrangler node and click Properties. Click the Wrangle button, then select a .csv source file; it must contain all the data fields needed to build the desired schema.
  4. Apply data transformations by clicking the down arrow (Column Transformations) next to each column name. By default, the initial import assumes there is only one column in your data file. To parse it as a CSV, choose Parse -> CSV, then select the delimiter and check the "Set first row as header" box as appropriate.

Additionally, you can try out other transformations, such as removing columns, changing the data type of some columns (the default is string), splitting columns, and setting column names.

  1. The "Columns" and "Transformation steps" tabs show output schema and the Wrangler's recipe. Click Apply at the upper right corner.
  2. In Wrangler Properties, click the Actions dropdown to Export the desired schema into your local storage for future Import if needed.
  3. Save the Wrangler Recipe for future usage.
  4. To close the Wrangler Properties window, click the X button.
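
As a rough illustration, a minimal recipe produced by these steps might look like the following (the exact directive syntax can vary between Wrangler versions, and your own transformations will differ):

parse-as-csv body , true
drop body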

In this section we will build the pipeline components.

  1. On the Cloud Data Fusion View Instance details page, click the green + button at the upper right corner, then select Create Pipeline.
  2. Once the pipeline studio appears, in the upper left, you should see that Data Pipeline - Batch is selected as the pipeline type.
  3. In the Data Pipelines UI, you will see the following sections in the left panel: Filter, Source, Transform, Analytics, Sink, Conditions and Actions, and Error Handlers and Alerts. From these you can select the nodes for your pipeline.

Source node

  1. Select the Source node.
  2. Under the Source section in the Plugin palette on the left, double-click on the Google Cloud Storage node, which appears in the Data Pipelines UI.
  3. Point to the GCS source node and click Properties.

  4. Fill in the required fields. Set the following fields:
  5. Add the field 'filename' to the GCS Output Schema by clicking the + button.
  6. Click Reference for a detailed explanation.
  7. To close the GCS Properties window, click the X button.

Transform node

  1. Select the Transform node.
  2. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which appears in the Data Pipelines UI.
  3. Point to the Wrangler node and click Properties.
  4. Click the Actions drop-down to import a saved schema, and paste the saved recipe.
  5. Alternatively, reuse the Wrangler node from the Build a schema for transformation section.
  6. Fill in the required fields. Set the following fields:
  7. Click Reference for a detailed explanation.

  8. You can add a JavaScript node to execute user-provided JavaScript that further transforms the records. In this codelab, we use the JavaScript node to add an update timestamp to each record. Open the JavaScript node's Properties and add the following function:

function transform(input, emitter, context) {
  // Attach the current time in epoch microseconds to each record.
  input.TIMESTAMP = (new Date()).getTime() * 1000;
  emitter.emit(input);
}
  9. Add a field named TIMESTAMP to the Output Schema by clicking the + sign. Select timestamp as its data type.

  10. To close the Transform Properties window, click the X button.

Data masking and de-identification

  1. You can select individual data columns by clicking the down arrow on a column and applying masking rules under the Mask data selection, as per your requirements (for example, for the SSN column).

  2. You can add more directives in the Recipe window of the Wrangler node. For example, use the hash directive with a hashing algorithm for de-identification purposes, following this syntax:
hash <column> <algorithm> [<encode>]

<column>: the name of the column
<algorithm>: the hashing algorithm (for example, MD5 or SHA-1)
<encode>: defaults to true (the hashed digest is encoded as hex with left-padded zeros); to disable hex encoding, set <encode> to false.
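
For example, to replace the SSN column with its hashed digest (a sketch; adjust the column name and algorithm to match your schema and requirements):

hash SSN SHA-1 true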

Sink node

  1. Select the sink node.
  2. Under the Sink section in the Plugin palette on the left, double-click the BigQuery node, which will appear in the Data Pipelines UI.
  3. Point to the BigQuery sink node and click Properties.

  4. Fill in the required fields. Set the following fields:
  5. Click Reference for a detailed explanation.
  6. To close the BigQuery Properties window, click the X button.

Connecting all nodes in a pipeline

  1. Drag a connection arrow > from the right edge of the source node and drop it on the left edge of the destination node.
  2. A pipeline can have multiple branches that get input files from the same GCS Source node.

  3. Name the pipeline.

That's it. You've just created your first batch data pipeline and can now deploy and run it.

Send pipeline alerts via email (optional)

To use the Pipeline Alert SendEmail feature, a mail server must be set up for sending mail from a virtual machine instance. See the reference link below for more information:

https://cloud.google.com/compute/docs/tutorials/sending-mail/

In this codelab, we set up a mail relay service through Mailgun using the following steps:

  1. Follow the instructions at https://cloud.google.com/compute/docs/tutorials/sending-mail/using-mailgun to set up an account with Mailgun and configure the email relay service. Additional modifications are below.
  2. Add all recipients' email addresses to Mailgun's authorized list. This list can be found under Mailgun > Sending > Overview in the left panel.

Once the recipients click "I Agree" on the email sent from support@mailgun.net, their email addresses are saved in the authorized list to receive pipeline alert emails.

  1. Step 3 of "Before you begin" section - create a Firewall rule as following:

  1. Step 3 of "Configuring Mailgun as a mail relay with Postfix". Select Internet Site or Internet with smarthost, instead of Local Only as mentioned in the instructions.

  1. Step 4 of "Configuring Mailgun as a mail relay with Postfix". Edit vi /etc/postfix/main.cf to add 10.128.0.0/9 at the end of mynetworks.

  6. Edit /etc/postfix/master.cf to change the default smtp port (25) to port 587.

  7. In the upper-right corner of the Data Fusion Studio, click Configure. Click Pipeline alert and click the + button to open the Alerts window. Select SendEmail.

  8. Fill out the Email configuration form. Select completion, success, or failure from the Run Condition drop-down for each alert type. If Include Workflow Token = false, only the information from the Message field is sent. If Include Workflow Token = true, the information from the Message field and detailed Workflow Token information is sent. You must use lowercase for Protocol. Use any "fake" email other than your company email address for Sender.

  1. In the upper-right corner of the Data Fusion Studio, click Configure. Select Spark for the Engine Config. Click Save in the Configure window.

  2. Click Preview to preview the data, and click Preview again to toggle back to the previous window. You can also Run the pipeline in Preview mode.

  3. Click Logs to view logs.
  4. Click Save to save all changes.
  5. Click Import to import a saved pipeline configuration when building a new pipeline.
  6. Click Export to export a pipeline configuration.
  7. Click Deploy to deploy the pipeline.
  8. Once deployed, click Run and wait for the pipeline to run to completion.


  1. You can Duplicate the pipeline by selecting Duplicate under the Actions button.
  2. You can Export Pipeline Configuration by selecting Export under the Actions button.
  3. Click Inbound triggers or Outbound triggers on the left or right edge of the Studio window to set pipeline triggers if desired.
  4. Click Schedule to schedule the pipeline to run and load data periodically.

  1. The Summary shows charts of run history, records, error logs, and warnings.
  2. Validate that the pipeline executed successfully.

  3. Validate that the BigQuery dataset contains all the tables:
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  4. Receive alert emails (if configured).

Viewing the results

To view the results after the pipeline runs:

  1. Query the table in the BigQuery UI.
  2. Update the query below with your own project name, dataset, and table (a sample query is sketched below).
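
For example, assuming the shell variables set earlier and the Patients table listed above, a simple preview query could also be run from the command line (a sketch; adjust the project, dataset, and table names to your own):

bq query --use_legacy_sql=false "SELECT * FROM \`$PROJECT_ID.$DATASET_ID.Patients\` LIMIT 10"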

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

After you've finished the tutorial, you can clean up the resources that you created on GCP so they won't take up your quota, and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the BigQuery dataset

Follow these instructions to delete the BigQuery dataset you created as part of this tutorial.
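
Alternatively, assuming the shell variables set earlier, the dataset and all of its tables can be deleted from the command line:

bq rm -r -f --dataset $PROJECT_ID:$DATASET_ID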

Deleting the GCS Bucket

Follow these instructions to delete the GCS bucket you created as part of this tutorial.
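
Alternatively, using gsutil (this permanently deletes the bucket and every object in it):

gsutil rm -r gs://$BUCKET_NAME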

Deleting the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.
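
A gcloud command can also be used; the instance name and region below are placeholders for the values you chose when creating the instance, and the exact command group may vary with your gcloud version:

gcloud beta data-fusion instances delete <INSTANCE_NAME> --location=<REGION>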

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
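
Alternatively, the project can be deleted from the command line:

gcloud projects delete $PROJECT_ID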

Congratulations, you've successfully completed the codelab to ingest healthcare data into BigQuery using Cloud Data Fusion.

You imported CSV data from Google Cloud Storage into BigQuery.

You visually built the data integration pipeline for loading, transforming and masking healthcare data in bulk.

You now know the key steps required to start your Healthcare Data Analytics journey with BigQuery on Google Cloud Platform.