Transfiere datos CSV (valores separados por comas) a BigQuery con Cloud Data Fusion: transferencia en tiempo real

1. Introducción

509db33558ae025.png

Última actualización: 28 de febrero de 2020

En este codelab, se muestra un patrón de transferencia de datos para transferir datos de atención médica con formato CSV a BigQuery en tiempo real. Para este lab, usaremos la canalización de datos en tiempo real de Cloud Data Fusion. Se generaron datos realistas de pruebas de salud y se pusieron a disposición en el bucket de Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

En este codelab, aprenderás lo siguiente:

  • Cómo transferir datos de CSV (carga en tiempo real) de Pub/Sub a BigQuery con Cloud Data Fusion.
  • Cómo crear de forma visual una canalización de integración de datos en Cloud Data Fusion para cargar, transformar y enmascarar datos de atención médica en tiempo real

¿Qué necesitas para ejecutar esta demostración?

  • Necesitas acceso a un proyecto de GCP.
  • Debes tener asignada una función de propietario para el proyecto de GCP.
  • Datos de atención médica en formato CSV, incluido el encabezado.

Si no tienes un proyecto de GCP, sigue estos pasos para crear uno nuevo.

Los datos de atención médica en formato CSV se cargaron previamente en el bucket de GCS, en gs://hcls_testing_data_fhir_10_patients/csv/. Cada archivo de recursos CSV tiene una estructura de esquema única. Por ejemplo, Patients.csv tiene un esquema diferente al de Providers.csv. Los archivos de esquema precargados se pueden encontrar en gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si necesitas un conjunto de datos nuevo, siempre puedes generarlo con SyntheaTM. Luego, súbelos a GCS en lugar de copiarlos del bucket en el paso Copiar datos de entrada.

2. Configuración de un proyecto de GCP

Inicializar variables de shell para tu entorno

Para encontrar el PROJECT_ID, consulta Identifica proyectos.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Crea un bucket de GCS para almacenar datos de entrada y registros de errores con la herramienta gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Obtén acceso al conjunto de datos sintético.

  1. Desde la dirección de correo electrónico que usas para acceder a la consola de Cloud, envía un correo electrónico a hcls-solutions-external+subscribe@google.com para solicitar unirte.
  2. Recibirás un correo electrónico con instrucciones para confirmar la acción.
  3. Usa la opción de responder el correo electrónico para unirte al grupo. NO hagas clic en el botón 525a0fa752e0acae.png.
  4. Una vez que recibas el correo electrónico de confirmación, puedes continuar con el siguiente paso del codelab.

Copia los datos de entrada.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crear un conjunto de datos de BigQuery

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Instala e inicializa el SDK de Google Cloud y crea el tema y las suscripciones de Pub o Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configuración del entorno de Cloud Data Fusion

Sigue estos pasos para habilitar la API de Cloud Data Fusion y otorgar los permisos necesarios:

Habilita las API.

  1. Ve a la Biblioteca de API de GCP Console.
  2. Selecciona el proyecto desde la lista de proyectos.
  3. En la Biblioteca de APIs, selecciona la API que deseas habilitar ( API de Cloud Data Fusion,API de Cloud Pub/Sub). Si necesitas ayuda para encontrar la API, usa el campo de búsqueda y los filtros.
  4. En la página de API, haz clic en HABILITAR.

Crea una instancia de Cloud Data Fusion.

  1. En GCP Console, selecciona el ID del proyecto.
  2. Selecciona Data Fusion en el menú de la izquierda y, luego, haz clic en el botón CREATE AN INSTANCE en el medio de la página (primera creación) o haz clic en el botón CREATE INSTANCE en el menú superior (creación adicional).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Proporciona el nombre de la instancia. Selecciona Enterprise.

5af91e46917260ff.png

  1. Haz clic en el botón CREAR.

Configura permisos de instancia.

Después de crear una instancia, sigue estos pasos para otorgar a la cuenta de servicio asociada con la instancia los permisos para tu proyecto:

  1. Haga clic en el nombre de la instancia para navegar a su página de detalles.

76ad691f795e1ab3.png

  1. Copia la cuenta de servicio.

6c91836afb72209d.png

  1. Navega a la página IAM de tu proyecto.
  2. En la página de permisos de IAM, haz clic en el botón Agregar para otorgar a la cuenta de servicio el rol Agente de servicio de la API de Cloud Data Fusion. Pega la "cuenta de servicio" en el campo Miembros nuevos y selecciona Administración de servicio -> Rol de agente del servidor de la API de Cloud Data Fusion.

36f03d11c2a4ce0.png

  1. Haz clic en + Agregar otro rol (o Editar agente de servicio de la API de Cloud Data Fusion) para agregar un rol de suscriptor de Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Haz clic en Guardar.

Una vez que hayas completado estos pasos, puedes comenzar a usar Cloud Data Fusion haciendo clic en el vínculo Ver instancia en la página de instancias de Cloud Data Fusion o en la página de detalles de una instancia.

Configura la regla de firewall.

  1. Navega a la consola de GCP -> Red de VPC -> Reglas de firewall para verificar si existe o no la regla default-allow-ssh.

102adef44bbe3a45.png

  1. De lo contrario, agrega una regla de firewall que permita todo el tráfico SSH de entrada a la red predeterminada.

Con la línea de comandos:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Mediante la IU: Haz clic en Crear regla de firewall y completa la información:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Compila nodos para la canalización

Ahora que tenemos el entorno de Cloud Data Fusion en GCP, siga estos pasos para comenzar a compilar las canalizaciones de datos en Cloud Data Fusion:

  1. En la ventana de Cloud Data Fusion, haz clic en el vínculo Ver instancia en la columna Acción. Se te redireccionará a otra página. Haz clic en la url proporcionada para abrir la instancia de Cloud Data Fusion. Hacer clic en "Iniciar visita guiada" o "No, gracias" de la ventana emergente de bienvenida.
  2. Expandir el menú “hamburguesa” en el menú, selecciona Canalización -> Mostrar en lista

317820def934a00a.png

  1. Haz clic en el botón verde + en la esquina superior derecha y, luego, selecciona Crear canalización. O haz clic en "Crear" un vínculo de canalización.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Cuando aparezca el estudio de canalización, en la parte superior izquierda, selecciona Data Pipeline - Realtime en el menú desplegable.

372a889a81da5e66.png

  1. En la IU de Data Pipelines, verás diferentes secciones en el panel izquierdo como Filtro, Fuente, Transformación, Estadísticas, Receptor, Controladores de errores y alertas, en las que podrás seleccionar uno o varios nodos para la canalización.

c63de071d4580f2f.png

Selecciona un nodo de origen.

  1. En la sección Fuente en la paleta de complementos a la izquierda, haz doble clic en el nodo de Google Cloud PubSub, que aparece en la IU de Data Pipelines.
  2. Dirígete al nodo fuente de Pub/Sub y haz clic en Propiedades.

ed857a5134148d7b.png

  1. Llena los campos obligatorios. Configura los siguientes campos:
  • Etiqueta = {any text}
  • Nombre de referencia = {any text}
  • ID del proyecto = detección automática
  • Suscripción: Suscripción creada en la sección Crear tema de Pub/Sub (por ejemplo, tu-suscriptor)
  • Tema = Tema creado en la sección Crear tema de Pub/Sub (por ejemplo, tu-tema)
  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.

5c2774338b66bebe.png

  1. Para cerrar las propiedades de Pub/Sub, haz clic en el botón X.

Selecciona el nodo Transform.

  1. En la sección Transform en la paleta de complementos a la izquierda, haz doble clic en el nodo Projection, que aparece en la IU de Data Pipelines. Conecta el nodo fuente de Pub/Sub al nodo de transformación de proyección.
  2. Selecciona el nodo Projection y haz clic en Properties.

b3a9a3878879bfd7.png

  1. Llena los campos obligatorios. Configura los siguientes campos:
  • Convert = convierte message de tipo de byte a tipo de string.
  • Campos que se van a descartar = {any field}
  • Campos que se conservarán = {message, timestamp y attributes} (por ejemplo, atributos: key=‘filename':value=‘patients’ enviado desde Pub/Sub)
  • Campos para cambiar el nombre = {message, timestamp}
  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.

b8c2f8efe18234ff.png

  1. En la sección Transformar en la paleta de complementos a la izquierda, haz doble clic en el nodo Wrangler, que aparece en la IU de Data Pipelines. Conecta el nodo de transformación de proyección a un nodo de transformación de Wrangler. Apunta al nodo Wrangler y haz clic en Properties.

aa44a4db5fe6623a.png

  1. Haz clic en el menú desplegable Acciones y selecciona Importar para importar un esquema guardado (por ejemplo: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ esquema (Patients.json).
  2. Agrega el campo TIMESTAMP en el esquema de salida (si no existe). Para ello, haz clic en el botón + junto al último campo y marca “Nulo”. .
  3. Llena los campos obligatorios. Configura los siguientes campos:
  • Etiqueta = {any text}
  • Nombre del campo de entrada = {*}
  • Precondition = {attributes.get("filename") != "patients"} para distinguir cada tipo de registro o mensaje (por ejemplo, pacientes, proveedores, alergias, etc.) que se envía desde el nodo fuente de Pub/Sub.
  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.

3b8e552cd2e3442c.png

  1. Configura los nombres de las columnas en un orden preferido y descarta los campos que no necesites. Copia el siguiente fragmento de código y pégalo en el cuadro Receta.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Consulta Codelab por lotes: CSV a BigQuery a través de CDF para enmascarar y desidentificación de datos. O agrega este fragmento de código mask-number SSN xxxxxxx#### en el cuadro Receta.
  2. Para cerrar la ventana Transform Properties, haz clic en el botón X.

Selecciona el nodo Receptor.

  1. En la sección Receptor en la paleta de complementos a la izquierda, haz doble clic en el nodo de BigQuery, que aparece en la IU de Data Pipeline. Conectar el nodo de transformación de Wrangler al nodo receptor de BigQuery
  2. Coloca el cursor sobre el nodo receptor de BigQuery y haz clic en Propiedades.

1be711152c92c692.png

  1. Completa los campos obligatorios:
  • Etiqueta = {any text}
  • Nombre de referencia = {any text}
  • ID del proyecto = detección automática
  • Conjunto de datos = Conjunto de datos de BigQuery que se usa en el proyecto actual (por ejemplo, DATASET_ID)
  • Tabla = {table name}
  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.

bba71de9f31e842a.png

  1. Para cerrar las Propiedades de BigQuery, haz clic en el botón X.

5. Compila una canalización de datos en tiempo real

En la sección anterior, creamos nodos necesarios para crear una canalización de datos en Cloud Data Fusion. En esta sección, conectaremos los nodos para compilar la canalización real.

Conecta todos los nodos de una canalización

  1. Arrastra una flecha de conexión > en el borde derecho del nodo de origen y soltarlo en el borde izquierdo del nodo de destino.
  2. Una canalización puede tener varias ramas que obtienen mensajes publicados del mismo nodo fuente de Pub/Sub.

b22908cc35364cdd.png

  1. Asigna un nombre a la canalización.

Eso es todo. Acabas de crear tu primera canalización de datos en tiempo real para que se implemente y ejecute.

Envía mensajes a través de Cloud Pub/Sub.

Con la IU de Pub/Sub:

  1. Navega a la consola de GCP -> Pub/Sub -> Temas, selecciona tu tema y, luego, haz clic en PUBLICAR MENSAJE en el menú de la parte superior.

d65b2a6af1668ecd.png

  1. Coloca solo una fila de registro a la vez en el campo Mensaje. Haz clic en el botón +ADD AN ATTRIBUTE. Proporciona la clave = filename, valor = <type of record> (por ejemplo, pacientes, proveedores, alergias, etc.).
  2. Haz clic en el botón Publicar para enviar el mensaje.

Usa el comando de gcloud:

  1. Proporciona el mensaje de forma manual.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Proporciona el mensaje de forma semiautomática con los comandos cat y sed de Unix. Este comando se puede ejecutar de manera repetida con diferentes parámetros.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configura, implementa y ejecuta la canalización

Ahora que desarrollamos la canalización de datos, podemos implementarla y ejecutarla en Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Mantén los valores predeterminados en Configure.
  2. Haz clic en Preview (Vista previa) para obtener una vista previa de los datos**.** Vuelve a hacer clic en **Preview** para volver a la ventana anterior. También puedes ejecutar la canalización en modo de vista previa haciendo clic en **EJECUTAR**.

b3c891e5e1aa20ae.png

  1. Haz clic en Registros para ver los registros.
  2. Haz clic en Guardar para guardar todos los cambios.
  3. Haz clic en Import para importar la configuración de canalización guardada cuando compiles una canalización nueva.
  4. Haz clic en Exportar para exportar una configuración de canalización.
  5. Haz clic en Implementar para implementar la canalización.
  6. Una vez implementada, haga clic en Ejecutar y espere que la canalización se ejecute hasta el final.

f01ba6b746ba53a.png

  1. Haz clic en Detener para detener la ejecución de la canalización en cualquier momento.
  2. Si deseas duplicar la canalización, selecciona Duplicar en el botón Acciones.
  3. Para exportar la configuración de la canalización, selecciona Exportar en el botón Acciones.

8ea4fc79445fad2.png

  1. Haz clic en Resumen para ver gráficos del historial de ejecuciones, los registros, los registros de errores y las advertencias.

7. Validación

En esta sección, validaremos la ejecución de la canalización de datos.

  1. Validar que la canalización se haya ejecutado correctamente y de forma continua

1644dfac4a2d819d.png

  1. Validar que las tablas de BigQuery se carguen con registros actualizados basados en TIMESTAMP En este ejemplo, se publicaron dos mensajes o registros de pacientes y un mensaje o registro de alergias en el tema de Pub/Sub el 25-06-2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Valida que los mensajes publicados en <your-topic> fueron recibidos por <your-sub> suscriptor.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Cómo ver los resultados

Para ver los resultados después de publicar los mensajes en el tema de Pub/Sub mientras se ejecuta la canalización Realtime, haz lo siguiente:

  1. Consultar la tabla en la IU de BigQuery IR A LA IU DE BIGQUERY
  2. Actualiza la siguiente consulta con el nombre, el conjunto de datos y la tabla de tu proyecto.

6a1fb85bd868abc9.png

8. Realice una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform para los recursos que se usaron en este instructivo:

Una vez que hayas terminado el instructivo, puedes limpiar los recursos que creaste en GCP para que no consuman tu cuota y no se te facture por ellos en el futuro. En las secciones siguientes, se describe cómo borrar o desactivar estos recursos.

Borra el conjunto de datos de BigQuery

Sigue estas instrucciones para borrar el conjunto de datos de BigQuery que creaste como parte de este instructivo.

Borra el bucket de GCS

Sigue estas instrucciones para borrar el bucket de GCS que creaste como parte de este instructivo.

Borra la instancia de Cloud Data Fusion

Sigue estas instrucciones para borrar tu instancia de Cloud Data Fusion.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En GCP Console, ve a la página Proyectos. IR A LA PÁGINA PROYECTOS
  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

9. Felicitaciones

Felicitaciones, completaste correctamente el codelab para transferir datos de atención médica a BigQuery con Cloud Data Fusion.

Publicaste datos de CSV en el tema de Pub/Sub y, luego, los cargaste en BigQuery.

Compilaste visualmente una canalización de integración de datos para cargar, transformar y enmascarar datos de atención médica en tiempo real.

Ahora conoces los pasos clave necesarios para comenzar tu recorrido de análisis de datos de atención médica con BigQuery en Google Cloud.