Ingerir dados CSV para o BigQuery usando o Cloud Data Fusion: ingestão em lote

1. Introdução

12fb66cc134b50ef.png

Última atualização:28/02/2020

Este codelab demonstra um padrão de ingestão de dados para ingerir dados de saúde formatados em CSV em massa no BigQuery. Neste laboratório, vamos usar o pipeline de dados em lote do Cloud Data Fusion. Dados realistas de testes de assistência médica foram gerados e disponibilizados para você no bucket do Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Neste codelab, você vai aprender:

  • Como ingerir dados CSV (carregamento programado em lote) do GCS para o BigQuery usando o Cloud Data Fusion.
  • Como criar visualmente um pipeline de integração de dados no Cloud Data Fusion para carregar, transformar e mascarar dados de saúde em massa.

O que é necessário para executar este codelab?

  • Você precisa ter acesso a um projeto do GCP.
  • Você precisa ter o papel de proprietário no projeto do GCP.
  • Dados de saúde no formato CSV, incluindo o cabeçalho.

Se você não tiver um projeto do GCP, siga estas etapas para criar um.

Os dados de saúde no formato CSV foram pré-carregados no bucket do GCS em gs://hcls_testing_data_fhir_10_patients/csv/. Cada arquivo CSV de recursos tem uma estrutura de esquema única. Por exemplo, Patients.csv tem um esquema diferente de Providers.csv. Os arquivos de esquema pré-carregados podem ser encontrados em gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Se você precisar de um novo conjunto de dados, poderá gerá-lo usando o SyntheaTM. Em seguida, faça o upload dele para o GCS em vez de copiá-lo do bucket na etapa "Copiar dados de entrada".

2. Configuração do projeto do GCP

Inicialize variáveis de shell para seu ambiente.

Para encontrar o PROJECT_ID, consulte Como identificar projetos.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Criar o bucket do GCS para armazenar dados de entrada e registros de erros com a ferramenta gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Receba acesso ao conjunto de dados sintético.

  1. No endereço de e-mail que você está usando para fazer login no console do Cloud, envie um e-mail para hcls-solutions-external+subscribe@google.com pedindo para participar.
  2. Você vai receber um e-mail com instruções para confirmar a ação. 525a0fa752e0acae.png
  3. Use a opção de responder ao e-mail para participar do grupo. NÃO clique no botão.
  4. Depois de receber o e-mail de confirmação, prossiga para a próxima etapa do codelab.

Copiar dados de entrada.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Crie um conjunto de dados do BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Configuração do ambiente do Cloud Data Fusion

Siga estas etapas para ativar a API Data Fusion e conceder as permissões necessárias:

Ativar APIs.

  1. Acesse a Biblioteca de APIs do Console do GCP.
  2. Selecione um projeto na lista.
  3. Na Biblioteca de APIs, selecione a API que você quer ativar. Se você precisar de ajuda para encontrar a API, use o campo de pesquisa e/ou os filtros.
  4. Na página da API, clique em ATIVAR.

Criar uma instância do Cloud Data Fusion.

  1. No Console do GCP, selecione o ID do projeto.
  2. Selecione Data Fusion no menu à esquerda e, em seguida, clique no botão CRIAR UMA INSTÂNCIA no meio da página (1a criação) ou clique no botão CRIAR INSTÂNCIA no menu superior (criação adicional).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Forneça o nome da instância. Selecione Enterprise.

5af91e46917260ff.png

  1. Clique no botão "CRIAR".

Configure as permissões da instância.

Depois de criar uma instância, use as etapas a seguir para conceder as permissões da instância à conta de serviço associada no seu projeto:

  1. Clique no nome da instância para acessar a página de detalhes dela.

76ad691f795e1ab3.png

  1. Copie a conta de serviço.

6c91836afb72209d.png

  1. Acesse a página do IAM do projeto.
  2. Na página de permissões do IAM, vamos adicionar a conta de serviço como um novo membro e conceder a ela o papel Agente de serviço da API Cloud Data Fusion. Clique no botão Adicionar e cole a "conta de serviço" no campo "Novos membros" e selecione "Service Management -> Papel de agente de servidor da API Cloud Data Fusion.
  3. ea68b28d917a24b1.png
  4. Clique em Salvar.

Depois que essas etapas forem concluídas, o Cloud Data Fusion já estará pronto para ser usado. Basta clicar no link Visualizar instância na página de instâncias ou de detalhes da instância.

Configure a regra de firewall.

  1. Acesse o Console do GCP -> Rede VPC -> Regras de firewall para verificar se a regra default-allow-ssh existe ou não.

102adef44bbe3a45.png

  1. Caso contrário, adicione uma regra de firewall que permita todo o tráfego SSH de entrada para a rede padrão.

Usando a linha de comando:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Na interface: clique em "Criar regra de firewall" e preencha as informações:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Criar um esquema para transformação

Agora que temos o ambiente do Cloud Fusion no GCP, vamos criar um esquema. Precisamos desse esquema para transformar os dados CSV.

  1. Na janela do Cloud Data Fusion, clique no link "Visualizar instância" na coluna "Ação". Você será redirecionado para outra página. Clique no url fornecido para abrir a instância do Cloud Data Fusion. Clique em "Iniciar tour" ou "Agora não" no pop-up de boas-vindas.
  2. Expanda o menu de navegação selecione Pipeline -> Estúdio

6561b13f30e36c3a.png

  1. Na seção "Transformação" na paleta de plug-ins à esquerda, clique duas vezes no nó do Wrangler, que aparecerá na interface de pipelines de dados.

aa44a4db5fe6623a.png

  1. Aponte para o nó do Wrangler e clique em Propriedades. Clique no botão Wrangle e selecione um arquivo de origem .csv (por exemplo, pacientes.csv), que precisa ter todos os campos de dados para criar o esquema desejado.
  2. Clique na seta para baixo (transformações de coluna) ao lado do nome de cada coluna, por exemplo, corpo. 802edca8a97da18.png
  3. Por padrão, a importação inicial pressupõe que há apenas uma coluna no seu arquivo de dados. Para analisá-lo como um CSV, escolha AnalisarCSV, selecione o delimitador e marque a opção "Definir a primeira linha como cabeçalho". conforme apropriado. Clique no botão Aplicar.
  4. Clique na seta para baixo ao lado do campo Corpo e selecione Excluir coluna para remover o campo Corpo. Além disso, é possível testar outras transformações, como remover colunas, alterar o tipo de dados de algumas colunas (o padrão é tipo "string"), dividir colunas, definir nomes de colunas etc.

e6d2cda51ff298e7.png

  1. As "Colunas" e "Etapas de transformação" mostram o esquema de saída e o roteiro do Wrangler. Clique em Aplicar no canto superior direito. Clique no botão Validar. A mensagem verde "Nenhum erro encontrado" indica sucesso.

1add853c43f2abee.png

  1. Nas propriedades do Wrangler, clique no menu suspenso Ações para Exportar o esquema desejado para seu armazenamento local e Importar no futuro, se necessário.
  2. Salve o roteiro do Wrangler para uso futuro.
parse-as-csv :body ',' true
drop body
  1. Para fechar a janela "Propriedades do Wrangler", clique no botão X.

5. Criar nós para o pipeline

Nesta seção, vamos criar os componentes do pipeline.

  1. Na interface do Data Pipelines, no canto superior esquerdo, você verá que Pipeline de dados - Lote está selecionado como o tipo de pipeline.

af67c42ce3d98529.png

  1. No painel esquerdo, há seções diferentes, como "Filtro", "Origem", "Transformação", "Análise", "Coletor", "Condições e ações", "Gerenciadores de erros" e "Alertas", em que é possível selecionar um ou mais nós para o pipeline.

c4438f7682f8b19b.png

Nó de origem

  1. Selecione o nó de origem.
  2. Na seção "Origem" da paleta de plug-ins à esquerda, clique duas vezes no nó do Google Cloud Storage, que aparece na interface de pipelines de dados.
  3. Aponte para o nó de origem do GCS e clique em Propriedades.

87e51a3e8dae8b3f.png

  1. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Rótulo = {qualquer texto}
  • Nome de referência = {qualquer texto}
  • ID do projeto = detecção automática
  • Caminho = URL do GCS para o bucket no projeto atual. Por exemplo, gs://$BUCKET_NAME/csv/
  • Formato = texto
  • Campo do caminho = nome do arquivo
  • Caminho somente com o nome de arquivo = true
  • Ler arquivos recursivamente = true
  1. Adicionar campo "filename" ao esquema de saída do GCS clicando no botão +.
  2. Clique em Documentação para ver uma explicação detalhada. Clique no botão Validar. A mensagem verde "Nenhum erro encontrado" indica sucesso.
  3. Para fechar as propriedades do GCS, clique no botão X.

Transformação node

  1. Selecione o nó de transformação.
  2. Na seção "Transformação" na paleta de plug-ins à esquerda, clique duas vezes no nó do Wrangler, que aparece na interface de pipelines de dados. Conecte o nó de origem do GCS ao nó de transformação do Wrangler.
  3. Aponte para o nó do Wrangler e clique em Propriedades.
  4. Clique no menu suspenso Ações e selecione Importar para importar um esquema salvo (por exemplo: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients.json)) e cole a receita salva da seção anterior.
  5. Ou reutilize o nó do Wrangler da seção: Criar um esquema para transformação.
  6. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Rótulo = {qualquer texto}
  • Nome do campo de entrada = {*}
  • Precondition = {filename != "patients.csv"} para diferenciar cada arquivo de entrada (por exemplo, pacientes.csv, providers.csv, provedores.csv, allergies.csv etc.) do nó de origem.

2426f8f0a6c4c670.png

  1. Adicione um nó de JavaScript para executar o JavaScript fornecido pelo usuário que transforma ainda mais os registros. Neste codelab, usamos o nó JavaScript para receber um carimbo de data/hora para cada atualização de registro. Conecte o nó de transformação do Wrangler ao nó de transformação do JavaScript. Abra Propriedades do JavaScript e adicione a seguinte função:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Adicione o campo chamado TIMESTAMP ao esquema de saída (se ele não existir) clicando no sinal de +. Selecione o carimbo de data/hora como o tipo de dados.

4227389b57661135.png

  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.
  2. Para fechar a janela "Transform Properties", clique no botão X.

Mascaramento e desidentificação de dados

  1. Para selecionar colunas de dados individuais, clique na seta para baixo e aplique as regras de mascaramento em "Mascarar dados" de acordo com seus requisitos (por exemplo, coluna SSN).

bb1eb067dd6e0946.png

  1. É possível adicionar mais diretivas na janela Recipe do nó do Wrangler. Por exemplo, use a diretiva hash com o algoritmo de hash seguindo esta sintaxe para fins de desidentificação:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Nó do coletor

  1. Selecione o nó do coletor.
  2. Na seção "Coletor" na paleta de plug-ins à esquerda, clique duas vezes no nó do BigQuery, que vai aparecer na interface do pipeline de dados.
  3. Aponte para o nó do coletor do BigQuery e clique em "Propriedades".

1be711152c92c692.png

  1. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Rótulo = {qualquer texto}
  • Nome de referência = {qualquer texto}
  • ID do projeto = detecção automática
  • Conjunto de dados = conjunto de dados do BigQuery usado no projeto atual (por exemplo, DATASET_ID)
  • Tabela = {nome da tabela}
  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.

c5585747da2ef341.png

  1. Para fechar as propriedades do BigQuery, clique no botão X.

6. Criar pipeline de dados em lote

Como conectar todos os nós em um pipeline

  1. Arraste uma seta de conexão > na borda direita do nó de origem e soltam na borda esquerda do nó de destino.
  2. Um pipeline pode ter várias ramificações que recebem arquivos de entrada do mesmo nó de origem do GCS.

67510ab46bd44d36.png

  1. Dê um nome ao pipeline.

É isso. Você acabou de criar seu primeiro pipeline de dados em lote e pode implantá-lo e executá-lo.

Enviar alertas de pipeline por e-mail (opcional)

Para utilizar o recurso SendEmail de alerta de pipeline, a configuração exige que um servidor de e-mail seja definido para enviar e-mails de uma instância de máquina virtual. Consulte o link de referência abaixo para mais informações:

Como enviar e-mails de uma instância | Documentação do Compute Engine

Neste codelab, configuramos um serviço de redirecionamento de e-mail pelo Mailgun seguindo estas etapas:

  1. Siga as instruções em Como enviar e-mails com o Mailgun | Documentação do Compute Engine para configurar uma conta no Mailgun e configurar o serviço de redirecionamento de e-mail. Confira abaixo as modificações adicionais.
  2. Adicionar todos os destinatários para a lista autorizada do Mailgun. Essa lista pode ser encontrada na opção Mailgun>Envio>Visão geral no painel esquerdo.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

Quando os destinatários clicarem em "Concordo" no e-mail enviado por support@mailgun.net, os endereços de e-mail dessas pessoas serão salvos na lista autorizada para receber e-mails de alerta de pipeline.

72847c97fd5fce0f.png

  1. Etapa 3 de "Antes de começar" - crie uma regra de firewall da seguinte maneira:

75b063c165091912.png

  1. Etapa 3 em "Configurar o Mailgun como um redirecionamento de e-mail com o Postfix". Selecione Site da Internet ou Internet com host inteligente, em vez de Somente local, como mencionado nas instruções.

8fd8474a4ef18f16.png

  1. Etapa 4 em "Configurar o Mailgun como um redirecionamento de e-mail com o Postfix". Edite vi /etc/postfix/main.cf para adicionar 10.128.0.0/9 ao final de mynetworks.

249fbf3edeff1ce8.png

  1. Edite vi /etc/postfix/master.cf para alterar o smtp (25) padrão para a porta 587.

86c82cf48c687e72.png

  1. No canto superior direito do Data Fusion Studio, clique em Configurar. Clique em Pipeline alert e clique no botão + para abrir a janela Alerts. Selecione SendEmail.

dc079a91f1b0da68.png

  1. Preencha o formulário de configuração de e-mail. Selecione conclusão, sucesso ou falha no menu suspenso Condição de execução para cada tipo de alerta. Se Incluir token de fluxo de trabalho = false, somente as informações do campo "Mensagem" serão enviadas. Se Incluir token de fluxo de trabalho = true, as informações do campo "Mensagem" e as informações detalhadas do token de fluxo de trabalho serão enviadas. É necessário usar letras minúsculas em Protocolo. Use qualquer "falso" e-mail diferente do endereço de e-mail corporativo para Remetente.

1fa619b6ce28f5e5.png

7. Configurar, implantar, executar/programar pipeline

db612e62a1c7ab7e.png

  1. No canto superior direito do Data Fusion Studio, clique em Configurar. Selecione o Spark para a configuração do mecanismo. Clique em Salvar na janela Configurar.

8ecf7c243c125882.png

  1. Clique em Visualizar para acessar os dados** e, depois, em **Visualizar** novamente para voltar à janela anterior. Também é possível **executar** o pipeline no modo de visualização.

b3c891e5e1aa20ae.png

  1. Clique em Registros para visualizá-los.
  2. Clique em Salvar para salvar todas as mudanças.
  3. Clique em Importar para importar a configuração de pipeline salva ao criar um novo pipeline.
  4. Clique em Exportar para exportar uma configuração de pipeline.
  5. Clique em Implantar para implantar o pipeline.
  6. Depois da implantação, clique em Executar e aguarde a conclusão do pipeline.

bb06001d46a293db.png

  1. Selecione "Duplicar" no botão Ações para fazer isso.
  2. Para exportar a configuração do pipeline, selecione "Exportar" no botão Ações.
  3. Clique em Acionadores de entrada ou em Acionadores de saída na borda esquerda ou direita da janela do Studio para definir acionadores de pipeline, se você quiser.
  4. Clique em Programar para programar a execução e o carregamento periódicos de dados do pipeline.

4167fa67550a49d5.png

  1. Resumo: mostra gráficos do histórico de execução, registros, registros de erros e avisos.

8. Validação

  1. O pipeline de validação foi executado.

7dee6e662c323f14.png

  1. Valide se o conjunto de dados do BigQuery tem todas as tabelas.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Receber e-mails de alerta (se configurado).

Como visualizar os resultados

Para ver os resultados depois da execução do pipeline:

  1. Consulte a tabela na interface do BigQuery. ACESSAR A interface do BIGQUERY
  2. Atualize a consulta abaixo para seu nome de projeto, conjunto de dados e tabela.

e32bfd5d965a117f.png

9. Limpar

Para evitar que os recursos usados nesse tutorial sejam cobrados na sua conta do Google Cloud Platform:

Depois de concluir este tutorial, será possível limpar os recursos criados no GCP para que eles não ocupem sua cota e você não seja cobrado por eles no futuro. As próximas seções descrevem como excluir ou desativar esses recursos.

Como excluir o conjunto de dados do BigQuery

Siga estas instruções para excluir o conjunto de dados do BigQuery que você criou como parte deste tutorial.

Como excluir o bucket do GCS

Siga estas instruções para excluir o bucket do GCS criado como parte deste tutorial.

Como excluir a instância do Cloud Data Fusion

Siga estas instruções para excluir sua instância do Cloud Data Fusion.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para este tutorial.

Para excluir o projeto:

  1. No Console do GCP, acesse a página Projetos. ACESSAR A PÁGINA "PROJETOS"
  2. Na lista de projetos, selecione um e clique em Excluir.
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

10. Parabéns

Parabéns, você concluiu o codelab para ingerir dados de saúde no BigQuery usando o Cloud Data Fusion.

Você importou dados CSV do Google Cloud Storage para o BigQuery.

Você criou visualmente o pipeline de integração de dados para carregar, transformar e mascarar dados de saúde em massa.

Agora você sabe as principais etapas necessárias para iniciar sua jornada de análise de dados de saúde com o BigQuery no Google Cloud Platform.