Ingerir dados CSV (valores separados por vírgula) no BigQuery usando o Cloud Data Fusion: ingestão em tempo real

1. Introdução

509db33558ae025.png

Última atualização:28/02/2020

Este codelab demonstra um padrão de ingestão de dados para ingerir dados de saúde formatados em CSV no BigQuery em tempo real. Neste laboratório, vamos usar o pipeline de dados em tempo real do Cloud Data Fusion. Dados realistas de testes de assistência médica foram gerados e disponibilizados para você no bucket do Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

Neste codelab, você vai aprender:

  • Como ingerir dados CSV (carregamento em tempo real) do Pub/Sub para o BigQuery usando o Cloud Data Fusion.
  • Como criar visualmente um pipeline de integração de dados no Cloud Data Fusion para carregar, transformar e mascarar dados de saúde em tempo real (em inglês).

O que é necessário para executar esta demonstração?

  • Você precisa ter acesso a um projeto do GCP.
  • Você precisa ter um papel de proprietário no projeto do GCP.
  • Dados de saúde no formato CSV, incluindo o cabeçalho.

Se você não tiver um projeto do GCP, siga estas etapas para criar um.

Os dados de saúde no formato CSV foram pré-carregados no bucket do GCS em gs://hcls_testing_data_fhir_10_patients/csv/. Cada arquivo de recurso CSV tem uma estrutura de esquema única. Por exemplo, Patients.csv tem um esquema diferente de Providers.csv. Os arquivos de esquema pré-carregados podem ser encontrados em gs://hcls_testing_data_fhir_10_patients/csv_schemas.

Se você precisar de um novo conjunto de dados, poderá gerá-lo usando o SyntheaTM. Em seguida, faça o upload dele para o GCS em vez de copiá-lo do bucket na etapa "Copiar dados de entrada".

2. Configuração do projeto do GCP

Inicialize variáveis de shell para seu ambiente.

Para encontrar o PROJECT_ID, consulte Como identificar projetos.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Criar um bucket do GCS para armazenar dados de entrada e registros de erros com a ferramenta gsutil.

gsutil mb -l us gs://$BUCKET_NAME

Receba acesso ao conjunto de dados sintético.

  1. No endereço de e-mail que você está usando para fazer login no console do Cloud, envie um e-mail para hcls-solutions-external+subscribe@google.com pedindo para participar.
  2. Você vai receber um e-mail com instruções para confirmar a ação.
  3. Use a opção de responder ao e-mail para participar do grupo. NÃO clique no botão 525a0fa752e0acae.png.
  4. Depois de receber o e-mail de confirmação, prossiga para a próxima etapa do codelab.

Copiar dados de entrada.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Criar um conjunto de dados do BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Instale e inicialize o SDK Google Cloud e crie tópicos e assinaturas do Pub ou Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Configuração do ambiente do Cloud Data Fusion

Siga estas etapas para ativar a API Data Fusion e conceder as permissões necessárias:

Ative as APIs.

  1. Acesse a Biblioteca de APIs do Console do GCP.
  2. Selecione um projeto na lista.
  3. Na biblioteca de APIs, selecione a API que você quer ativar ( API Data Fusion ou API Cloud Pub/Sub). Se precisar de ajuda para encontrar a API, use o campo de pesquisa e os filtros.
  4. Na página da API, clique em ATIVAR.

Crie uma instância do Cloud Data Fusion.

  1. No Console do GCP, selecione o ID do projeto.
  2. Selecione Data Fusion no menu à esquerda e, em seguida, clique no botão CRIAR UMA INSTÂNCIA no meio da página (1a criação) ou clique no botão CRIAR INSTÂNCIA no menu superior (criação adicional).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Forneça o nome da instância. Selecione Enterprise.

5af91e46917260ff.png

  1. Clique no botão "CRIAR".

Configure as permissões da instância.

Depois de criar uma instância, use as etapas a seguir para conceder as permissões da instância à conta de serviço associada no seu projeto:

  1. Clique no nome da instância para acessar a página de detalhes dela.

76ad691f795e1ab3.png

  1. Copie a conta de serviço.

6c91836afb72209d.png

  1. Navegue até a página do IAM do seu projeto.
  2. Na página de permissões do IAM, clique no botão Adicionar para conceder à conta de serviço o papel Agente de serviço da API Cloud Data Fusion. Cole a "conta de serviço" no campo "Novos membros" e selecione "Service Management -> Papel de agente de servidor da API Cloud Data Fusion.

36f03d11c2a4ce0.png

  1. Clique em + Adicionar outro papel (ou Editar agente de serviço da API Cloud Data Fusion) para adicionar um papel de Assinante do Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Clique em Salvar.

Depois que essas etapas forem concluídas, o Cloud Data Fusion já estará pronto para ser usado. Basta clicar no link Visualizar instância na página de instâncias ou de detalhes da instância.

Configure a regra de firewall.

  1. Acesse o Console do GCP -> Rede VPC -> Regras de firewall para verificar se a regra default-allow-ssh existe ou não.

102adef44bbe3a45.png

  1. Caso contrário, adicione uma regra de firewall que permita todo o tráfego SSH de entrada para a rede padrão.

Usando a linha de comando:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Na interface: clique em "Criar regra de firewall" e preencha as informações:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Criar nós para o pipeline

Agora que temos o ambiente do Cloud Data Fusion no GCP, vamos começar a criar pipelines de dados no Cloud Data Fusion seguindo estas etapas:

  1. Na janela do Cloud Data Fusion, clique no link "Visualizar instância" na coluna "Ação". Você será redirecionado para outra página. Clique no url fornecido para abrir a instância do Cloud Data Fusion. Clique em "Iniciar tour" ou "Agora não" no pop-up de boas-vindas.
  2. Expanda o menu de navegação selecione Pipeline -> Lista

317820def934a00a.png

  1. Clique no botão verde + no canto superior direito e selecione Criar pipeline. Ou clique em "Criar" um link de pipeline.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Quando a janela do pipeline aparecer, no canto superior esquerdo, selecione Pipeline de dados – Tempo real no menu suspenso.

372a889a81da5e66.png

  1. Na interface de pipelines de dados, você vai encontrar seções diferentes no painel esquerdo, como "Filtro", "Origem", "Transformação", "Análise", "Coletor", "Gerenciadores de erros" e "Alertas", em que é possível selecionar um ou mais nós para o pipeline.

c63de071d4580f2f.png

Selecione um de origem.

  1. Na seção "Origem" da paleta de plug-ins à esquerda, clique duas vezes no nó Google Cloud PubSub, que aparece na interface de pipelines de dados.
  2. Aponte para o nó de origem do PubSub e clique em Propriedades.

ed857a5134148d7b.png

  1. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Rótulo = {qualquer texto}
  • Nome de referência = {qualquer texto}
  • ID do projeto = detecção automática
  • Assinatura = assinatura criada na seção Criar tópico do Pub/Sub (por exemplo, your-sub)
  • Tópico = tópico criado na seção "Criar tópico do Pub/Sub" (por exemplo, your-topic)
  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.

5c2774338b66bebe.png

  1. Para fechar as propriedades do Pub/Sub, clique no botão X.

Selecione o de transformação.

  1. Na seção "Transformação" na paleta de plug-ins à esquerda, clique duas vezes no nó Projeção, que aparece na interface de pipelines de dados. Conecte o nó de origem do Pub/Sub ao nó de transformação "Projeção".
  2. Aponte para o nó Projeção e clique em Propriedades.

b3a9a3878879bfd7.png

  1. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Convert = converte a mensagem do tipo de byte em tipo de string.
  • Campos a serem soltos = {qualquer campo}
  • Campos a serem mantidos = {message, timestamp e attributes} (por exemplo, atributos: key=‘filename':value=‘patients' enviado do Pub/Sub)
  • Campos a serem renomeados = {mensagem, timestamp}
  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.

b8c2f8efe18234ff.png

  1. Na seção "Transformação" na paleta de plug-ins à esquerda, clique duas vezes no nó do Wrangler, que aparece na interface de pipelines de dados. Conecte o nó de transformação de projeção ao nó de transformação do Wrangler. Aponte para o nó do Wrangler e clique em Propriedades.

aa44a4db5fe6623a.png

  1. Clique no menu suspenso Ações e selecione Importar para importar um esquema salvo (por exemplo: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. Adicione o campo TIMESTAMP no Esquema de saída (se ele não existir). Para isso, clique no botão + ao lado do último campo e marque "Nulo". caixa
  3. Preencha os campos obrigatórios. Defina os seguintes campos:
  • Rótulo = {qualquer texto}
  • Nome do campo de entrada = {*}
  • Precondition = {attributes.get("filename") != "patients"} para distinguir cada tipo de registro ou mensagem (por exemplo, pacientes, provedores, alergias etc.) enviado do nó de origem do Pub/Sub.
  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.

3b8e552cd2e3442c.png

  1. Defina os nomes das colunas em uma ordem preferencial e elimine os campos que não forem necessários. Copie o snippet de código a seguir e cole na caixa "Roteiro".
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Consulte Batch-Codelab: CSV para BigQuery via CDF para mascaramento e desidentificação de dados. Ou adicione este snippet de código mask-number SSN xxxxxxx#### na caixa "Recipe"
  2. Para fechar a janela "Transform Properties", clique no botão X.

Selecione o nó do coletor.

  1. Na seção "Coletor" na paleta de plug-ins à esquerda, clique duas vezes no nó do BigQuery, que aparece na interface do pipeline de dados. Conecte o nó de transformação do Wrangler ao nó do coletor do BigQuery.
  2. Aponte para o nó do coletor do BigQuery e clique em "Propriedades".

1be711152c92c692.png

  1. Preencha os campos obrigatórios:
  • Rótulo = {qualquer texto}
  • Nome de referência = {qualquer texto}
  • ID do projeto = detecção automática
  • Conjunto de dados = conjunto de dados do BigQuery usado no projeto atual (por exemplo, DATASET_ID)
  • Tabela = {nome da tabela}
  1. Clique em Documentação para conferir uma explicação detalhada. Clique no botão "Validar" para validar todas as informações de entrada. "Nenhum erro encontrado" em verde indica sucesso.

bba71de9f31e842a.png

  1. Para fechar as propriedades do BigQuery, clique no botão X.

5. Criar pipeline de dados em tempo real

Na seção anterior, criamos nós que são necessários para criar um pipeline de dados no Cloud Data Fusion. Nesta seção, conectamos os nós para criar o pipeline real.

Como conectar todos os nós em um pipeline

  1. Arraste uma seta de conexão > na borda direita do nó de origem e soltam na borda esquerda do nó de destino.
  2. Um pipeline pode ter várias ramificações que recebem mensagens publicadas do mesmo nó de origem do PubSub.

b22908cc35364cdd.png

  1. Dê um nome ao pipeline.

É isso. Você acabou de criar seu primeiro pipeline de dados em tempo real para ser implantado e executado.

Enviar mensagens pelo Cloud Pub/Sub

Usando a IU do Pub/Sub:

  1. Acesse o Console do GCP -> Pub/Sub -> Tópicos, selecione seu-tópico e clique em PUBLICAR MENSAGEM no menu superior.

d65b2a6af1668ecd.png

  1. Coloque apenas uma linha de registro por vez no campo Mensagem. Clique no botão +ADICIONAR UM ATRIBUTO. Forneça Chave = filename, Valor = <type of record> Por exemplo, pacientes, profissionais de saúde, alergias etc.
  2. Clique no botão "Publicar" para enviar a mensagem.

Usando o comando gcloud:

  1. Insira a mensagem manualmente.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Fornece a mensagem de maneira semiautomática usando os comandos cat e sed do Unix. Esse comando pode ser executado repetidamente com parâmetros diferentes.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Configurar, implantar e executar pipeline

Agora que desenvolvemos o pipeline de dados, podemos implantá-lo e executá-lo no Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Mantenha a opção padrão em Configurar.
  2. Clique em Visualizar para ver os dados**.** Clique em **Visualizar** novamente para voltar à janela anterior. Também é possível executar o pipeline no modo de visualização clicando em **EXECUTAR**.

b3c891e5e1aa20ae.png

  1. Clique em Registros para visualizá-los.
  2. Clique em Salvar para salvar todas as mudanças.
  3. Clique em Importar para importar a configuração do pipeline salva ao criar um novo pipeline.
  4. Clique em Exportar para exportar uma configuração de pipeline.
  5. Clique em Implantar para implantar o pipeline.
  6. Depois da implantação, clique em Executar e aguarde a conclusão do pipeline.

f01ba6b746ba53a.png

  1. Clique em Interromper para interromper a execução do pipeline a qualquer momento.
  2. Selecione "Duplicar" no botão Ações para duplicar o pipeline.
  3. Para exportar a configuração do pipeline, selecione "Exportar" no botão Ações.

28ea4fc79445fad2.png

  1. Clique em Resumo para mostrar gráficos de histórico de execução, registros, registros de erros e avisos.

7. Validação

Nesta seção, validamos a execução do pipeline de dados.

  1. Valide se o pipeline foi executado com sucesso e em execução contínua.

1644dfac4a2d819d.png

  1. Confirmar se as tabelas do BigQuery estão carregadas com registros atualizados com base no TIMESTAMP. Neste exemplo, dois registros ou mensagens de pacientes e um registro ou mensagem de alergia foram publicados no tópico do Pub/Sub em 25/06/2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Valide se as mensagens publicadas em <your-topic> foram recebidas por <your-sub> assinante.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Como visualizar os resultados

Para ver os resultados após a publicação das mensagens no tópico do Pub/Sub enquanto o pipeline em tempo real está em execução:

  1. Consulte a tabela na interface do BigQuery. ACESSAR A interface do BIGQUERY
  2. Atualize a consulta abaixo para seu nome de projeto, conjunto de dados e tabela.

6a1fb85bd868abc9.png

8. Limpar

Para evitar que os recursos usados nesse tutorial sejam cobrados na sua conta do Google Cloud Platform:

Depois de concluir este tutorial, limpe os recursos criados no GCP para que eles não ocupem cota e você não seja cobrado por eles no futuro. As próximas seções descrevem como excluir ou desativar esses recursos.

Como excluir o conjunto de dados do BigQuery

Siga estas instruções para excluir o conjunto de dados do BigQuery que você criou como parte deste tutorial.

Como excluir o bucket do GCS

Siga estas instruções para excluir o bucket do GCS criado como parte deste tutorial.

Como excluir a instância do Cloud Data Fusion

Siga estas instruções para excluir sua instância do Cloud Data Fusion.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para este tutorial.

Para excluir o projeto:

  1. No Console do GCP, acesse a página Projetos. ACESSAR A PÁGINA "PROJETOS"
  2. Na lista de projetos, selecione um e clique em Excluir.
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

9. Parabéns

Parabéns, você concluiu o codelab para ingerir dados de saúde no BigQuery usando o Cloud Data Fusion.

Você publicou dados CSV no tópico do Pub/Sub e os carregou no BigQuery.

Você criou visualmente um pipeline de integração de dados para carregar, transformar e mascarar dados de saúde em tempo real.

Agora você sabe as principais etapas necessárias para iniciar sua jornada de análise de dados de saúde com o BigQuery no Google Cloud Platform.