Transfiere datos CSV (valores separados por comas) a BigQuery con Cloud Data Fusion: transferencia en tiempo real

1. Introducción

509db33558ae025.png

Última actualización: 28/02/2020

En este codelab, se muestra un patrón de transferencia de datos para transferir datos de atención médica con formato CSV a BigQuery en tiempo real. En este lab, usaremos la canalización de datos en tiempo real de Cloud Data Fusion. Se generaron datos de prueba realistas del sector de la salud y se pusieron a tu disposición en el bucket de Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

En este codelab, aprenderás lo siguiente:

  • Cómo transferir datos CSV (carga en tiempo real) de Pub/Sub a BigQuery con Cloud Data Fusion
  • Cómo compilar visualmente una canalización de integración de datos en Cloud Data Fusion para cargar, transformar y enmascarar datos de atención médica en tiempo real.

¿Qué necesitas para ejecutar esta demostración?

  • Necesitas acceso a un proyecto de GCP.
  • Se te debe asignar el rol de propietario del proyecto de GCP.
  • Son datos de atención médica en formato CSV, incluido el encabezado.

Si no tienes un proyecto de GCP, sigue estos pasos para crear uno nuevo.

Los datos de atención médica en formato CSV se cargaron previamente en el bucket de GCS en gs://hcls_testing_data_fhir_10_patients/csv/. Cada archivo de recursos CSV tiene una estructura de esquema única. Por ejemplo, Patients.csv tiene un esquema diferente al de Providers.csv. Los archivos de esquema precargados se pueden encontrar en gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si necesitas un conjunto de datos nuevo, siempre puedes generarlo con SyntheaTM. Luego, súbelo a GCS en lugar de copiarlo del bucket en el paso Copia los datos de entrada.

2. Configuración del proyecto de GCP

Inicializa las variables de shell para tu entorno.

Para encontrar el PROJECT_ID, consulta Cómo identificar proyectos.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Crea un bucket de GCS para almacenar los datos de entrada y los registros de errores con la herramienta gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Obtén acceso al conjunto de datos sintéticos.

  1. Desde la dirección de correo electrónico que usas para acceder a Cloud Console, envía un correo electrónico a hcls-solutions-external+subscribe@google.com para solicitar unirte.
  2. Recibirás un correo electrónico con instrucciones para confirmar la acción.
  3. Usa la opción para responder el correo electrónico y unirte al grupo. NO hagas clic en el botón 525a0fa752e0acae.png.
  4. Una vez que recibas el correo electrónico de confirmación, puedes continuar con el siguiente paso del codelab.

Copia los datos de entrada.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crea un conjunto de datos de BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Instala e inicializa el SDK de Google Cloud y crea temas y suscripciones de Pub/Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configuración del entorno de Cloud Data Fusion

Sigue estos pasos para habilitar la API de Cloud Data Fusion y otorgar los permisos necesarios:

Habilita las API.

  1. Ve a la Biblioteca de API de GCP Console.
  2. Selecciona el proyecto desde la lista de proyectos.
  3. En la biblioteca de APIs, selecciona la API que quieres habilitar ( API de Cloud Data Fusion,API de Cloud Pub/Sub). Si necesitas ayuda para encontrar la API, usa el campo de búsqueda y los filtros.
  4. En la página de API, haz clic en HABILITAR.

Crea una instancia de Cloud Data Fusion.

  1. En GCP Console, selecciona tu ProjectID.
  2. Selecciona Data Fusion en el menú de la izquierda y, luego, haz clic en el botón CREAR UNA INSTANCIA en el centro de la página (primera creación) o en el botón CREAR INSTANCIA en el menú superior (creación adicional).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Proporciona el nombre de la instancia. Selecciona Enterprise.

5af91e46917260ff.png

  1. Haz clic en el botón CREAR.

Configura los permisos de la instancia.

Después de crear una instancia, sigue estos pasos para otorgarle a la cuenta de servicio asociada con la instancia permisos en tu proyecto:

  1. Haga clic en el nombre de la instancia para navegar a su página de detalles.

76ad691f795e1ab3.png

  1. Copia la cuenta de servicio.

6c91836afb72209d.png

  1. Navega a la página de IAM de tu proyecto.
  2. En la página de permisos de IAM, otorga a la cuenta de servicio el rol de Agente de servicio de la API de Cloud Data Fusion haciendo clic en el botón Agregar. Pega la "cuenta de servicio" en el campo Miembros nuevos y selecciona el rol Service Management -> Cloud Data Fusion API Server Agent.

36f03d11c2a4ce0.png

  1. Haz clic en + Agregar otro rol (o en Editar agente de servicio de la API de Cloud Data Fusion) para agregar un rol de suscriptor de Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Haz clic en Guardar.

Una vez que completes estos pasos, puedes comenzar a usar Cloud Data Fusion. Para ello, haz clic en el vínculo Ver instancia en la página de instancias de Cloud Data Fusion o en la página de detalles de una instancia.

Configura la regla de firewall.

  1. Navega a GCP Console -> Red de VPC -> Reglas de firewall para verificar si existe la regla default-allow-ssh.

102adef44bbe3a45.png

  1. Si no es así, agrega una regla de firewall que permita todo el tráfico SSH de entrada a la red predeterminada.

Usa la línea de comandos:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Con la IU: Haz clic en Crear regla de firewall y completa la información:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Compila nodos para la canalización

Ahora que tenemos el entorno de Cloud Data Fusion en GCP, comencemos a crear las canalizaciones de datos en Cloud Data Fusion con los siguientes pasos:

  1. En la ventana de Cloud Data Fusion, haz clic en el vínculo Ver instancia en la columna Acción. Se te redireccionará a otra página. Haz clic en la URL proporcionada para abrir la instancia de Cloud Data Fusion. Tu elección de hacer clic en el botón "Iniciar recorrido" o "No, gracias" en la ventana emergente de bienvenida
  2. Expande el menú "hamburguesa" y selecciona Canalización -> Lista.

317820def934a00a.png

  1. Haz clic en el botón verde + en la esquina superior derecha y, luego, selecciona Crear canalización. También puedes hacer clic en "Crear" un vínculo de canalización.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Una vez que aparezca Pipeline Studio, en la esquina superior izquierda, selecciona Data Pipeline - Realtime en el menú desplegable.

372a889a81da5e66.png

  1. En la IU de Data Pipelines, verás diferentes secciones en el panel izquierdo, como Filter, Source, Transform, Analytics, Sink, Error Handlers y Alerts, en las que puedes seleccionar uno o varios nodos para la canalización.

c63de071d4580f2f.png

Selecciona un nodo de fuente.

  1. En la sección Source de la paleta Plugin que se encuentra a la izquierda, haz doble clic en el nodo Google Cloud Pub/Sub, que aparece en la IU de Data Pipelines.
  2. Selecciona el nodo fuente de Pub/Sub y haz clic en Propiedades.

ed857a5134148d7b.png

  1. Llena los campos obligatorios. Configura los siguientes campos:
  • Label = {cualquier texto}
  • Nombre de referencia = {cualquier texto}
  • ID del proyecto = detección automática
  • Subscription = Suscripción creada en la sección Crear un tema de Pub/Sub (por ejemplo, your-sub)
  • Tema = Tema creado en la sección Crear un tema de Pub/Sub (por ejemplo, tu-tema)
  1. Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

5c2774338b66bebe.png

  1. Para cerrar las propiedades de Pub/Sub, haz clic en el botón X.

Selecciona el nodo Transform.

  1. En la sección Transformar de la paleta de complementos a la izquierda, haz doble clic en el nodo Projection, que aparece en la IU de Data Pipelines. Conecta el nodo de origen de Pub/Sub al nodo de transformación Projection.
  2. Selecciona el nodo Projection y haz clic en Properties.

b3a9a3878879bfd7.png

  1. Llena los campos obligatorios. Configura los siguientes campos:
  • Convert = convierte message de tipo byte a tipo string.
  • Campos para descartar = {cualquier campo}
  • Campos que se conservarán = {message, timestamp y attributes} (por ejemplo, attributes: key=‘filename’:value=‘patients’ enviados desde Pub/Sub)
  • Campos para cambiar de nombre = {message, timestamp}
  1. Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

b8c2f8efe18234ff.png

  1. En la sección Transform de la paleta de complementos que se encuentra a la izquierda, haz doble clic en el nodo Wrangler, que aparece en la IU de Data Pipelines. Conecta el nodo de transformación Projection al nodo de transformación Wrangler. Selecciona el nodo Wrangler y haz clic en Propiedades.

aa44a4db5fe6623a.png

  1. Haz clic en el menú desplegable Acciones y selecciona Importar para importar un esquema guardado (por ejemplo, gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Para agregar el campo TIMESTAMP en el esquema de salida (si no existe), haz clic en el botón + junto al último campo y marca la casilla "Nulo".
  3. Llena los campos obligatorios. Configura los siguientes campos:
  • Label = {cualquier texto}
  • Nombre del campo de entrada = {*}
  • Precondition = {attributes.get("filename") != "patients"} para distinguir cada tipo de registro o mensaje (por ejemplo, pacientes, proveedores, alergias, etcétera) que se envía desde el nodo fuente de Pub/Sub.
  1. Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

3b8e552cd2e3442c.png

  1. Establece los nombres de las columnas en el orden que prefieras y descarta los campos que no necesites. Copia el siguiente fragmento de código y pégalo en el cuadro Receta.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Consulta Batch-Codelab - CSV to BigQuery via CDF para obtener información sobre el enmascaramiento y la desidentificación de datos. O agrega este fragmento de código mask-number SSN xxxxxxx#### en el cuadro Receta.
  2. Para cerrar la ventana Transform Properties, haz clic en el botón X.

Selecciona el nodo Sink.

  1. En la sección Sink de la paleta de complementos que se encuentra a la izquierda, haz doble clic en el nodo BigQuery, que aparece en la IU de Data Pipeline. Conecta el nodo de transformación de Wrangler al nodo de receptor de BigQuery.
  2. Selecciona el nodo receptor de BigQuery y haz clic en Propiedades.

1be711152c92c692.png

  1. Completa los campos obligatorios:
  • Label = {cualquier texto}
  • Nombre de referencia = {cualquier texto}
  • ID del proyecto = detección automática
  • Dataset = Conjunto de datos de BigQuery que se usa en el proyecto actual (por ejemplo, DATASET_ID)
  • Table = {nombre de la tabla}
  1. Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

bba71de9f31e842a.png

  1. Para cerrar las propiedades de BigQuery, haz clic en el botón X.

5. Crea una canalización de datos en tiempo real

En la sección anterior, creamos los nodos necesarios para compilar una canalización de datos en Cloud Data Fusion. En esta sección, conectaremos los nodos para compilar la canalización real.

Cómo conectar todos los nodos de una canalización

  1. Arrastra una flecha de conexión > del borde derecho del nodo fuente y suéltala en el borde izquierdo del nodo de destino.
  2. Una canalización puede tener varias ramas que reciben mensajes publicados desde el mismo nodo de fuente de Pub/Sub.

b22908cc35364cdd.png

  1. Asigna un nombre a la canalización.

Eso es todo. Acabas de crear tu primera canalización de datos en tiempo real para implementarla y ejecutarla.

Envía mensajes a través de Cloud Pub/Sub

Con la IU de Pub/Sub, haz lo siguiente:

  1. Navega a la consola de GCP -> Pub/Sub -> Temas, selecciona tu-tema y, luego, haz clic en PUBLICAR MENSAJE en el menú superior.

d65b2a6af1668ecd.png

  1. Coloca solo una fila de registro a la vez en el campo Mensaje. Haz clic en el botón + AGREGAR UN ATRIBUTO. Proporciona la clave = nombre de archivo y el valor = <tipo de registro> (por ejemplo, pacientes, proveedores, alergias, etc.).
  2. Haz clic en el botón Publicar para enviar el mensaje.

Con el comando de gcloud, ejecuta lo siguiente:

  1. Proporciona el mensaje de forma manual.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Proporciona el mensaje de forma semiautomática con los comandos de Unix cat y sed. Este comando se puede ejecutar varias veces con diferentes parámetros.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configura, implementa y ejecuta la canalización

Ahora que desarrollamos la canalización de datos, podemos implementarla y ejecutarla en Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Mantén los valores predeterminados de Configurar.
  2. Haz clic en Vista previa para obtener una vista previa de los datos**.** Haz clic en **Vista previa** de nuevo para volver a la ventana anterior. También puedes hacer clic en **EJECUTAR** para ejecutar la canalización en el modo de vista previa.

b3c891e5e1aa20ae.png

  1. Haz clic en Registros para ver los registros.
  2. Haz clic en Guardar para guardar todos los cambios.
  3. Haz clic en Importar para importar la configuración de canalización guardada cuando crees una canalización nueva.
  4. Haz clic en Exportar para exportar la configuración de una canalización.
  5. Haz clic en Implementar para implementar la canalización.
  6. Una vez implementada, haga clic en Ejecutar y espere que la canalización se ejecute hasta el final.

f01ba6b746ba53a.png

  1. Haz clic en Detener para detener la ejecución de la canalización en cualquier momento.
  2. Para duplicar la canalización, selecciona Duplicar en el botón Acciones.
  3. Para exportar la configuración de la canalización, selecciona Exportar en el botón Acciones.

28ea4fc79445fad2.png

  1. Haz clic en Resumen para mostrar gráficos del historial de ejecución, los registros, los registros de errores y las advertencias.

7. Validación

En esta sección, validaremos la ejecución de la canalización de datos.

  1. Valida que la canalización se haya ejecutado correctamente y que se esté ejecutando de forma continua.

1644dfac4a2d819d.png

  1. Valida que las tablas de BigQuery se carguen con registros actualizados según la marca de tiempo. En este ejemplo, se publicaron dos registros o mensajes de pacientes y un registro o mensaje de alergias en el tema de Pub/Sub el 25/06/2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Valida que el suscriptor <your-sub> haya recibido los mensajes publicados en <your-topic>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Cómo ver los resultados

Para ver los resultados después de que se publiquen los mensajes en el tema de Pub/Sub mientras se ejecuta la canalización en tiempo real, haz lo siguiente:

  1. Consulta la tabla en la IU de BigQuery. IR A LA IU DE BIGQUERY
  2. Actualiza la siguiente consulta con el nombre de tu proyecto, conjunto de datos y tabla.

6a1fb85bd868abc9.png

8. Realiza una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform para los recursos que se usaron en este instructivo:

Una vez que completes el instructivo, puedes limpiar los recursos que creaste en GCP para que no consuman tu cuota y no se te facture por ellos en el futuro. En las secciones siguientes, se describe cómo borrar o desactivar estos recursos.

Borra el conjunto de datos de BigQuery

Sigue estas instrucciones para borrar el conjunto de datos de BigQuery que creaste como parte de este instructivo.

Cómo borrar el bucket de GCS

Sigue estas instrucciones para borrar el bucket de GCS que creaste como parte de este instructivo.

Borra la instancia de Cloud Data Fusion

Sigue estas instrucciones para borrar tu instancia de Cloud Data Fusion.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En GCP Console, ve a la página Proyectos. IR A LA PÁGINA PROYECTOS
  2. En la lista de proyectos, elige el proyecto que deseas borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

9. Felicitaciones

¡Felicitaciones! Completaste correctamente el codelab para transferir datos de atención médica a BigQuery con Cloud Data Fusion.

Publicaste datos CSV en un tema de Pub/Sub y, luego, los cargaste en BigQuery.

Creaste visualmente una canalización de integración de datos para cargar, transformar y enmascarar datos de atención médica en tiempo real.

Ahora conoces los pasos clave necesarios para comenzar tu recorrido de análisis de datos de atención médica con BigQuery en Google Cloud.