Прием данных CSV в BigQuery с помощью Cloud Data Fusion – пакетный прием

1. Введение

12fb66cc134b50ef.png

Последнее обновление: 28.02.2020

В этом практическом занятии демонстрируется шаблон загрузки данных в формате CSV из медицинских учреждений в BigQuery в пакетном режиме. Для этого занятия мы будем использовать конвейер пакетной обработки данных Cloud Data Fusion. Реалистичные данные медицинских анализов были сгенерированы и предоставлены вам в хранилище Google Cloud Storage ( gs://hcls_testing_data_fhir_10_patients/csv/ ).

В этой практической работе по программированию вы узнаете:

  • Как загрузить данные из CSV-файлов (пакетная загрузка) из GCS в BigQuery с помощью Cloud Data Fusion .
  • Как визуально построить конвейер интеграции данных в Cloud Data Fusion для массовой загрузки, преобразования и маскирования данных здравоохранения .

Что вам понадобится для проведения этого практического занятия?

  • Вам необходим доступ к проекту GCP.
  • Для участия в проекте GCP вам должна быть назначена роль владельца.
  • Медицинские данные в формате CSV, включая заголовок.

Если у вас нет проекта GCP, выполните следующие шаги для создания нового проекта GCP.

Данные о здравоохранении в формате CSV предварительно загружены в хранилище GCS по адресу gs://hcls_testing_data_fhir_10_patients/csv/ . Каждый CSV-файл ресурса имеет свою уникальную структуру схемы. Например, файл Patients.csv имеет другую схему, чем Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_patients/csv_schemas .

Если вам нужен новый набор данных, вы всегда можете сгенерировать его с помощью Synthea™ . Затем загрузите его в GCS вместо копирования из хранилища на этапе копирования входных данных.

2. Настройка проекта GCP

Инициализируйте переменные оболочки для вашей среды.

Чтобы найти PROJECT_ID , обратитесь к разделу «Идентификация проектов» .

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Создайте хранилище GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .

gsutil mb -l us gs://$BUCKET_NAME

Получите доступ к синтетическому набору данных.

  1. Отправьте письмо на адрес hcls-solutions-external+subscribe@google.com с того адреса электронной почты, который вы используете для входа в Cloud Console, с просьбой о присоединении.
  2. Вы получите электронное письмо с инструкциями по подтверждению действия. 525a0fa752e0acae.png
  3. Воспользуйтесь возможностью ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте на кнопку.
  4. После получения подтверждающего письма вы можете перейти к следующему шагу в практическом задании.

Скопировать входные данные.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Создайте набор данных BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Настройка среды облачного слияния данных

Выполните следующие шаги, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:

Включить API .

  1. Перейдите в библиотеку API консоли GCP .
  2. Выберите свой проект из списка проектов.
  3. В библиотеке API выберите API, который хотите включить. Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и/или фильтрами.
  4. На странице API нажмите «Включить».

Создайте экземпляр Cloud Data Fusion .

  1. В консоли GCP выберите свой идентификатор проекта (ProjectID).
  2. Выберите Data Fusion в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. Укажите имя экземпляра. Выберите «Предприятие».

5af91e46917260ff.png

  1. Нажмите кнопку СОЗДАТЬ.

Настройка прав доступа к экземпляру.

После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с экземпляром, разрешения в вашем проекте:

  1. Чтобы перейти на страницу с подробной информацией об экземпляре, щелкните по его названию.

76ad691f795e1ab3.png

  1. Скопируйте учетную запись службы.

6c91836afb72209d.png

  1. Перейдите на страницу IAM вашего проекта.
  2. На странице разрешений IAM мы добавим учетную запись службы в качестве нового участника и предоставим ей роль агента службы Cloud Data Fusion API . Нажмите кнопку «Добавить» , затем вставьте «учетную запись службы» в поле «Новые участники» и выберите «Управление службами» -> «Роль агента сервера Cloud Data Fusion API».
  3. ea68b28d917a24b1.png
  4. Нажмите « Сохранить ».

После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку « Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или на странице с подробными сведениями об экземпляре.

Настройте правило брандмауэра.

  1. Перейдите в консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh.

102adef44bbe3a45.png

  1. В противном случае добавьте правило брандмауэра, разрешающее весь входящий SSH-трафик в сеть по умолчанию.

Использование командной строки:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Создайте схему для преобразования.

Теперь, когда у нас есть среда Cloud Fusion в GCP, давайте создадим схему. Эта схема необходима для преобразования данных CSV.

  1. В окне Cloud Data Fusion в столбце «Действие» нажмите ссылку «Просмотреть экземпляр». Вы будете перенаправлены на другую страницу. Щелкните указанный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор нажмите кнопку «Начать ознакомительный тур» или «Нет, спасибо» во всплывающем окне приветствия.
  2. Разверните меню-гамбургер, выберите Pipeline -> Studio

6561b13f30e36c3a.png

  1. В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler, который появится в пользовательском интерфейсе Data Pipelines.

aa44a4db5fe6623a.png

  1. Наведите указатель мыши на узел Wrangler и нажмите «Свойства» . Нажмите кнопку Wrangler , затем выберите исходный файл .csv (например, patients.csv), который должен содержать все поля данных для построения нужной схемы.
  2. Нажмите стрелку вниз (Преобразования столбцов) рядом с названием каждого столбца (например, body). 802edca8a97da18.png
  3. По умолчанию при первоначальном импорте предполагается, что в вашем файле данных только один столбец. Чтобы преобразовать его в CSV-файл, выберите «Преобразовать в CSV» , затем выберите разделитель и установите флажок «Установить первую строку в качестве заголовка» при необходимости. Нажмите кнопку «Применить».
  4. Щелкните стрелку вниз рядом с полем «Тело», выберите «Удалить столбец», чтобы удалить поле «Тело». Кроме того, вы можете попробовать другие преобразования, такие как удаление столбцов, изменение типа данных для некоторых столбцов (по умолчанию — строковый тип), разделение столбцов, присвоение имен столбцам и т. д.

e6d2cda51ff298e7.png

  1. На вкладках «Столбцы» и «Этапы преобразования» отображается схема вывода и алгоритм Wrangler. Нажмите «Применить» в правом верхнем углу. Нажмите кнопку «Проверить». Зеленое сообщение «Ошибок не обнаружено» указывает на успешное выполнение.

1add853c43f2abee.png

  1. В свойствах Wrangler щелкните раскрывающееся меню «Действия» , чтобы экспортировать нужную схему в локальное хранилище для последующего импорта при необходимости.
  2. Сохраните рецепт Wrangler для дальнейшего использования.
parse-as-csv :body ',' true
drop body
  1. Чтобы закрыть окно «Свойства Wrangler», нажмите кнопку «X» .

5. Создайте узлы для конвейера.

В этом разделе мы будем создавать компоненты конвейера.

  1. В пользовательском интерфейсе «Конвейеры данных» в левом верхнем углу вы должны увидеть, что в качестве типа конвейера выбран «Конвейер данных — Пакетная обработка» .

af67c42ce3d98529.png

  1. На левой панели расположены различные разделы: Фильтр, Источник, Преобразование, Аналитика, Приемник, Условия и действия, Обработчики ошибок и Оповещения, где можно выбрать узел или узлы для конвейера.

c4438f7682f8b19b.png

Исходный узел

  1. Выберите узел «Источник».
  2. В разделе «Источник» на панели плагинов слева дважды щелкните узел Google Cloud Storage , который появится в пользовательском интерфейсе Data Pipelines.
  3. Укажите на узел источника GCS и нажмите «Свойства» .

87e51a3e8dae8b3f.png

  1. Заполните обязательные поля. Установите следующие параметры:
  • Метка = {любой текст}
  • Название ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Путь = URL-адрес GCS для хранилища в вашем текущем проекте. Например, gs://$BUCKET_NAME/csv/
  • Формат = текст
  • Поле «Путь» = имя файла
  • Путь только к имени файла = true
  • Read Files Recursively = true
  1. Добавьте поле 'filename' в схему вывода GCS, нажав кнопку "+" .
  2. Для получения подробных пояснений нажмите «Документация» . Нажмите кнопку «Проверить». Зеленая надпись «Ошибок не обнаружено» означает успешное выполнение проверки.
  3. Чтобы закрыть окно свойств GCS, нажмите кнопку X.

Узел преобразования

  1. Выберите узел «Преобразование».
  2. В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler , который появится в пользовательском интерфейсе конвейеров данных. Соедините исходный узел GCS с узлом преобразования Wrangler.
  3. Наведите указатель мыши на узел Wrangler и щелкните «Свойства» .
  4. Нажмите на выпадающее меню «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema(Patients).json ), а затем вставьте сохраненный рецепт из предыдущего раздела.
  5. Или же используйте узел Wrangler из раздела: Создание схемы для преобразования .
  6. Заполните обязательные поля. Установите следующие параметры:
  • Метка = {любой текст}
  • Название поля ввода = {*}
  • Precondition = {filename != "patients.csv"} для различения каждого входного файла ( например, patients.csv, providers.csv, allergys.csv и т. д. ) от узла Source.

2426f8f0a6c4c670.png

  1. Добавьте узел JavaScript для выполнения предоставленного пользователем JavaScript-кода, который дополнительно преобразует записи. В этом практическом занятии мы используем узел JavaScript для получения метки времени для каждого обновления записи. Соедините узел преобразования Wrangler с узлом преобразования JavaScript. Откройте свойства JavaScript и добавьте следующую функцию:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Добавьте поле с именем TIMESTAMP в схему вывода (если оно отсутствует), нажав знак «+» . Выберите тип данных «метка времени».

4227389b57661135.png

  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.
  2. Чтобы закрыть окно «Свойства преобразования», нажмите кнопку «X» .

Маскирование и обезличивание данных

  1. Вы можете выбрать отдельные столбцы данных, щелкнув стрелку вниз в столбце и применив правила маскирования в разделе «Маскировка выбранных данных» в соответствии с вашими требованиями (например, столбец SSN).

bb1eb067dd6e0946.png

  1. В окне «Рецепт» узла Wrangler можно добавить дополнительные директивы. Например, можно использовать директиву `hash` с алгоритмом хеширования, следующим синтаксисом для целей деидентификации:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Узел-приемник

  1. Выберите узел-приемник.
  2. В разделе «Приемник» на панели плагинов слева дважды щелкните узел BigQuery, который появится в пользовательском интерфейсе конвейера данных.
  3. Наведите указатель мыши на узел приемника BigQuery и щелкните «Свойства».

1be711152c92c692.png

  1. Заполните обязательные поля. Установите следующие параметры:
  • Метка = {любой текст}
  • Название ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Dataset = Набор данных BigQuery, используемый в текущем проекте (т.е. DATASET_ID)
  • Таблица = {название таблицы}
  1. Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

c5585747da2ef341.png

  1. Чтобы закрыть окно свойств BigQuery, нажмите кнопку X.

6. Создание конвейера пакетной обработки данных

Соединение всех узлов в конвейере

  1. Перетащите стрелку соединения > на правый край исходного узла и отпустите на левый край целевого узла.
  2. Конвейер обработки данных может иметь несколько ветвей, получающих входные файлы из одного и того же узла источника GCS.

67510ab46bd44d36.png

  1. Назовите трубопровод.

Вот и все. Вы только что создали свой первый конвейер пакетной обработки данных и можете развернуть и запустить его.

Отправлять оповещения о ходе выполнения проекта по электронной почте (необязательно)

Для использования функции Pipeline Alert SendEmail требуется настроить почтовый сервер для отправки почты с виртуальной машины. Дополнительную информацию см. по ссылке ниже:

Отправка электронных писем из экземпляра | Документация Compute Engine

В этом практическом задании мы настроим службу пересылки почты с помощью Mailgun, выполнив следующие шаги:

  1. Следуйте инструкциям в разделе «Отправка электронных писем с помощью Mailgun | Документация Compute Engine» , чтобы создать учетную запись в Mailgun и настроить службу пересылки электронной почты. Дополнительные изменения описаны ниже.
  2. Добавьте все адреса электронной почты получателей в авторизованный список Mailgun. Этот список можно найти в меню Mailgun > Отправка > Обзор на левой панели.

7e6224cced3fa4e0.pngfa78739f1ddf2dc2.png

После того как получатели нажмут кнопку «Я согласен» в электронном письме, отправленном с адреса support@mailgun.net , их адреса электронной почты будут сохранены в списке авторизованных получателей для получения уведомлений о ходе выполнения проекта.

72847c97fd5fce0f.png

  1. Шаг 3 раздела «Прежде чем начать» — создайте правило брандмауэра следующим образом:

75b063c165091912.png

  1. Шаг 3 раздела «Настройка Mailgun в качестве почтового ретранслятора с Postfix». Выберите «Интернет-сайт» или «Интернет с помощью smarthost» вместо «Только локальный» , как указано в инструкциях.

8fd8474a4ef18f16.png

  1. Шаг 4 раздела «Настройка Mailgun в качестве почтового ретранслятора с Postfix». Отредактируйте файл /etc/postfix/main.cf , добавив 10.128.0.0/9 в конец строки mynetworks .

249fbf3edeff1ce8.png

  1. Отредактируйте файл /etc/postfix/master.cf с помощью vi , чтобы изменить значение SMTP по умолчанию (25) на порт 587.

86c82cf48c687e72.png

  1. В правом верхнем углу Data Fusion Studio нажмите «Настроить» . Нажмите «Оповещение конвейера» и нажмите кнопку «+» , чтобы открыть окно «Оповещения» . Выберите «Отправить по электронной почте» .

dc079a91f1b0da68.png

  1. Заполните форму настройки электронной почты . Для каждого типа оповещения выберите завершение, успех или неудачу из раскрывающегося списка «Условие выполнения» . Если Include Workflow Token = false , отправляется только информация из поля «Сообщение». Если Include Workflow Token = true , отправляется информация из поля «Сообщение» и подробная информация о токене рабочего процесса. Для поля «Протокол» необходимо использовать строчные буквы . В поле «Отправитель » используйте любой « фиктивный » адрес электронной почты, кроме корпоративного адреса.

1fa619b6ce28f5e5.png

7. Настройка, развертывание, запуск/планирование конвейера.

db612e62a1c7ab7e.png

  1. В правом верхнем углу Data Fusion Studio нажмите «Настроить» . Выберите Spark в качестве конфигурации движка. В окне «Настройка» нажмите «Сохранить».

8ecf7c243c125882.png

  1. Нажмите «Предварительный просмотр» , чтобы просмотреть данные, и нажмите «Предварительный просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра.

b3c891e5e1aa20ae.png

  1. Нажмите «Журналы» , чтобы просмотреть журналы.
  2. Нажмите «Сохранить» , чтобы сохранить все изменения.
  3. Нажмите «Импорт» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
  4. Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
  5. Нажмите «Развернуть» , чтобы развернуть конвейер.
  6. После развертывания нажмите кнопку «Запустить» и дождитесь завершения выполнения конвейера.

bb06001d46a293db.png

  1. Вы можете продублировать конвейер, выбрав пункт «Дублировать» под кнопкой «Действия» .
  2. Экспорт конфигурации конвейера можно осуществить, выбрав пункт «Экспорт» в меню « Действия» .
  3. Чтобы при необходимости настроить триггеры конвейера, щелкните «Входящие триггеры» или «Исходящие триггеры» в левой или правой части окна Studio.
  4. Нажмите «Расписание» , чтобы запланировать периодическое выполнение конвейера и загрузку данных.

4167fa67550a49d5.png

  1. В сводке отображаются диаграммы истории выполнения, записи, журналы ошибок и предупреждения.

8. Валидация

  1. Процесс проверки был успешно выполнен.

7dee6e662c323f14.png

  1. Проверьте, содержит ли набор данных BigQuery все таблицы.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Получайте оповещения по электронной почте (если настроено).

Просмотр результатов

Чтобы просмотреть результаты после выполнения конвейера:

  1. Выполните запрос к таблице в пользовательском интерфейсе BigQuery. ПЕРЕЙДИТЕ В ИНТЕРФЕЙС BIGQUERY
  2. Обновите приведенный ниже запрос, указав название вашего проекта, набор данных и таблицу.

e32bfd5d965a117f.png

9. Уборка

Чтобы избежать списания средств с вашего счета Google Cloud Platform за ресурсы, использованные в этом руководстве:

После завершения обучения вы можете очистить созданные вами ресурсы в GCP, чтобы они не занимали вашу квоту и вам не выставлялись за них счета в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.

Удаление набора данных BigQuery

Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.

Удаление сегмента GCS

Следуйте этим инструкциям, чтобы удалить созданный вами в рамках этого руководства сегмент GCS .

Удаление экземпляра Cloud Data Fusion

Следуйте этим инструкциям, чтобы удалить свой экземпляр Cloud Data Fusion .

Удаление проекта

Самый простой способ избежать выставления счетов — удалить проект, созданный для этого урока.

Чтобы удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙДИТЕ НА СТРАНИЦУ ПРОЕКТОВ
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.

10. Поздравляем!

Поздравляем, вы успешно завершили практическое задание по загрузке медицинских данных в BigQuery с помощью Cloud Data Fusion.

Вы импортировали данные в формате CSV из Google Cloud Storage в BigQuery.

Вы визуально создали конвейер интеграции данных для массовой загрузки, преобразования и маскирования медицинских данных.

Теперь вы знаете ключевые шаги, необходимые для начала работы с BigQuery на платформе Google Cloud Platform в сфере анализа данных в здравоохранении.