Ingest CSV (Comma-separated values) data to BigQuery using Cloud Data Fusion - Real time ingestion

1. Introduction

509db33558ae025.png

Last Updated: 2020-02-28

This codelab demonstrates a data ingestion pattern for ingesting CSV-formatted healthcare data into BigQuery in real time. We will use a Cloud Data Fusion Realtime data pipeline for this lab. Realistic healthcare test data has been generated and made available in the Google Cloud Storage bucket (gs://hcls_testing_data_fhir_10_patients/csv/) for you.

In this codelab you will learn:

  • How to ingest CSV data (real time loading) from Pub/Sub to BigQuery using Cloud Data Fusion.
  • How to visually build a data integration pipeline in Cloud Data Fusion for loading, transforming, and masking healthcare data in real time.

What do you need to run this demo?

  • You need access to a GCP project.
  • You must be assigned the Owner role on the GCP project.
  • Healthcare data in CSV format, including the header.

If you don't have a GCP project, follow these steps to create a new GCP project.

Healthcare data in CSV format has been pre-loaded into the GCS bucket at gs://hcls_testing_data_fhir_10_patients/csv/. Each CSV resource file has a unique schema structure. For example, Patients.csv has a different schema than Providers.csv. Pre-loaded schema files can be found at gs://hcls_testing_data_fhir_10_patients/csv_schemas.
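
Once your access to the bucket has been granted (see the GCP Project Setup section below), you can inspect the files directly. For example, the following optional commands list the data and schema files and peek at the Patients.csv header row (shown only for illustration):

gsutil ls gs://hcls_testing_data_fhir_10_patients/csv/
gsutil ls gs://hcls_testing_data_fhir_10_patients/csv_schemas/
gsutil cat gs://hcls_testing_data_fhir_10_patients/csv/Patients.csv | head -n 2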

If you need a new dataset, you can always generate it using Synthea™. Then, upload it to GCS instead of copying it from the bucket at the Copy input data step.

2. GCP Project Setup

Initialize shell variables for your environment.

To find the PROJECT_ID, refer to Identifying projects.

# CODELAB: Initialize shell variables
# Your current GCP Project ID
export PROJECT_ID=<PROJECT_ID>
# A new GCS bucket in your current project - INPUT
export BUCKET_NAME=<BUCKET_NAME>
# A new BigQuery dataset ID - OUTPUT
export DATASET_ID=<DATASET_ID>
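
For example, with hypothetical values (substitute your own):

# Hypothetical values - replace with your own project, bucket, and dataset names
export PROJECT_ID=my-hcls-project
export BUCKET_NAME=my-hcls-input-bucket
export DATASET_ID=hcls_dataset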

Create a GCS bucket to store the input data and error logs, using the gsutil tool.

gsutil mb -l us gs://$BUCKET_NAME
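
To confirm the bucket was created in the intended location, you can optionally run:

gsutil ls -L -b gs://$BUCKET_NAME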

Get access to the synthetic dataset.

  1. From the email address you use to log in to the Cloud Console, send an email to hcls-solutions-external+subscribe@google.com requesting to join the group.
  2. You will receive an email with instructions on how to confirm the action.
  3. Use the option to respond to the email to join the group. DO NOT click the join button.
  4. Once you receive the confirmation email, you can proceed to the next step in the codelab.

Copy input data.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
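
A quick optional check that the copy completed:

gsutil ls gs://$BUCKET_NAME/csv/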

Create a BigQuery dataset.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
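
To confirm the dataset exists, you can optionally run:

bq show --format=prettyjson $PROJECT_ID:$DATASET_ID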

Install and initialize the Google Cloud SDK, then create a Pub/Sub topic and subscription.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub
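
To confirm the topic and subscription were created, you can optionally run:

gcloud pubsub topics describe your-topic
gcloud pubsub subscriptions describe your-sub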

3. Cloud Data Fusion Environment Setup

Follow these steps to enable the Cloud Data Fusion API and grant the required permissions:

Enable APIs.

  1. Go to the GCP Console API Library.
  2. From the projects list, select your project.
  3. In the API Library, select the APIs you want to enable (Cloud Data Fusion API, Cloud Pub/Sub API). If you need help finding an API, use the search field and the filters.
  4. On the API page, click ENABLE.
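
As an alternative to the console steps above, the same APIs can be enabled from the command line (a sketch, assuming the Cloud SDK is initialized against your project):

gcloud services enable datafusion.googleapis.com pubsub.googleapis.com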

Create a Cloud Data Fusion instance.

  1. In the GCP Console, select your ProjectID.
  2. Select Data Fusion from the left menu, then click the CREATE AN INSTANCE button in the middle of the page (when creating your first instance), or the CREATE INSTANCE button in the top menu (for additional instances).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  3. Provide the instance name. Select Enterprise.

5af91e46917260ff.png

  4. Click the CREATE button.

Set up instance permissions.

After creating an instance, use the following steps to grant the service account associated with the instance permissions on your project:

  1. Navigate to the instance detail page by clicking the instance name.

76ad691f795e1ab3.png

  2. Copy the service account.

6c91836afb72209d.png

  3. Navigate to the IAM page of your project.
  4. On the IAM permissions page, grant the service account the Cloud Data Fusion API Service Agent role: click the Add button, paste the service account into the New members field, and select Service Management -> Cloud Data Fusion API Service Agent as the role.

36f03d11c2a4ce0.png

  5. Click + Add another role (or Edit Cloud Data Fusion API Service Agent) to add a Pub/Sub Subscriber role.

b4bf5500b8cbe5f9.png

  6. Click Save.

Once these steps are done, you can start using Cloud Data Fusion by clicking the View Instance link on the Cloud Data Fusion instances page, or the details page of an instance.
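
If you prefer the command line, roughly equivalent grants can be made with gcloud. This is a sketch, not part of the original console flow; replace SERVICE_ACCOUNT_EMAIL with the service account you copied above:

# Grant the Cloud Data Fusion API Service Agent role to the instance's service account
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/datafusion.serviceAgent"
# Grant the Pub/Sub Subscriber role to the same service account
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/pubsub.subscriber"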

Set up the firewall rule.

  1. Navigate to GCP Console -> VPC Network -> Firewall rules to check if the default-allow-ssh rule exists or not.

102adef44bbe3a45.png

  2. If not, add a firewall rule that allows all ingress SSH traffic to the default network.

Using the command line:

gcloud beta compute --project=$PROJECT_ID firewall-rules create default-allow-ssh \
  --direction=INGRESS --priority=1000 --network=default --action=ALLOW \
  --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Using UI: Click Create Firewall Rule and fill out the information:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png
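
Whichever method you use, you can optionally confirm the rule afterwards:

gcloud compute firewall-rules describe default-allow-ssh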

4. Build nodes for the pipeline

Now that we have a Cloud Data Fusion environment in GCP, let's start building the data pipeline in Cloud Data Fusion using the following steps:

  1. In the Cloud Data Fusion window, click the View Instance link in the Action column. You will be redirected to another page; click the provided URL to open the Cloud Data Fusion instance. You can click either the "Start Tour" or "No, Thanks" button in the Welcome popup.
  2. Expand the "hamburger" menu and select Pipeline -> List.

317820def934a00a.png

  3. Click the green + button in the upper right corner and select Create Pipeline, or click the Create a pipeline link.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  4. Once the pipeline studio appears, in the upper left, select Data Pipeline - Realtime from the dropdown.

372a889a81da5e66.png

  5. In the Data Pipelines UI, the left panel contains sections such as Filter, Source, Transform, Analytics, Sink, Error Handlers, and Alerts, from which you can select the node or nodes for your pipeline.

c63de071d4580f2f.png

Select a Source node.

  1. Under the Source section in the Plugin palette on the left, double-click on the Google Cloud PubSub node, which appears in the Data Pipelines UI.
  2. Point to the PubSub source node and click Properties.

ed857a5134148d7b.png

  3. Fill in the required fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Subscription = the subscription created in the GCP Project Setup section (for example, your-sub)
  • Topic = the topic created in the GCP Project Setup section (for example, your-topic)
  4. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.

5c2774338b66bebe.png

  5. To close the Pub/Sub Properties, click the X button.

Select the Transform node.

  1. Under the Transform section in the Plugin palette on the left, double-click the Projection node, which appears in the Data Pipelines UI. Connect Pub/Sub source node to Projection transform node.
  2. Point to the Projection node and click Properties.

b3a9a3878879bfd7.png

  3. Fill in the required fields:
  • Convert = convert the message from byte type to string type.
  • Fields to drop = {any field}
  • Fields to keep = {message, timestamp, and attributes} (for example, attributes: key='filename', value='patients' sent from Pub/Sub)
  • Fields to rename = {message, timestamp}
  4. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.

b8c2f8efe18234ff.png

  5. Under the Transform section in the Plugin palette on the left, double-click the Wrangler node, which appears in the Data Pipelines UI. Connect the Projection transform node to the Wrangler transform node, then point to the Wrangler node and click Properties.

aa44a4db5fe6623a.png

  6. Click the Actions dropdown and select Import to import a saved schema (for example: gs://hcls_testing_data_fhir_10_patients/csv_schemas/schema (Patients).json).
  7. Add the TIMESTAMP field to the Output Schema (if it doesn't exist) by clicking the + button next to the last field, and check its 'Null' box.
  8. Fill in the required fields:
  • Label = {any text}
  • Input field name = {*}
  • Precondition = {attributes.get("filename") != "patients"} to distinguish each type of record or message (for example, patients, providers, allergies, etc.) sent from the Pub/Sub source node.
  9. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.

3b8e552cd2e3442c.png

  10. Set the column names in a preferred order and drop the fields you don't need. Copy the following code snippet and paste it into the Recipe box.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  11. Refer to the Batch-Codelab - CSV to BigQuery via CDF for data masking and de-identification, or note that the mask-number SSN xxxxxxx#### directive is already included in the Recipe above.
  12. To close the Transform Properties window, click the X button.

Select the Sink node.

  1. Under the Sink section in the Plugin palette on the left, double click on the BigQuery node, which appears in the Data Pipeline UI. Connect Wrangler transform node to BigQuery sink node.
  2. Point to the BigQuery sink node and click Properties.

1be711152c92c692.png

  3. Fill in the required fields:
  • Label = {any text}
  • Reference name = {any text}
  • Project ID = auto detect
  • Dataset = BigQuery dataset used in current project (for example, DATASET_ID)
  • Table = {table name}
  4. Click Documentation for a detailed explanation. Click the Validate button to validate all input information. Green "No errors found" indicates success.

bba71de9f31e842a.png

  5. To close the BigQuery Properties, click the X button.

5. Build Real time data pipeline

In the previous section we created nodes that are required for building a data pipeline in Cloud Data Fusion. In this section we connect the nodes to build the actual pipeline.

Connecting all nodes in a pipeline

  1. Drag the connection arrow > on the right edge of a source node and drop it on the left edge of the destination node.
  2. A pipeline can have multiple branches that receive published messages from the same Pub/Sub source node.

b22908cc35364cdd.png

  3. Name the pipeline.

That's it. You've just created your first real time data pipeline to be deployed and run.

Send messages through Cloud Pub/Sub

Using the Pub/Sub UI:

  1. Navigate to GCP console -> Pub/Sub -> Topics, select your-topic, then click PUBLISH MESSAGE at the top menu.

d65b2a6af1668ecd.png

  2. Place only one record row at a time in the Message field. Click the +ADD AN ATTRIBUTE button. Provide Key = filename, Value = <type of record> (for example, patients, providers, allergies, etc.).
  3. Click the Publish button to send the message.

Using the gcloud command:

  1. Manually provide the message.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  2. Semi-automatically provide the message using the gsutil cat and sed commands, replacing # in the sed expression with the row number of the record you want to publish. The command can be run repeatedly with different parameters (a worked example follows below).
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configure, Deploy, and Run Pipeline

Now that we have developed the data pipeline, we can deploy and run it in Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Keep the Configure defaults.
  2. Click Preview to preview the data. Click Preview again to toggle back to the previous window. You can also run the pipeline in Preview mode by clicking RUN.

b3c891e5e1aa20ae.png

  3. Click Logs to view logs.
  4. Click Save to save all changes.
  5. Click Import to import a saved pipeline configuration when building a new pipeline.
  6. Click Export to export a pipeline configuration.
  7. Click Deploy to deploy the pipeline.
  8. Once deployed, click Run and wait for the pipeline to run to completion.

f01ba6b746ba53a.png

  9. Click Stop to stop the pipeline run at any time.
  10. You can duplicate the pipeline by selecting Duplicate under the Actions button.
  11. You can export the pipeline configuration by selecting Export under the Actions button.

28ea4fc79445fad2.png

  12. Click Summary to show charts of run history, records, error logs, and warnings.

7. Validation

In this section we validate the execution of the data pipeline.

  1. Validate that the pipeline executed successfully and is running continuously.

1644dfac4a2d819d.png

  2. Validate that the BigQuery tables are loaded with updated records based on the TIMESTAMP. In this example, two patient records or messages and one allergy record or message were published to the Pub/Sub topic on 2019-06-25.
bq query --nouse_legacy_sql "select
  (select count(*) from \`$PROJECT_ID.$DATASET_ID.Patients\`
   where TIMESTAMP > '2019-06-25 01:29:00.0000 UTC') as Patients,
  (select count(*) from \`$PROJECT_ID.$DATASET_ID.Allergies\`
   where TIMESTAMP > '2019-06-25 01:29:00.0000 UTC') as Allergies;"
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  3. Validate that the messages published to <your-topic> were received by the <your-sub> subscriber.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Viewing the results

To view the results after the messages are published to the Pub/Sub topic while the Realtime pipeline is running:

  1. Query the table in the BigQuery UI. GO TO THE BIGQUERY UI
  2. Update the query below to your own project name, dataset, and table.

6a1fb85bd868abc9.png
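
If you prefer the command line, a roughly equivalent check (assuming your sink table was named Patients) is:

bq query --nouse_legacy_sql \
"select * from \`$PROJECT_ID.$DATASET_ID.Patients\` order by TIMESTAMP desc limit 10"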

8. Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

After you've finished the tutorial, you can clean up the resources that you created on GCP so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the BigQuery dataset

Follow these instructions to delete the BigQuery dataset you created as part of this tutorial.

Deleting the GCS Bucket

Follow these instructions to delete the GCS bucket you created as part of this tutorial.

Deleting the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page. GO TO THE PROJECTS PAGE
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

9. Congratulations

Congratulations, you've successfully completed this codelab to ingest healthcare data into BigQuery using Cloud Data Fusion.

You published CSV data to a Pub/Sub topic and then loaded it into BigQuery.

You visually built a data-integration pipeline for loading, transforming and masking healthcare data in real time.

You now know the key steps required to start your Healthcare Data Analytics journey with BigQuery on Google Cloud Platform.