Last Updated: 2019-08-23

This codelab demonstrates a data ingestion pattern for ingesting CSV-formatted healthcare data into BigQuery in real time. We will use a Cloud Data Fusion Realtime Data Pipeline for this lab. Realistic healthcare test data has been generated and made available for you in the Google Cloud Storage bucket gs://hcls-public-data-fhir-subset/csv/.

In this codelab, you will learn:

What do you need to run this demo?

If you don't have a GCP project, follow these steps to create a new GCP project.

Healthcare data in CSV format has been pre-loaded into the GCS bucket at gs://hcls-public-data-fhir-subset/csv/. Each CSV resource file has a unique schema structure. For example, Patients.csv has a different schema than Providers.csv. Pre-loaded schema files can be found at gs://hcls-public-data-fhir-subset/csv_schemas.

If you need a new dataset, you can always generate it using Synthea™. Then, upload it to GCS instead of copying it from the bucket at the Copy input data step.
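For example, if you generate a dataset yourself, the upload might look like the following sketch (the local ./output/csv path is a hypothetical Synthea output directory; $BUCKET_NAME is the input bucket defined in the next section):

# Hypothetical example: upload locally generated Synthea CSV files
# to your own bucket instead of copying the pre-loaded data.
gsutil -m cp -r ./output/csv gs://$BUCKET_NAME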

Initialize shell variables for your environment.

To find the PROJECT_ID, refer to Identifying projects.
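If you already have the Cloud SDK installed, you can also list your projects or print the currently active project from the command line:

gcloud projects list
gcloud config get-value project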

# CODELAB: Initialize shell variables
# Your current GCP Project ID
export PROJECT_ID=<PROJECT_ID>
# A new GCS bucket in your current project - INPUT
export BUCKET_NAME=<BUCKET_NAME>
# A new BigQuery dataset ID - OUTPUT
export DATASET_ID=<DATASET_ID>

Use the gsutil tool to create a GCS bucket to store input data and error logs.

gsutil mb -l us gs://$BUCKET_NAME

Copy input data.

gsutil -m cp -r gs://hcls-public-data-fhir-subset/csv gs://$BUCKET_NAME

Create a BigQuery dataset.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Install and initialize the Google Cloud SDK, then create a Pub/Sub topic and subscription.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

Follow these steps to enable the Cloud Data Fusion API and grant the required permissions:

Enable APIs.

  1. Go to the GCP Console API Library.
  2. From the projects list, select your project.
  3. In the API Library, select the API you want to enable (Cloud Data Fusion API, Cloud Pub/Sub API). If you need help finding the API, use the search field and the filters.
  4. On the API page, click ENABLE.
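Alternatively, both APIs can be enabled from the command line in one step, assuming the standard service names:

gcloud services enable datafusion.googleapis.com pubsub.googleapis.com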

Create a Cloud Data Fusion instance.

  1. In the GCP Console, select your project.
  2. Select Data Fusion from the left menu, then click the CREATE AN INSTANCE button in the middle of the page (for your first instance) or the CREATE INSTANCE button in the top menu (for additional instances).
  3. Provide an instance name and select the Enterprise edition.
  4. Click the CREATE button.
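If you would rather script instance creation, gcloud offers a beta command for Cloud Data Fusion. The instance name and region below are placeholders, and flags may differ across SDK versions, so verify with gcloud beta data-fusion instances create --help:

# Sketch only: create an Enterprise edition instance in us-central1
gcloud beta data-fusion instances create my-cdf-instance \
    --location=us-central1 \
    --edition=enterprise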

Set up instance permissions.

After creating an instance, use the following steps to grant the service account associated with the instance permissions on your project:

  1. Navigate to the instance detail page by clicking the instance name.
  2. Copy the service account.
  3. Navigate to the IAM page of your project.
  4. On the IAM permissions page, grant the service account the Cloud Data Fusion API Service Agent role: click Add, paste the service account into the New members field, and select the Service Management -> Cloud Data Fusion API Service Agent role.
  5. Click + Add another role (or Edit the Cloud Data Fusion API Service Agent entry) to add the Pub/Sub Subscriber role.
  6. Click Save.
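The same grants can also be applied from the command line, assuming the standard role IDs roles/datafusion.serviceAgent and roles/pubsub.subscriber. Replace the service account email with the one you copied from the instance details page:

export SERVICE_ACCOUNT=<service-account-copied-above>
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member=serviceAccount:$SERVICE_ACCOUNT \
    --role=roles/datafusion.serviceAgent
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member=serviceAccount:$SERVICE_ACCOUNT \
    --role=roles/pubsub.subscriber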

Once these steps are done, you can start using Cloud Data Fusion by clicking the View Instance link on the Cloud Data Fusion instances page, or the details page of an instance.

Set up the firewall rule.

  1. Navigate to GCP Console -> VPC Network -> Firewall rules and check whether the default-allow-ssh rule exists.
  2. If it does not, add a firewall rule that allows all ingress SSH traffic to the default network.

Using the command line:

gcloud beta compute --project=$PROJECT_ID firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Using the UI: click Create Firewall Rule and fill out the information:

Now that we have a Cloud Data Fusion environment in GCP, let's start building data pipelines in Cloud Data Fusion using the following steps:

  1. On the Cloud Data Fusion instance details page, click the green + button in the upper right corner, then select Create Pipeline.
  2. Once the Pipeline Studio appears, select Data Pipeline - Realtime from the dropdown in the upper left.
  3. In the Data Pipelines UI, the left panel shows the plugin sections Filter, Source, Transform, Analytics, Sink, Conditions and Actions, and Error Handlers and Alerts, from which you select the nodes for your pipeline.

Select a Source node.

  1. Under the Source section in the plugin palette on the left, double-click the Google Cloud PubSub node, which appears in the Data Pipelines UI.
  2. Point to the PubSub source node and click Properties.
  3. Fill in the required fields. Set the following fields:
  4. Click Reference for a detailed explanation.
  5. To close the Pub/Sub Properties, click the X button.

Select the Transform node.

  1. Under the Transform section in the plugin palette on the left, double-click the Projection node, which appears in the Data Pipelines UI.
  2. Point to the Projection node and click Properties.
  3. Fill in the required fields. Set the following fields:
  4. Click Reference for a detailed explanation.
  5. Under the Transform section in the plugin palette on the left, double-click the Wrangler node, which appears in the Data Pipelines UI. Point to the Wrangler node and click Properties.
  6. Click Actions and select Import saved schema (gs://hcls-public-data-fhir-subset/csv_schemas) from the dropdown. Add the TIMESTAMP field to the Output Schema if it doesn't exist.
  7. Fill in the required fields. Set the following fields:
  8. Click Reference for a detailed explanation.
  9. Set the column names in your preferred order and drop the fields you don't need, as shown in the Recipe.
  10. Refer to Batch-Codelab - CSV to BigQuery via CDF for data masking and de-identification.
  11. To close the Transform Properties window, click the X button.

Select the Sink node.

  1. Under the Sink section in the plugin palette on the left, double-click the BigQuery node, which appears in the Data Pipelines UI.
  2. Point to the BigQuery sink node and click Properties.
  3. Fill in the required fields:
  4. Click Reference for a detailed explanation.
  5. To close the BigQuery Properties, click the X button.

In the previous section we created nodes that are required for building a data pipeline in Cloud Data Fusion. In this section we connect the nodes to build the actual pipeline.

Connecting all nodes in a pipeline

  1. Drag a connection arrow (>) from the right edge of the source node and drop it on the left edge of the destination node.
  2. A pipeline can have multiple branches that receive published messages from the same PubSub source node.
  3. Name the pipeline.

That's it. You've just created your first real-time data pipeline, ready to be deployed and run.

Send messages through Cloud Pub/Sub

Using the Pub/Sub UI:

  1. Navigate to GCP Console -> Pub/Sub -> Topics, select your-topic, then click PUBLISH MESSAGE in the top menu.
  2. Place only one record row at a time in the Message field. Add an attribute with Key = filename and Value = <type of record> (for example, patients, providers, allergies, etc.).
  3. Click the Publish button to send the message.

Using the gcloud command:

  1. Manually provide the message.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  2. Semi-automatically provide the message using the cat and sed unix commands. Replace # with the number of the row you want to publish; this command can be run repeatedly with different parameters.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

Now that we have developed the data pipeline, we can deploy and run it in Cloud Data Fusion.

  1. Keep the Configure defaults.
  2. Click Preview to preview the data. Click Preview again to toggle back to the previous window. You can also run the pipeline in Preview mode by clicking RUN.
  3. Click Logs to view logs.
  4. Click Save to save all changes.
  5. Click Import to import a saved pipeline configuration when building a new pipeline.
  6. Click Export to export a pipeline configuration.
  7. Click Deploy to deploy the pipeline.
  8. Once deployed, click Run and wait for the pipeline to run to completion.
  9. Click Stop to stop the pipeline run at any time.
  10. You can duplicate the pipeline by selecting Duplicate under the Actions button.
  11. You can export the pipeline configuration by selecting Export under the Actions button.
  12. Click Summary to show charts of run history, records, error logs, and warnings.

In this section we validate the execution of the data pipeline.

  1. Validate that the pipeline executed successfully and is running continuously.

  2. Validate that the BigQuery tables are loaded with updated records based on the TIMESTAMP. In this example, two patient records (messages) and one allergy record (message) were published to the Pub/Sub topic on 2019-06-26.
bq query --nouse_legacy_sql 'select
  (select count(*) from `'$PROJECT_ID'.'$DATASET_ID'.Patients`
    where TIMESTAMP > "2019-06-25 01:29:00.0000 UTC") as Patients,
  (select count(*) from `'$PROJECT_ID'.'$DATASET_ID'.Allergies`
    where TIMESTAMP > "2019-06-25 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  3. Validate that the messages published to <your-topic> were received by the <your-sub> subscriber.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

Viewing the results

To view the results after the messages are published to the Pub/Sub topic while the Realtime pipeline is running:

  1. Query the table in the BigQuery UI.
    GO TO THE BIGQUERY UI
  2. Update the query below to your own project name, dataset, and table.
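The original sample query is not reproduced here; as a starting point, the SELECT embedded in the following bq command can also be pasted directly into the BigQuery query editor (substitute your own project, dataset, and table):

bq query --nouse_legacy_sql \
'select * from `'$PROJECT_ID'.'$DATASET_ID'.Patients` order by TIMESTAMP desc limit 10'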

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

After you've finished the tutorial, you can clean up the resources that you created on GCP so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the BigQuery dataset

Follow these instructions to delete the BigQuery dataset you created as part of this tutorial.
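From the command line, the dataset and all of its tables can be removed with bq (-r removes the contained tables, -f skips the confirmation prompt):

bq rm -r -f $PROJECT_ID:$DATASET_ID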

Deleting the GCS Bucket

Follow these instructions to delete the GCS bucket you created as part of this tutorial.
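From the command line, the bucket and its contents can be removed with gsutil:

gsutil -m rm -r gs://$BUCKET_NAME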

Deleting the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.
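If you prefer the command line, the beta gcloud surface also supports deletion; the instance name and region below are placeholders, so verify with gcloud beta data-fusion instances delete --help:

gcloud beta data-fusion instances delete my-cdf-instance --location=us-central1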

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.
    GO TO THE PROJECTS PAGE
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
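The same can be done from the command line:

gcloud projects delete $PROJECT_ID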

Congratulations, you've successfully completed the codelab to ingest healthcare data into BigQuery using Cloud Data Fusion.

You published CSV data to a Pub/Sub topic and then loaded it into BigQuery.

You visually built a data integration pipeline for loading, transforming, and masking healthcare data in real time.

You now know the key steps required to start your Healthcare Data Analytics journey with BigQuery on Google Cloud Platform.