Transfiere datos CSV a BigQuery con Cloud Data Fusion: transferencia por lotes

1. Introducción

12fb66cc134b50ef.png

Última actualización: 28 de febrero de 2020

En este codelab, se muestra un patrón de transferencia de datos para transferir datos de atención médica con formato CSV a BigQuery de forma masiva. Para este lab, usaremos la canalización de datos por lotes de Cloud Data Fusion. Se generaron datos realistas de pruebas de salud y se pusieron a disposición en el bucket de Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

En este codelab, aprenderás lo siguiente:

  • Cómo transferir datos de CSV (carga programada por lotes) de GCS a BigQuery con Cloud Data Fusion.
  • Cómo crear de forma visual una canalización de integración de datos en Cloud Data Fusion para cargar, transformar y enmascarar datos de atención médica de forma masiva

¿Qué necesitas para ejecutar este codelab?

  • Necesitas acceso a un proyecto de GCP.
  • Debes tener asignada una función de propietario para el proyecto de GCP.
  • Datos de atención médica en formato CSV, incluido el encabezado.

Si no tienes un proyecto de GCP, sigue estos pasos para crear uno nuevo.

Los datos de atención médica en formato CSV se precargaron en el bucket de GCS en gs://hcls_testing_data_fhir_10_patients/csv/. Cada archivo CSV de recursos tiene su estructura de esquema única. Por ejemplo, Patients.csv tiene un esquema diferente al de Providers.csv. Los archivos de esquema precargados se pueden encontrar en gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Si necesitas un conjunto de datos nuevo, siempre puedes generarlo con SyntheaTM. Luego, súbelos a GCS en lugar de copiarlos del bucket en el paso Copiar datos de entrada.

2. Configuración de un proyecto de GCP

Inicializa variables de shell para tu entorno.

Para encontrar el PROJECT_ID, consulta Identifica proyectos.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Crea un bucket de GCS para almacenar datos de entrada y registros de errores con la herramienta gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Obtén acceso al conjunto de datos sintético.

  1. Desde la dirección de correo electrónico que usas para acceder a la consola de Cloud, envía un correo electrónico a hcls-solutions-external+subscribe@google.com para solicitar unirte.
  2. Recibirás un correo electrónico con instrucciones para confirmar la acción. 525a0fa752e0acae.png
  3. Usa la opción de responder el correo electrónico para unirte al grupo. NO hagas clic en el botón.
  4. Una vez que recibas el correo electrónico de confirmación, puedes continuar con el siguiente paso del codelab.

Copia los datos de entrada.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crea un conjunto de datos de BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Configuración del entorno de Cloud Data Fusion

Sigue estos pasos para habilitar la API de Cloud Data Fusion y otorgar los permisos necesarios:

Habilita las APIs.

  1. Ve a la Biblioteca de API de GCP Console.
  2. Selecciona el proyecto desde la lista de proyectos.
  3. En la biblioteca de API, selecciona la API que quieres habilitar. Si necesitas ayuda para encontrar la API, usa el campo de búsqueda o los filtros.
  4. En la página de la API, haz clic en HABILITAR.

Crea una instancia de Cloud Data Fusion.

  1. En GCP Console, selecciona el ID del proyecto.
  2. Selecciona Data Fusion en el menú de la izquierda y, luego, haz clic en el botón CREATE AN INSTANCE en el medio de la página (primera creación) o haz clic en el botón CREATE INSTANCE en el menú superior (creación adicional).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Proporciona el nombre de la instancia. Selecciona Enterprise.

5af91e46917260ff.png

  1. Haz clic en el botón CREAR.

Configura los permisos de la instancia.

Después de crear una instancia, sigue estos pasos para otorgar a la cuenta de servicio asociada con la instancia los permisos para tu proyecto:

  1. Haga clic en el nombre de la instancia para navegar a su página de detalles.

76ad691f795e1ab3.png

  1. Copia la cuenta de servicio.

6c91836afb72209d.png

  1. Navega a la página IAM de tu proyecto.
  2. En la página de permisos de IAM, agregaremos la cuenta de servicio como un miembro nuevo y le otorgaremos el rol de Agente de servicio de la API de Cloud Data Fusion. Haz clic en el botón Agregar y pega la “cuenta de servicio” en el campo Miembros nuevos y selecciona Administración de servicio -> Rol de agente del servidor de la API de Cloud Data Fusion.
  3. ea68b28d917a24b1.png
  4. Haz clic en Guardar.

Una vez que hayas completado estos pasos, puedes comenzar a usar Cloud Data Fusion haciendo clic en el vínculo Ver instancia en la página de instancias de Cloud Data Fusion o en la página de detalles de una instancia.

Configura la regla de firewall.

  1. Navega a la consola de GCP -> Red de VPC -> Reglas de firewall para verificar si existe o no la regla default-allow-ssh.

102adef44bbe3a45.png

  1. De lo contrario, agrega una regla de firewall que permita todo el tráfico SSH de entrada a la red predeterminada.

Con la línea de comandos:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Mediante la IU: Haz clic en Crear regla de firewall y completa la información:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Crea un esquema para la transformación

Ahora que tenemos el entorno de Cloud Fusion en GCP, creemos un esquema. Necesitamos este esquema para la transformación de los datos CSV.

  1. En la ventana de Cloud Data Fusion, haz clic en el vínculo Ver instancia en la columna Acción. Se te redireccionará a otra página. Haz clic en la url proporcionada para abrir la instancia de Cloud Data Fusion. Hacer clic en "Iniciar visita guiada" o "No, gracias" de la ventana emergente de bienvenida.
  2. Expandir el menú “hamburguesa” en el menú, selecciona Canalización -> Estudio

6561b13f30e36c3a.png

  1. En la sección Transform en la paleta de complementos a la izquierda, haz doble clic en el nodo Wrangler, que aparecerá en la IU de Data Pipelines.

aa44a4db5fe6623a.png

  1. Apunta al nodo Wrangler y haz clic en Properties. Haz clic en el botón Wrangle y, luego, selecciona un archivo fuente .csv (por ejemplo, paciente.csv), que debe tener todos los campos de datos para compilar el esquema deseado.
  2. Haz clic en la flecha hacia abajo (transformaciones de columnas) junto al nombre de cada columna (por ejemplo, cuerpo). 802edca8a97da18.png
  3. De forma predeterminada, la importación inicial supondrá que solo hay una columna en tu archivo de datos. Para analizarlo como un archivo CSV, elige AnalizarCSV, selecciona el delimitador y marca la opción "Set first row as header" según corresponda. Haz clic en el botón Aplicar.
  4. Haz clic en la flecha hacia abajo junto al campo Cuerpo y selecciona Borrar columna para quitar el campo Cuerpo. Además, puedes probar otras transformaciones, como quitar columnas, cambiar el tipo de datos para algunas columnas (la configuración predeterminada es el tipo “cadena”), dividir columnas, configurar nombres de columnas, etcétera.

e6d2cda51ff298e7.png

  1. La sección “Columnas” y “Pasos de transformación” Las pestañas muestran el esquema de salida y la receta de Wrangler. Haz clic en Aplicar en la esquina superior derecha. Haz clic en el botón Validar. El botón verde "No se encontraron errores" indica éxito.

1add853c43f2abee.png

  1. En Propiedades de Wrangler, haga clic en el menú desplegable Acciones para exportar el esquema deseado a su almacenamiento local para realizar una importación futura, si es necesario.
  2. Guarda la receta de Wrangler para usarla más adelante.
parse-as-csv :body ',' true
drop body
  1. Para cerrar la ventana Wrangler Properties, haz clic en el botón X.

5. Compila nodos para la canalización

En esta sección, compilaremos los componentes de la canalización.

  1. En la parte superior izquierda de la IU de Data Pipelines, deberías ver que Data Pipeline - Batch está seleccionada como el tipo de canalización.

af67c42ce3d98529.png

  1. En el panel izquierdo, hay diferentes secciones en el panel izquierdo como Filtro, Fuente, Transformación, Estadísticas, Receptor, Condiciones y acciones, Controladores de errores y alertas, en las que puedes seleccionar uno o varios nodos para la canalización.

c4438f7682f8b19b.png

Nodo fuente

  1. Selecciona el nodo de origen.
  2. En la sección Fuente en la paleta de complementos a la izquierda, haz doble clic en el nodo de Google Cloud Storage, que aparece en la IU de Data Pipelines.
  3. Coloca el cursor sobre el nodo fuente de GCS y haz clic en Propiedades.

87e51a3e8dae8b3f.png

  1. Llena los campos obligatorios. Configura los siguientes campos:
  • Etiqueta = {any text}
  • Nombre de referencia = {any text}
  • ID del proyecto = detección automática
  • Ruta = URL de GCS para el bucket en tu proyecto actual. Por ejemplo, gs://$BUCKET_NAME/csv/.
  • Formato = texto
  • Campo de ruta de acceso = nombre de archivo
  • Solo nombre de archivo de la ruta de acceso = verdadero
  • Leer archivos de forma recurrente = verdadero
  1. Agrega el campo “filename” al esquema de salida de GCS con el botón +.
  2. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar. El botón verde "No se encontraron errores" indica éxito.
  3. Para cerrar las propiedades de GCS, haz clic en el botón X.

Transformar el nodo

  1. Selecciona el nodo Transformar.
  2. En la sección Transformar en la paleta de complementos a la izquierda, haz doble clic en el nodo Wrangler, que aparece en la IU de Data Pipelines. Conecta el nodo fuente de GCS al nodo de transformación de Wrangler.
  3. Apunta al nodo Wrangler y haz clic en Properties.
  4. Haz clic en el menú desplegable Acciones, selecciona Importar para importar un esquema guardado (por ejemplo: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ esquema (Patients.json) y pega la receta guardada de la sección anterior.
  5. O bien, vuelve a usar el nodo de Wrangler de la sección: Crea un esquema para la transformación.
  6. Llena los campos obligatorios. Configura los siguientes campos:
  • Etiqueta = {any text}
  • Nombre del campo de entrada = {*}
  • Condición previa = {filename != "patients.csv"} para distinguir cada archivo de entrada (por ejemplo, pacientes.csv, providers.csv, alergies.csv, etc.) del nodo fuente.

2426f8f0a6c4c670.png

  1. Agrega un nodo de JavaScript para ejecutar el JavaScript proporcionado por el usuario que transforma aún más los registros. En este codelab, usamos el nodo de JavaScript para obtener una marca de tiempo de cada actualización de registro. Conectar el nodo de transformación de Wrangler al nodo de transformación de JavaScript Abre las Propiedades de JavaScript y agrega la siguiente función:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Agrega el campo llamado TIMESTAMP al esquema de salida (si no existe). Para ello, haz clic en el signo +. Selecciona la marca de tiempo como el tipo de datos.

4227389b57661135.png

  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.
  2. Para cerrar la ventana Transform Properties, haz clic en el botón X.

Enmascaramiento y desidentificación de datos

  1. Para seleccionar columnas de datos individuales, haz clic en la flecha hacia abajo de la columna y aplica las reglas de enmascaramiento en la selección de datos de enmascaramiento según tus requisitos (por ejemplo, la columna NSS).

bb1eb067dd6e0946.png

  1. Puedes agregar más directivas en la ventana Receta del nodo de Wrangler. Por ejemplo, se usa la directiva de hash con el algoritmo de hash mediante esta sintaxis para fines de desidentificación:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Nodo receptor

  1. Selecciona el nodo receptor.
  2. En la sección Receptor en la paleta de complementos a la izquierda, haz doble clic en el nodo de BigQuery, que aparecerá en la IU de Data Pipeline.
  3. Coloca el cursor sobre el nodo receptor de BigQuery y haz clic en Propiedades.

1be711152c92c692.png

  1. Completa los campos obligatorios. Configura los siguientes campos:
  • Etiqueta = {any text}
  • Nombre de referencia = {any text}
  • ID del proyecto = detección automática
  • Conjunto de datos = Conjunto de datos de BigQuery que se usa en el proyecto actual (es decir, DATASET_ID)
  • Tabla = {table name}
  1. Haz clic en Documentation para obtener una explicación detallada. Haz clic en el botón Validar (Validate) para validar toda la información de entrada. Verde "No se encontraron errores" indica éxito.

c5585747da2ef341.png

  1. Para cerrar las Propiedades de BigQuery, haz clic en el botón X.

6. Crea una canalización de datos por lotes

Conecta todos los nodos de una canalización

  1. Arrastra una flecha de conexión > en el borde derecho del nodo de origen y soltarlo en el borde izquierdo del nodo de destino.
  2. Una canalización puede tener varias ramas que obtienen archivos de entrada del mismo nodo de origen de GCS.

67510ab46bd44d36.png

  1. Asigna un nombre a la canalización.

Eso es todo. Acabas de crear tu primera canalización de datos por lotes y puedes implementarla y ejecutarla.

Enviar alertas de canalización por correo electrónico (opcional)

Para utilizar la función SendEmail de alerta de canalización, la configuración requiere que se establezca un servidor de correo electrónico para enviar mensajes desde una instancia de máquina virtual. Consulta el siguiente vínculo de referencia para obtener más información:

Envía correos electrónicos desde una instancia | Documentación de Compute Engine

En este codelab, configuramos un servicio de retransmisión de correo a través de Mailgun con los siguientes pasos:

  1. Sigue las instrucciones que se indican en Envía correos electrónicos con Mailgun |. documentación de Compute Engine para configurar una cuenta con Mailgun y el servicio de retransmisión de correo electrónico. A continuación, se muestran modificaciones adicionales.
  2. Agregar todos los destinatarios de correo electrónico a la lista autorizada de Mailgun. Esta lista se encuentra en la opción Mailgun>Envío>Descripción general en el panel izquierdo.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Cuando los destinatarios hagan clic en “Acepto”, en el correo electrónico enviado desde support@mailgun.net, sus direcciones de correo electrónico se guardan en la lista autorizada para recibir correos electrónicos de alerta de la canalización.

72847c97fd5fce0f.png

  1. Paso 3 de "Antes de comenzar" , crea una regla de firewall como se muestra a continuación:

80b7c3666655.png

  1. Paso 3 de "Configura Mailgun como retransmisión de correo con Postfix". Selecciona Internet Site o Internet with smarthost, en lugar de Solo local, como se menciona en las instrucciones.

8fd8474a4ef18f16.png

  1. Paso 4 de "Configura Mailgun como retransmisión de correo con Postfix". Edita vi /etc/postfix/main.cf para agregar 10.128.0.0/9 al final de mynetworks.

249fbf3edeff1ce8.png

  1. Edita vi /etc/postfix/master.cf para cambiar el smtp predeterminado (25) al puerto 587.

86c82cf48c687e72.png

  1. En la esquina superior derecha de Data Fusion Studio, haz clic en Configurar. Haz clic en Pipeline alert y, luego, en el botón + para abrir la ventana Alerts. Selecciona SendEmail.

dc079a91f1b0da68.png

  1. Completa el formulario de configuración de Correo electrónico. Selecciona finalización, éxito o falla en el menú desplegable Condición de ejecución para cada tipo de alerta. Si Include Workflow Token = false, solo se enviará la información del campo Mensaje. Si Include Workflow Token = true, se enviará la información detallada del campo Mensaje y del token de flujo de trabajo. Debes usar minúsculas en Protocolo. Usa cualquier valor “falso” correo electrónico que no sea la dirección de correo electrónico de tu empresa para el Remitente.

1fa619b6ce28f5e5.png

7. Configuración, implementación, ejecución y programación de canalizaciones

db612e62a1c7ab7e.png

  1. En la esquina superior derecha de Data Fusion Studio, haz clic en Configurar. Selecciona Spark para la configuración del motor. Haz clic en Save en la ventana Configure.

8ecf7c243c125882.png

  1. Haz clic en Preview para obtener una vista previa de los datos**,** y vuelve a hacer clic en **Preview** para volver a la ventana anterior. También puedes **Ejecutar** la canalización en el modo Vista previa.

b3c891e5e1aa20ae.png

  1. Haz clic en Registros para ver los registros.
  2. Haz clic en Guardar para guardar todos los cambios.
  3. Haz clic en Import para importar la configuración de la canalización guardada cuando compiles la canalización nueva.
  4. Haz clic en Exportar para exportar una configuración de canalización.
  5. Haz clic en Implementar para implementar la canalización.
  6. Una vez implementada, haga clic en Ejecutar y espere que la canalización se ejecute hasta el final.

bb06001d46a293db.png

  1. Para duplicar la canalización, selecciona Duplicar en el botón Acciones.
  2. Para exportar la configuración de la canalización, selecciona Exportar en el botón Acciones.
  3. Haz clic en Activadores de entrada o Activadores de salida, en el borde izquierdo o derecho de la ventana de Studio, para configurar los activadores de canalización si lo deseas.
  4. Haz clic en Programar para programar la canalización de forma que se ejecute y cargue los datos de forma periódica.

4167fa67550a49d5.png

  1. En Resumen, se muestran gráficos del historial de ejecuciones, los registros, los registros de errores y las advertencias.

8. Validación

  1. La canalización de validación se ejecutó correctamente.

7dee6e662c323f14.png

  1. Valida si el conjunto de datos de BigQuery tiene todas las tablas.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Recibir correos electrónicos de alerta (si se configuró)

Cómo ver los resultados

Para ver los resultados después de la ejecución de una canalización, haga lo siguiente:

  1. Consultar la tabla en la IU de BigQuery IR A LA IU DE BIGQUERY
  2. Actualiza la siguiente consulta con el nombre, el conjunto de datos y la tabla de tu proyecto.

e32bfd5d965a117f.png

9. Realice una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform para los recursos que se usaron en este instructivo:

Una vez que hayas terminado el instructivo, puedes limpiar los recursos que creaste en GCP para que no consuman tu cuota y no se te facture por ellos en el futuro. En las secciones siguientes, se describe cómo borrar o desactivar estos recursos.

Borra el conjunto de datos de BigQuery

Sigue estas instrucciones para borrar el conjunto de datos de BigQuery que creaste como parte de este instructivo.

Borra el bucket de GCS

Sigue estas instrucciones para borrar el bucket de GCS que creaste como parte de este instructivo.

Borra la instancia de Cloud Data Fusion

Sigue estas instrucciones para borrar tu instancia de Cloud Data Fusion.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En GCP Console, ve a la página Proyectos. IR A LA PÁGINA PROYECTOS
  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

10. Felicitaciones

Felicitaciones, completaste correctamente el codelab para transferir datos de atención médica a BigQuery con Cloud Data Fusion.

Importaste datos CSV de Google Cloud Storage a BigQuery.

Compilaste visualmente la canalización de integración de datos para cargar, transformar y enmascarar datos de atención médica de forma masiva.

Ahora conoces los pasos clave necesarios para comenzar tu recorrido de análisis de datos de atención médica con BigQuery en Google Cloud.