Прием данных в формате CSV (значения, разделенные запятыми) в BigQuery с помощью Cloud Data Fusion – прием в режиме реального времени.

1. Введение

509db33558ae025.png

Последнее обновление: 28 февраля 2020 г.

В этой кодовой лаборатории демонстрируется шаблон приема данных для приема данных здравоохранения в формате CSV в BigQuery в режиме реального времени. Для этой лабораторной работы мы будем использовать конвейер данных Cloud Data Fusion в реальном времени. Реалистичные данные медицинских тестов были созданы и доступны для вас в сегменте Google Cloud Storage (gs://hcls_testing_data_fhir_10_phases/csv/).

В этой лабораторной работе по кодированию вы узнаете:

  • Как импортировать данные CSV (загрузка в реальном времени) из Pub/Sub в BigQuery с помощью Cloud Data Fusion .
  • Как визуально построить конвейер интеграции данных в Cloud Data Fusion для загрузки, преобразования и маскировки медицинских данных в режиме реального времени .

Что вам нужно для запуска этой демонстрации?

  • Вам нужен доступ к проекту GCP.
  • Вам должна быть назначена роль владельца проекта GCP.
  • Данные о здравоохранении в формате CSV, включая заголовок.

Если у вас нет проекта GCP, выполните следующие действия , чтобы создать новый проект GCP.

Медицинские данные в формате CSV были предварительно загружены в корзину GCS по адресу gs://hcls_testing_data_fhir_10_phases/csv/ . Каждый файл ресурсов CSV имеет уникальную структуру схемы. Например, в файле Patients.csv используется другая схема, чем в файле Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_phases/csv_schemas .

Если вам понадобится новый набор данных, вы всегда можете сгенерировать его с помощью SyntheaTM . Затем загрузите его в GCS вместо копирования из корзины на этапе копирования входных данных.

2. Настройка проекта GCP

Инициализируйте переменные оболочки для вашей среды.

Чтобы найти PROJECT_ID , обратитесь к разделу Идентификация проектов .

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Создайте сегмент GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .

gsutil mb -l us gs://$BUCKET_NAME

Получите доступ к синтетическому набору данных.

  1. С адреса электронной почты, который вы используете для входа в Cloud Console, отправьте электронное письмо на адрес hcls-solutions-external+subscribe@google.com с просьбой присоединиться.
  2. Вы получите электронное письмо с инструкциями о том, как подтвердить действие.
  3. Используйте возможность ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте кнопку 525a0fa752e0acae.png кнопка.
  4. Как только вы получите электронное письмо с подтверждением, вы сможете перейти к следующему шагу в кодовой лаборатории.

Скопируйте входные данные.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Создайте набор данных BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Установите и инициализируйте Google Cloud SDK , а также создайте публикацию или подтему и подписки.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Настройка среды Cloud Data Fusion

Выполните следующие действия, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:

Включите API .

  1. Перейдите в библиотеку API консоли GCP .
  2. В списке проектов выберите свой проект.
  3. В библиотеке API выберите API, который вы хотите включить ( Cloud Data Fusion API, Cloud Pub/Sub API). Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и фильтрами.
  4. На странице API нажмите ВКЛЮЧИТЬ .

Создайте экземпляр Cloud Data Fusion .

  1. В консоли GCP выберите свой ProjectID.
  2. Выберите «Data Fusion» в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).

а828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. Укажите имя экземпляра. Выберите Предприятие .

5af91e46917260ff.png

  1. Нажмите кнопку СОЗДАТЬ.

Настройте разрешения экземпляра.

После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с разрешениями экземпляра, в вашем проекте:

  1. Перейдите на страницу сведений об экземпляре, щелкнув имя экземпляра.

76ad691f795e1ab3.png

  1. Скопируйте учетную запись службы.

6c91836afb72209d.png

  1. Перейдите на страницу IAM вашего проекта.
  2. На странице разрешений IAM предоставьте сервисному аккаунту роль агента службы Cloud Data Fusion API, нажав кнопку «Добавить» . Вставьте «служебный аккаунт» в поле «Новые участники» и выберите «Управление услугами» -> «Роль агента сервера Cloud Data Fusion API».

36f03d11c2a4ce0.png

  1. Нажмите + Добавить другую роль (или Изменить агент службы Cloud Data Fusion API), чтобы добавить роль подписчика Pub/Sub.

b4bf5500b8cbe5f9.png

  1. Нажмите Сохранить .

После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку «Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или странице сведений об экземпляре.

Настройте правило брандмауэра.

  1. Перейдите в Консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh или нет.

102adef44bbe3a45.png

  1. Если нет, добавьте правило брандмауэра, которое разрешает весь входящий SSH-трафик в сеть по умолчанию.

Используя командную строку:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Создайте узлы для конвейера

Теперь, когда у нас есть среда Cloud Data Fusion в GCP, давайте начнем создавать конвейеры данных в Cloud Data Fusion, выполнив следующие шаги:

  1. В окне Cloud Data Fusion нажмите ссылку «Просмотреть экземпляр» в столбце «Действие». Вы будете перенаправлены на другую страницу. Нажмите предоставленный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор: нажать кнопку «Начать тур» или кнопку «Нет, спасибо» во всплывающем окне приветствия.
  2. Разверните меню «гамбургер», выберите Pipeline -> List.

317820def934a00a.png

  1. Нажмите зеленую кнопку + в правом верхнем углу, затем выберите «Создать конвейер». Или нажмите «Создать» ссылку на конвейер.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. Когда появится студия конвейера, в левом верхнем углу выберите «Конвейер данных — в реальном времени» в раскрывающемся списке.

372a889a81da5e66.png

  1. В пользовательском интерфейсе конвейеров данных на левой панели вы увидите различные разделы: «Фильтр», «Источник», «Преобразование», «Аналитика», «Приемник», «Обработчики ошибок» и «Предупреждения», где вы можете выбрать узел или узлы для конвейера.

c63de071d4580f2f.png

Выберите узел источника .

  1. В разделе «Источник» на палитре плагинов слева дважды щелкните узел Google Cloud PubSub , который появляется в пользовательском интерфейсе конвейеров данных.
  2. Укажите на исходный узел PubSub и нажмите «Свойства» .

ed857a5134148d7b.png

  1. Заполните необходимые поля. Задайте следующие поля:
  • Метка = {любой текст}
  • Имя ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Подписка = подписка, созданная в разделе «Создать тему Pub/Sub» (например, your-sub ).
  • Тема = тема, созданная в разделе «Создать публикацию/подтему» ​​(например, your-topic ).
  1. Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.

5c2774338b66bebe.png

  1. Чтобы закрыть свойства публикации/подписки, нажмите кнопку X.

Выберите узел Преобразование .

  1. В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел «Проекция» , который появляется в пользовательском интерфейсе конвейеров данных. Подключите исходный узел Pub/Sub к узлу преобразования проекции.
  2. Укажите узел «Проекция» и нажмите «Свойства» .

b3a9a3878879bfd7.png

  1. Заполните необходимые поля. Задайте следующие поля:
  • Convert = преобразовать сообщение из байтового типа в строковый.
  • Поля для удаления = {любое поле}
  • Поля для сохранения = { message , timestamp и атрибуты } ( например, атрибуты: key='filename':value='peoples', отправленные из Pub/Sub)
  • Поля для переименования = { сообщение , метка времени }
  1. Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.

b8c2f8efe18234ff.png

  1. В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел Wrangler , который появляется в пользовательском интерфейсе конвейеров данных. Соедините узел преобразования проекции с узлом преобразования Wrangler. Наведите указатель мыши на узел Wrangler и нажмите «Свойства» .

аа44а4db5fe6623a.png

  1. Нажмите раскрывающийся список «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_phases/csv_schemas/ schema (Patients).json ).
  2. Добавьте поле TIMESTAMP в схему вывода (если оно не существует), нажав кнопку + рядом с последним полем и установите флажок «Null».
  3. Заполните необходимые поля. Задайте следующие поля:
  • Метка = {любой текст}
  • Имя поля ввода = {*}
  • Предварительное условие = { атрибуты.get("filename") != "пациенты" }, чтобы различать каждый тип записи или сообщения ( например, пациенты, поставщики услуг, аллергии и т. д. ), отправленные из исходного узла PubSub.
  1. Нажмите «Документация» для получения подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.

3b8e552cd2e3442c.png

  1. Установите имена столбцов в предпочтительном порядке и удалите ненужные поля. Скопируйте следующий фрагмент кода и вставьте его в поле «Рецепт».
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. См. Batch-Codelab — CSV в BigQuery через CDF для маскировки и деидентификации данных. Или добавьте этот фрагмент кода с номером маски SSN xxxxxxx#### в поле «Рецепт».
  2. Чтобы закрыть окно «Свойства преобразования», нажмите кнопку X.

Выберите узел Раковина.

  1. В разделе «Приёмник» на палитре плагинов слева дважды щелкните узел BigQuery , который появляется в пользовательском интерфейсе конвейера данных. Подключите узел преобразования Wrangler к узлу приемника BigQuery.
  2. Наведите указатель мыши на узел приемника BigQuery и нажмите «Свойства».

1be711152c92c692.png

  1. Заполните необходимые поля:
  • Метка = {любой текст}
  • Имя ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Набор данных = набор данных BigQuery, используемый в текущем проекте (например, DATASET_ID).
  • Таблица = {имя таблицы}
  1. Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.

bba71de9f31e842a.png

  1. Чтобы закрыть свойства BigQuery, нажмите кнопку X.

5. Создайте конвейер данных в реальном времени.

В предыдущем разделе мы создали узлы, необходимые для построения конвейера данных в Cloud Data Fusion. В этом разделе мы соединяем узлы для построения настоящего конвейера.

Соединение всех узлов в конвейере

  1. Перетащите стрелку соединения > на правый край исходного узла и опустите на левый край узла назначения.
  2. Конвейер может иметь несколько ветвей, которые получают опубликованные сообщения из одного и того же узла источника PubSub.

b22908cc35364cdd.png

  1. Назовите трубопровод.

Вот и все. Вы только что создали свой первый конвейер данных в реальном времени, который нужно развернуть и запустить.

Отправка сообщений через Cloud Pub/Sub

Использование пользовательского интерфейса Pub/Sub :

  1. Перейдите в консоль GCP -> Pub/Sub -> Topics, выберите свою тему , затем нажмите «ОПУБЛИКОВАТЬ СООБЩЕНИЕ» в верхнем меню.

d65b2a6af1668ecd.png

  1. Помещайте в поле «Сообщение» только одну строку записи за раз. Нажмите кнопку +ДОБАВИТЬ АТРИБУТ . Укажите Key = имя файла , Value = < тип записи > ( например, пациенты, поставщики услуг, аллергия и т. д. ).
  2. Нажмите кнопку «Опубликовать», чтобы отправить сообщение.

Используя команду gcloud :

  1. Введите сообщение вручную.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. Полуавтоматически предоставьте сообщение с помощью команд unix cat и sed . Эту команду можно запускать неоднократно с разными параметрами.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. Настройка, развертывание и запуск конвейера

Теперь, когда мы разработали конвейер данных, мы можем развернуть и запустить его в Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. Сохраните настройки по умолчанию.
  2. Нажмите «Просмотр» , чтобы просмотреть данные**.** Нажмите «Просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра, нажав **ЗАПУСК**.

b3c891e5e1aa20ae.png

  1. Нажмите Журналы , чтобы просмотреть журналы.
  2. Нажмите «Сохранить» , чтобы сохранить все изменения.
  3. Нажмите «Импортировать» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
  4. Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
  5. Нажмите «Развернуть» , чтобы развернуть конвейер.
  6. После развертывания нажмите «Выполнить» и дождитесь завершения работы конвейера.

f01ba6b746ba53a.png

  1. Нажмите «Стоп» , чтобы в любой момент остановить работу конвейера.
  2. Вы можете дублировать конвейер, выбрав «Дублировать» под кнопкой «Действия» .
  3. Вы можете экспортировать конфигурацию конвейера, выбрав «Экспорт» под кнопкой «Действия» .

28ea4fc79445fad2.png

  1. Нажмите «Сводка» , чтобы отобразить диаграммы истории запусков, записей, журналов ошибок и предупреждений.

7. Проверка

В этом разделе мы проверяем работу конвейера данных.

  1. Убедитесь, что конвейер выполнен успешно и работает непрерывно.

1644dfac4a2d819d.png

  1. Убедитесь, что таблицы BigQuery загружены обновленными записями на основе TIMESTAMP. В этом примере две записи или сообщения о пациентах и ​​одна запись или сообщение об аллергии были опубликованы в теме Pub/Sub 25 июня 2019 г.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. Убедитесь, что сообщения, опубликованные в <your-topic>, были получены подписчиком <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

Просмотр результатов

Чтобы просмотреть результаты после публикации сообщений в теме Pub/Sub во время работы конвейера Realtime:

  1. Запросите таблицу в пользовательском интерфейсе BigQuery. ПЕРЕХОДИТЬ В пользовательский интерфейс BIGQUERY
  2. Обновите запрос ниже, указав собственное имя проекта, набор данных и таблицу.

6a1fb85bd868abc9.png

8. Уборка

Чтобы избежать списания средств с вашей учетной записи Google Cloud Platform за ресурсы, используемые в этом руководстве:

После завершения обучения вы можете очистить ресурсы, созданные вами в GCP, чтобы они не занимали квоту и вам не выставлялись счета за них в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.

Удаление набора данных BigQuery

Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.

Удаление сегмента GCS

Следуйте этим инструкциям, чтобы удалить сегмент GCS, созданный вами в рамках этого руководства.

Удаление экземпляра Cloud Data Fusion

Следуйте этим инструкциям, чтобы удалить экземпляр Cloud Data Fusion .

Удаление проекта

Самый простой способ избавиться от выставления счетов — удалить проект, созданный вами для этого руководства.

Чтобы удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙТИ НА СТРАНИЦУ ПРОЕКТОВ
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить работу» , чтобы удалить проект.

9. Поздравления

Поздравляем, вы успешно завершили лабораторную работу по кодированию для приема медицинских данных в BigQuery с помощью Cloud Data Fusion.

Вы опубликовали данные CSV в теме Pub/Sub, а затем загрузили их в BigQuery.

Вы визуально построили конвейер интеграции данных для загрузки, преобразования и маскировки медицинских данных в режиме реального времени.

Теперь вы знаете ключевые шаги, необходимые для начала работы в области анализа данных здравоохранения с помощью BigQuery на Google Cloud Platform.