Загрузка данных CSV (значения, разделенные запятыми) в BigQuery с помощью Cloud Data Fusion — прием в режиме реального времени

1. Введение

509db33558ae025.png

Последнее обновление: 28.02.2020

В этом практическом занятии демонстрируется шаблон загрузки данных в формате CSV из медицинских учреждений в BigQuery в режиме реального времени. Для этого занятия мы будем использовать конвейер обработки данных Cloud Data Fusion Real time Data Pipeline. Реалистичные данные медицинских анализов были сгенерированы и предоставлены вам в хранилище Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

В этой практической работе по программированию вы узнаете:

  • Как загрузить CSV-данные (в режиме реального времени) из системы Pub/Sub в BigQuery с помощью Cloud Data Fusion .
  • Как визуально построить конвейер интеграции данных в Cloud Data Fusion для загрузки, преобразования и маскирования медицинских данных в режиме реального времени .

Что вам понадобится для запуска этой демонстрации?

  • Вам необходим доступ к проекту GCP.
  • Для участия в проекте GCP вам должна быть назначена роль владельца.
  • Медицинские данные в формате CSV, включая заголовок.

Если у вас нет проекта GCP, выполните следующие шаги для создания нового проекта GCP.

Медицинские данные в формате CSV предварительно загружены в хранилище GCS по адресу gs://hcls_testing_data_fhir_10_patients/csv/ . Каждый файл ресурсов CSV имеет уникальную структуру схемы. Например, файл Patients.csv имеет другую схему, чем Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_patients/csv_schemas .

Если вам нужен новый набор данных, вы всегда можете сгенерировать его с помощью Synthea™ . Затем загрузите его в GCS вместо копирования из хранилища на этапе копирования входных данных.

2. Настройка проекта GCP

Инициализируйте переменные оболочки для вашей среды.

Чтобы найти PROJECT_ID , обратитесь к разделу «Идентификация проектов» .

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Создайте хранилище GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .

gsutil mb -l us gs://$BUCKET_NAME

Получите доступ к синтетическому набору данных.

  1. Отправьте письмо на адрес hcls-solutions-external+subscribe@google.com с того адреса электронной почты, который вы используете для входа в Cloud Console, с просьбой о присоединении.
  2. Вы получите электронное письмо с инструкциями по подтверждению действия.
  3. Воспользуйтесь возможностью ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте на кнопку. 525a0fa752e0acae.png кнопка.
  4. После получения подтверждающего письма вы можете перейти к следующему шагу в практическом задании.

Скопировать входные данные.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Создайте набор данных BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Установите и инициализируйте Google Cloud SDK , создайте публикации (Pub или Sub Topic) и подписки (Subscriptions).

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Настройка среды облачного слияния данных

Выполните следующие шаги, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:

Включить API .

  1. Перейдите в библиотеку API консоли GCP .
  2. Выберите свой проект из списка проектов.
  3. В библиотеке API выберите API, который хотите включить ( Cloud Data Fusion API, Cloud Pub/Sub API). Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и фильтрами.
  4. На странице API нажмите «Включить» .

Создайте экземпляр Cloud Data Fusion .

  1. В консоли GCP выберите свой идентификатор проекта (ProjectID).
  2. Выберите Data Fusion в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Укажите имя экземпляра. Выберите «Предприятие» .

5af91e46917260ff.png

  1. Нажмите кнопку СОЗДАТЬ.

Настройте права доступа к экземпляру.

После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с экземпляром, разрешения в вашем проекте:

  1. Чтобы перейти на страницу с подробной информацией об экземпляре, щелкните по его названию.

76ad691f795e1ab3.png

  1. Скопируйте учетную запись службы.

6c91836afb72209d.png

  1. Перейдите на страницу IAM вашего проекта.
  2. На странице разрешений IAM предоставьте учетной записи службы роль агента службы Cloud Data Fusion API, нажав кнопку «Добавить ». Вставьте «учетную запись службы» в поле «Новые участники» и выберите «Управление службами» -> «Роль агента сервера Cloud Data Fusion API».

36f03d11c2a4ce0.png

  1. Нажмите кнопку «+ Добавить еще одну роль» (или «Редактировать агент службы API Cloud Data Fusion»), чтобы добавить роль подписчика Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Нажмите « Сохранить ».

После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку « Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или на странице с подробными сведениями об экземпляре.

Настройте правило брандмауэра.

  1. Перейдите в консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh.

102adef44bbe3a45.png

  1. В противном случае добавьте правило брандмауэра, разрешающее весь входящий SSH-трафик в сеть по умолчанию.

Использование командной строки:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Создайте узлы для конвейера.

Теперь, когда у нас есть среда Cloud Data Fusion в GCP, давайте начнем создавать конвейеры обработки данных в Cloud Data Fusion, выполнив следующие шаги:

  1. В окне Cloud Data Fusion в столбце «Действие» нажмите ссылку «Просмотреть экземпляр». Вы будете перенаправлены на другую страницу. Щелкните указанный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор нажмите кнопку «Начать ознакомительный тур» или «Нет, спасибо» во всплывающем окне приветствия.
  2. Разверните меню-гамбургер, выберите Pipeline -> List

317820def934a00a.png

  1. Нажмите зеленую кнопку «+» в правом верхнем углу, затем выберите «Создать конвейер». Или нажмите ссылку «Создать» для создания конвейера.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. После появления окна «Студия конвейера» в левом верхнем углу выберите в выпадающем списке «Конвейер данных — в реальном времени» .

372a889a81da5e66.png

  1. В пользовательском интерфейсе конвейеров данных на левой панели вы увидите различные разделы: Фильтр, Источник, Преобразование, Аналитика, Приемник, Обработчики ошибок и Оповещения, где вы можете выбрать узел или узлы для конвейера.

c63de071d4580f2f.png

Выберите узел «Источник» .

  1. В разделе «Источник» на панели плагинов слева дважды щелкните узел Google Cloud PubSub , который появится в пользовательском интерфейсе Data Pipelines.
  2. Укажите на узел источника PubSub и нажмите «Свойства» .

ed857a5134148d7b.png

  1. Заполните обязательные поля. Установите следующие параметры:
  • Метка = {любой текст}
  • Название ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Подписка = Подписка, созданная в разделе «Создать тему публикации/подписки» (например, your-sub ).
  • Тема = Тема, созданная в разделе «Создать тему публикации/подтемы» (например, ваша тема ).
  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

5c2774338b66bebe.png

  1. Чтобы закрыть окно «Свойства публикации/подписки», нажмите кнопку «X» .

Выберите узел «Преобразование» .

  1. В разделе «Преобразование» на панели плагинов слева дважды щелкните узел «Проекция» , который появится в пользовательском интерфейсе конвейеров данных. Соедините узел источника Pub/Sub с узлом преобразования «Проекция».
  2. Наведите указатель мыши на узел «Проекция» и щелкните «Свойства» .

b3a9a3878879bfd7.png

  1. Заполните обязательные поля. Установите следующие параметры:
  • Convert = преобразовать сообщение из байтового типа в строковый.
  • Поля для удаления = {любое поле}
  • Поля для сохранения : { сообщение , метка времени и атрибуты } ( например, атрибуты: ключ='имя файла':значение='пациенты', отправленные из Pub/Sub)
  • Поля для переименования = { сообщение , метка времени }
  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

b8c2f8efe18234ff.png

  1. В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler , который появится в пользовательском интерфейсе Data Pipelines. Соедините узел преобразования Projection с узлом преобразования Wrangler. Наведите указатель мыши на узел Wrangler и щелкните «Свойства» .

aa44a4db5fe6623a.png

  1. Нажмите на выпадающее меню «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json ).
  2. Добавьте поле TIMESTAMP в схему вывода (если оно отсутствует), нажав кнопку «+» рядом с последним полем и установив флажок «Null».
  3. Заполните обязательные поля. Установите следующие параметры:
  • Метка = {любой текст}
  • Название поля ввода = {*}
  • Предварительное условие = { attributes.get("filename") != "patients" } для различения каждого типа записей или сообщений ( например, пациенты, поставщики услуг, аллергии и т. д. ), отправляемых из исходного узла PubSub.
  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

3b8e552cd2e3442c.png

  1. Укажите желаемый порядок названий столбцов и удалите ненужные поля. Скопируйте следующий фрагмент кода и вставьте его в поле «Рецепт».
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. Для маскирования и обезличивания данных обратитесь к руководству Batch-Codelab - CSV to BigQuery via CDF . Или добавьте этот фрагмент кода mask-number SSN xxxxxxx#### в поле «Рецепт».
  2. Чтобы закрыть окно «Свойства преобразования», нажмите кнопку «X» .

Выберите узел «Приёмник».

  1. В разделе «Приемник» на панели плагинов слева дважды щелкните узел BigQuery , который появится в пользовательском интерфейсе конвейера данных. Соедините узел преобразования Wrangler с узлом приемника BigQuery.
  2. Наведите указатель мыши на узел приемника BigQuery и щелкните «Свойства».

1be711152c92c692.png

  1. Заполните обязательные поля:
  • Метка = {любой текст}
  • Название ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Dataset = Набор данных BigQuery, используемый в текущем проекте (например, DATASET_ID)
  • Таблица = {название таблицы}
  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

bba71de9f31e842a.png

  1. Чтобы закрыть окно свойств BigQuery, нажмите кнопку X.

5. Создайте конвейер обработки данных в реальном времени.

В предыдущем разделе мы создали узлы, необходимые для построения конвейера обработки данных в Cloud Data Fusion. В этом разделе мы соединим узлы для построения самого конвейера.

Соединение всех узлов в конвейере

  1. Перетащите стрелку соединения > на правый край исходного узла и отпустите на левый край целевого узла.
  2. Конвейер может иметь несколько ветвей, которые получают опубликованные сообщения от одного и того же узла источника PubSub.

b22908cc35364cdd.png

  1. Назовите трубопровод.

Вот и всё. Вы только что создали свой первый конвейер обработки данных в реальном времени, который можно развернуть и запустить.

Отправляйте сообщения через Cloud Pub/Sub.

Использование пользовательского интерфейса публикации/подписки :

  1. Перейдите в консоль GCP -> Публикация/Подписка -> Темы, выберите свою тему , затем нажмите ОПУБЛИКОВАТЬ СООБЩЕНИЕ в верхнем меню.

d65b2a6af1668ecd.png

  1. В поле «Сообщение» указывайте только одну строку записи за раз. Нажмите кнопку «+ДОБАВИТЬ АТРИБУТ» . Укажите ключ = имя файла , значение = < тип записи > ( например, пациенты, поставщики услуг, аллергии и т. д. ).
  2. Нажмите кнопку «Опубликовать», чтобы отправить сообщение.

Используя команду gcloud :

  1. Введите сообщение вручную.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Полуавтоматически передавайте сообщение с помощью команд Unix cat и sed . Эту команду можно запускать многократно с различными параметрами.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Настройка, развертывание и запуск конвейера.

Теперь, когда мы разработали конвейер обработки данных, мы можем развернуть и запустить его в Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Оставьте значения по умолчанию в настройках .
  2. Нажмите «Предварительный просмотр» , чтобы просмотреть данные. Нажмите «Предварительный просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра, нажав «Запустить».

b3c891e5e1aa20ae.png

  1. Нажмите «Журналы» , чтобы просмотреть журналы.
  2. Нажмите «Сохранить» , чтобы сохранить все изменения.
  3. Нажмите «Импорт» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
  4. Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
  5. Нажмите «Развернуть» , чтобы развернуть конвейер.
  6. После развертывания нажмите кнопку «Запустить» и дождитесь завершения выполнения конвейера.

f01ba6b746ba53a.png

  1. Нажмите «Стоп» , чтобы остановить выполнение конвейера в любое время.
  2. Вы можете продублировать конвейер, выбрав пункт «Дублировать» под кнопкой «Действия» .
  3. Вы можете экспортировать конфигурацию конвейера, выбрав пункт «Экспорт» в разделе « Действия» .

28ea4fc79445fad2.png

  1. Нажмите «Сводка» , чтобы отобразить диаграммы истории выполнения, записи, журналы ошибок и предупреждения.

7. Валидация

В этом разделе мы проверяем выполнение конвейера обработки данных.

  1. Убедитесь, что конвейер был успешно выполнен и работает непрерывно.

1644dfac4a2d819d.png

  1. Убедитесь, что таблицы BigQuery загружены обновленными записями на основе временной метки (TIMESTAMP). В этом примере две записи или сообщения о пациентах и ​​одна запись или сообщение об аллергии были опубликованы в тему Pub/Sub 25 июня 2019 года.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Убедитесь, что сообщения, опубликованные в теме <your-topic>, были получены подписчиком <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Просмотр результатов

Чтобы просмотреть результаты после публикации сообщений в тему Pub/Sub во время работы конвейера обработки в реальном времени:

  1. Выполните запрос к таблице в пользовательском интерфейсе BigQuery. ПЕРЕЙДИТЕ В ИНТЕРФЕЙС BIGQUERY
  2. Обновите приведенный ниже запрос, указав название вашего проекта, набор данных и таблицу.

6a1fb85bd868abc9.png

8. Уборка

Чтобы избежать списания средств с вашего счета Google Cloud Platform за ресурсы, использованные в этом руководстве:

После завершения обучения вы можете очистить созданные вами ресурсы в GCP, чтобы они не занимали квоты и вам не выставлялись за них счета в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.

Удаление набора данных BigQuery

Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.

Удаление сегмента GCS

Следуйте этим инструкциям, чтобы удалить созданный вами в рамках этого руководства сегмент GCS .

Удаление экземпляра Cloud Data Fusion

Следуйте этим инструкциям, чтобы удалить свой экземпляр Cloud Data Fusion .

Удаление проекта

Самый простой способ избежать выставления счетов — удалить проект, созданный для этого урока.

Чтобы удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙДИТЕ НА СТРАНИЦУ ПРОЕКТОВ
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.

9. Поздравляем!

Поздравляем, вы успешно завершили практическое задание по загрузке медицинских данных в BigQuery с помощью Cloud Data Fusion.

Вы опубликовали данные в формате CSV в топик Pub/Sub, а затем загрузили их в BigQuery.

Вы визуально создали конвейер интеграции данных для загрузки, преобразования и маскирования медицинских данных в режиме реального времени.

Теперь вы знаете ключевые шаги, необходимые для начала работы с BigQuery на платформе Google Cloud Platform в сфере анализа данных в здравоохранении.