1. Introducción

Última actualización: 28/02/2020
En este codelab, se demuestra un patrón de transferencia de datos para transferir de forma masiva datos de atención médica con formato CSV a BigQuery. En este lab, usaremos la canalización de datos por lotes de Cloud Data Fusion. Se generaron datos de prueba de atención médica realistas y se pusieron a tu disposición en el bucket de Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
En este codelab, aprenderás lo siguiente:
- Cómo transferir datos CSV (carga programada por lotes) de GCS a BigQuery con Cloud Data Fusion
- Cómo crear visualmente una canalización de integración de datos en Cloud Data Fusion para cargar, transformar y enmascarar datos de atención médica de forma masiva
¿Qué necesitas para ejecutar este codelab?
- Necesitas acceso a un proyecto de GCP.
- Se te debe asignar el rol de propietario del proyecto de GCP.
- Son datos de atención médica en formato CSV, incluido el encabezado.
Si no tienes un proyecto de GCP, sigue estos pasos para crear uno nuevo.
Los datos de atención médica en formato CSV se cargaron previamente en el bucket de GCS en gs://hcls_testing_data_fhir_10_patients/csv/. Cada archivo CSV de recursos tiene su propia estructura de esquema única. Por ejemplo, Patients.csv tiene un esquema diferente al de Providers.csv. Los archivos de esquema precargados se pueden encontrar en gs://hcls_testing_data_fhir_10_patients/csv_schemas.
Si necesitas un conjunto de datos nuevo, siempre puedes generarlo con SyntheaTM. Luego, súbelo a GCS en lugar de copiarlo del bucket en el paso Copia los datos de entrada.
2. Configuración del proyecto de GCP
Inicializa las variables de shell para tu entorno.
Para encontrar el PROJECT_ID, consulta Cómo identificar proyectos.
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Crea un bucket de GCS para almacenar los datos de entrada y los registros de errores con la herramienta gsutil.
gsutil mb -l us gs://$BUCKET_NAME
Obtén acceso al conjunto de datos sintéticos.
- Desde la dirección de correo electrónico que usas para acceder a Cloud Console, envía un correo electrónico a hcls-solutions-external+subscribe@google.com para solicitar unirte.
- Recibirás un correo electrónico con instrucciones para confirmar la acción.

- Usa la opción para responder el correo electrónico y unirte al grupo. NO hagas clic en el botón.
- Una vez que recibas el correo electrónico de confirmación, puedes continuar con el siguiente paso del codelab.
Copia los datos de entrada.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Crea un conjunto de datos de BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Configuración del entorno de Cloud Data Fusion
Sigue estos pasos para habilitar la API de Cloud Data Fusion y otorgar los permisos necesarios:
Habilita las APIs.
- Ve a la Biblioteca de API de GCP Console.
- Selecciona el proyecto desde la lista de proyectos.
- En la biblioteca de API, selecciona la API que quieres habilitar. Si necesitas ayuda para encontrar la API, usa el campo de búsqueda o los filtros.
- En la página de la API, haz clic en HABILITAR.
Crea una instancia de Cloud Data Fusion.
- En GCP Console, selecciona tu ProjectID.
- Selecciona Data Fusion en el menú de la izquierda y, luego, haz clic en el botón CREAR UNA INSTANCIA en el centro de la página (primera creación) o en el botón CREAR INSTANCIA en el menú superior (creación adicional).


- Proporciona el nombre de la instancia. Selecciona Enterprise.

- Haz clic en el botón CREAR.
Configura los permisos de la instancia.
Después de crear una instancia, sigue estos pasos para otorgarle a la cuenta de servicio asociada con la instancia permisos en tu proyecto:
- Haga clic en el nombre de la instancia para navegar a su página de detalles.

- Copia la cuenta de servicio.

- Navega a la página de IAM de tu proyecto.
- En la página Permisos de IAM, ahora agregaremos la cuenta de servicio como un miembro nuevo y le otorgaremos el rol Agente de servicio de la API de Cloud Data Fusion. Haz clic en el botón Agregar, pega la "cuenta de servicio" en el campo Miembros nuevos y selecciona el rol Agente de servicio de la API de Cloud Data Fusion -> Administración de servicio.

- Haz clic en Guardar.
Una vez que completes estos pasos, puedes comenzar a usar Cloud Data Fusion. Para ello, haz clic en el vínculo Ver instancia en la página de instancias de Cloud Data Fusion o en la página de detalles de una instancia.
Configura la regla de firewall.
- Navega a GCP Console -> Red de VPC -> Reglas de firewall para verificar si existe la regla default-allow-ssh.

- Si no es así, agrega una regla de firewall que permita todo el tráfico SSH de entrada a la red predeterminada.
Usa la línea de comandos:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Con la IU: Haz clic en Crear regla de firewall y completa la información:


4. Cómo compilar un esquema para la transformación
Ahora que tenemos el entorno de Cloud Fusion en GCP, vamos a compilar un esquema. Necesitamos este esquema para transformar los datos CSV.
- En la ventana de Cloud Data Fusion, haz clic en el vínculo Ver instancia en la columna Acción. Se te redireccionará a otra página. Haz clic en la URL proporcionada para abrir la instancia de Cloud Data Fusion. Tu elección de hacer clic en el botón "Iniciar recorrido" o "No, gracias" en la ventana emergente de bienvenida
- Expande el menú "hamburguesa" y selecciona Pipeline -> Studio.

- En la sección Transform de la paleta de complementos que se encuentra a la izquierda, haz doble clic en el nodo Wrangler, que aparecerá en la IU de Data Pipelines.

- Selecciona el nodo Wrangler y haz clic en Propiedades. Haz clic en el botón Wrangle y, luego, selecciona un archivo fuente .csv (por ejemplo, patients.csv), que debe tener todos los campos de datos para compilar el esquema deseado.
- Haz clic en la flecha hacia abajo (Transformaciones de columnas) junto a cada nombre de columna (por ejemplo, body).

- De forma predeterminada, la importación inicial supondrá que solo hay una columna en tu archivo de datos. Para analizarlo como un archivo CSV, elige Parse → CSV y, luego, selecciona el delimitador y marca la casilla "Set first row as header" según corresponda. Haz clic en el botón Aplicar.
- Haz clic en la flecha hacia abajo junto al campo Cuerpo y selecciona Borrar columna para quitar el campo Cuerpo. Además, puedes probar otras transformaciones, como quitar columnas, cambiar el tipo de datos de algunas columnas (el valor predeterminado es el tipo "cadena"), dividir columnas, establecer nombres de columnas, etcétera.

- Las pestañas "Columnas" y "Pasos de transformación" muestran el esquema de salida y la receta de Wrangler. Haz clic en Aplicar en la esquina superior derecha. Haz clic en el botón Validar. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

- En Wrangler Properties, haz clic en el menú desplegable Actions para Export el esquema deseado en tu almacenamiento local para una futura Import si es necesario.
- Guarda la receta de Wrangler para usarla en el futuro.
parse-as-csv :body ',' true drop body
- Para cerrar la ventana Wrangler Properties, haz clic en el botón X.
5. Compila nodos para la canalización
En esta sección, compilaremos los componentes de la canalización.
- En la esquina superior izquierda de la IU de Data Pipelines, deberías ver que Data Pipeline - Batch está seleccionado como el tipo de canalización.

- En el panel izquierdo, hay diferentes secciones, como Filter, Source, Transform, Analytics, Sink, Conditions and Actions, Error Handlers y Alerts, en las que puedes seleccionar uno o varios nodos para la canalización.

Nodo fuente
- Selecciona el nodo Source.
- En la sección Source de la paleta Plugin que se encuentra a la izquierda, haz doble clic en el nodo Google Cloud Storage, que aparece en la IU de Data Pipelines.
- Selecciona el nodo fuente de GCS y haz clic en Propiedades.

- Llena los campos obligatorios. Configura los siguientes campos:
- Label = {cualquier texto}
- Nombre de referencia = {cualquier texto}
- ID del proyecto = detección automática
- Ruta de acceso = URL de GCS al bucket en tu proyecto actual. Por ejemplo, gs://$BUCKET_NAME/csv/
- Formato = texto
- Path Field = filename
- Path Filename Only = true
- Read Files Recursively = true
- Haz clic en el botón + para agregar el campo "filename" al esquema de salida de GCS.
- Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.
- Para cerrar las propiedades de GCS, haz clic en el botón X.
Nodo de transformación
- Selecciona el nodo Transform.
- En la sección Transform de la paleta de complementos que se encuentra a la izquierda, haz doble clic en el nodo Wrangler, que aparece en la IU de Data Pipelines. Conecta el nodo fuente de GCS al nodo de transformación de Wrangler.
- Selecciona el nodo Wrangler y haz clic en Propiedades.
- Haz clic en el menú desplegable Acciones y selecciona Importar para importar un esquema guardado (por ejemplo, gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json) y pega la receta guardada de la sección anterior.
- También puedes reutilizar el nodo Wrangler de la sección Crea un esquema para la transformación.
- Llena los campos obligatorios. Configura los siguientes campos:
- Label = {cualquier texto}
- Nombre del campo de entrada = {*}
- Precondition = {filename != "patients.csv"} para distinguir cada archivo de entrada (por ejemplo, patients.csv, providers.csv, allergies.csv, etc.) del nodo Source.

- Agrega un nodo de JavaScript para ejecutar el código JavaScript proporcionado por el usuario que transforma aún más los registros. En este codelab, utilizamos el nodo JavaScript para obtener una marca de tiempo para cada actualización de registro. Conecta el nodo de transformación de Wrangler al nodo de transformación de JavaScript. Abre Propiedades de JavaScript y agrega la siguiente función:

function transform(input, emitter, context) {
input.TIMESTAMP = (new Date()).getTime()*1000;
emitter.emit(input);
}
- Haz clic en el signo + para agregar el campo llamado TIMESTAMP al esquema de salida (si no existe). Selecciona la marca de tiempo como el tipo de datos.

- Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.
- Para cerrar la ventana Transform Properties, haz clic en el botón X.
Enmascaramiento y desidentificación de datos
- Puedes seleccionar columnas de datos individuales haciendo clic en la flecha hacia abajo de la columna y aplicando reglas de enmascaramiento en la selección Enmascarar datos según tus requisitos (por ejemplo, la columna de número de seguridad social).

- Puedes agregar más directivas en la ventana Recipe del nodo Wrangler. Por ejemplo, usar la directiva hash con el algoritmo de hash siguiendo esta sintaxis para fines de desidentificación:
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

Nodo receptor
- Selecciona el nodo receptor.
- En la sección Sink de la paleta de complementos que se encuentra a la izquierda, haz doble clic en el nodo BigQuery, que aparecerá en la IU de Data Pipeline.
- Selecciona el nodo receptor de BigQuery y haz clic en Propiedades.

- Completa los campos obligatorios. Configura los siguientes campos:
- Label = {cualquier texto}
- Nombre de referencia = {cualquier texto}
- ID del proyecto = detección automática
- Conjunto de datos = Conjunto de datos de BigQuery que se usa en el proyecto actual (es decir, DATASET_ID)
- Table = {nombre de la tabla}
- Haz clic en Documentación para obtener una explicación detallada. Haz clic en el botón Validar para validar toda la información de entrada. El mensaje verde "No se encontraron errores" indica que la operación se realizó correctamente.

- Para cerrar las propiedades de BigQuery, haz clic en el botón X.
6. Compila una canalización de datos por lotes
Cómo conectar todos los nodos de una canalización
- Arrastra una flecha de conexión > del borde derecho del nodo fuente y suéltala en el borde izquierdo del nodo de destino.
- Una canalización puede tener varias ramas que obtienen archivos de entrada del mismo nodo de origen de GCS.

- Asigna un nombre a la canalización.
Eso es todo. Acabas de crear tu primera canalización de datos por lotes y puedes implementarla y ejecutarla.
Envía alertas de la canalización por correo electrónico (opcional)
Para utilizar la función SendEmail de Pipeline Alert, la configuración requiere que se configure un servidor de correo para enviar correos desde una instancia de máquina virtual. Consulta el siguiente vínculo de referencia para obtener más información:
Envía correos electrónicos desde una instancia | Documentación de Compute Engine
En este codelab, configuraremos un servicio de retransmisión de correo a través de Mailgun con los siguientes pasos:
- Sigue las instrucciones en Envía correos electrónicos con Mailgun | Documentación de Compute Engine para configurar una cuenta con Mailgun y configurar el servicio de retransmisión de correo electrónico. A continuación, se indican las modificaciones adicionales.
- Agrega todas las direcciones de correo electrónico de los destinatarios a la lista de autorizados de Mailgun. Puedes encontrar esta lista en la opción Mailgun> Sending> Overview del panel izquierdo.

Una vez que los destinatarios hagan clic en "Acepto" en el correo electrónico enviado desde support@mailgun.net, sus direcciones de correo electrónico se guardarán en la lista de direcciones autorizadas para recibir correos electrónicos de alerta de la canalización.

- Paso 3 de la sección "Antes de comenzar": Crea una regla de firewall de la siguiente manera:

- Paso 3 de "Configura Mailgun como retransmisión de correo con Postfix". Selecciona Sitio de Internet o Internet con host inteligente, en lugar de Solo local, como se menciona en las instrucciones.

- Paso 4 de "Configura Mailgun como retransmisión de correo con Postfix". Edita vi /etc/postfix/main.cf para agregar 10.128.0.0/9 al final de mynetworks.

- Edita vi /etc/postfix/master.cf para cambiar el puerto smtp predeterminado (25) al puerto 587.

- En la esquina superior derecha de Data Fusion Studio, haz clic en Configurar. Haz clic en Alerta de canalización y, luego, en el botón + para abrir la ventana Alertas. Selecciona SendEmail.

- Completa el formulario de configuración de Correo electrónico. Selecciona completion, success o failure en el menú desplegable Run Condition para cada tipo de alerta. Si Include Workflow Token = false, solo se envía la información del campo Message. Si Include Workflow Token = true, se envía la información del campo Message y la información detallada del token de flujo de trabajo. Debes usar minúsculas para Protocol. Usa cualquier correo electrónico "falso" que no sea la dirección de correo electrónico de tu empresa para el Remitente.

7. Configura, implementa, ejecuta o programa la canalización

- En la esquina superior derecha de Data Fusion Studio, haz clic en Configurar. Selecciona Spark en Engine Config. Haz clic en Guardar en la ventana Configurar.

- Haz clic en Vista previa para obtener una vista previa de los datos y, luego, vuelve a hacer clic en **Vista previa** para volver a la ventana anterior. También puedes **ejecutar** la canalización en el modo de vista previa.

- Haz clic en Registros para ver los registros.
- Haz clic en Guardar para guardar todos los cambios.
- Haz clic en Importar para importar la configuración de la canalización guardada cuando compiles una canalización nueva.
- Haz clic en Exportar para exportar la configuración de una canalización.
- Haz clic en Implementar para implementar la canalización.
- Una vez implementada, haga clic en Ejecutar y espere que la canalización se ejecute hasta el final.

- Para duplicar la canalización, selecciona Duplicar en el botón Acciones.
- Puedes exportar la configuración de la canalización seleccionando Exportar en el botón Acciones.
- Haz clic en Activadores de entrada o Activadores de salida en el borde izquierdo o derecho de la ventana de Studio para establecer activadores de canalización si lo deseas.
- Haz clic en Programar para programar la ejecución de la canalización y cargar datos de forma periódica.

- En Resumen, se muestran gráficos del historial de ejecuciones, los registros, los registros de errores y las advertencias.
8. Validación
- La canalización de Validate se ejecutó correctamente.

- Valida si el conjunto de datos de BigQuery tiene todas las tablas.
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- Recibir correos electrónicos de alerta (si se configuraron)
Cómo ver los resultados
Para ver los resultados después de la ejecución de una canalización, haga lo siguiente:
- Consulta la tabla en la IU de BigQuery. IR A LA IU DE BIGQUERY
- Actualiza la siguiente consulta con el nombre de tu proyecto, conjunto de datos y tabla.

9. Realiza una limpieza
Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform para los recursos que se usaron en este instructivo:
Una vez que completes el instructivo, puedes limpiar los recursos que creaste en GCP para que no consuman tu cuota y no se te facture por ellos en el futuro. En las secciones siguientes, se describe cómo borrar o desactivar estos recursos.
Borra el conjunto de datos de BigQuery
Sigue estas instrucciones para borrar el conjunto de datos de BigQuery que creaste como parte de este instructivo.
Cómo borrar el bucket de GCS
Sigue estas instrucciones para borrar el bucket de GCS que creaste como parte de este instructivo.
Borra la instancia de Cloud Data Fusion
Sigue estas instrucciones para borrar tu instancia de Cloud Data Fusion.
Borra el proyecto
La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.
Para borrar el proyecto, haz lo siguiente:
- En GCP Console, ve a la página Proyectos. IR A LA PÁGINA PROYECTOS
- En la lista de proyectos, elige el proyecto que deseas borrar y haz clic en Borrar.
- En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.
10. Felicitaciones
¡Felicitaciones! Completaste correctamente el codelab para transferir datos de atención médica a BigQuery con Cloud Data Fusion.
Importaste datos CSV de Google Cloud Storage a BigQuery.
Creaste visualmente la canalización de integración de datos para cargar, transformar y enmascarar datos de atención médica de forma masiva.
Ahora conoces los pasos clave necesarios para comenzar tu recorrido de análisis de datos de atención médica con BigQuery en Google Cloud.