1. Введение

Последнее обновление: 28.02.2020
В этом практическом занятии демонстрируется шаблон загрузки данных в формате CSV из медицинских учреждений в BigQuery в пакетном режиме. Для этого занятия мы будем использовать конвейер пакетной обработки данных Cloud Data Fusion. Реалистичные данные медицинских анализов были сгенерированы и предоставлены вам в хранилище Google Cloud Storage ( gs://hcls_testing_data_fhir_10_patients/csv/ ).
В этой практической работе по программированию вы узнаете:
- Как загрузить данные из CSV-файлов (пакетная загрузка) из GCS в BigQuery с помощью Cloud Data Fusion .
- Как визуально построить конвейер интеграции данных в Cloud Data Fusion для массовой загрузки, преобразования и маскирования данных здравоохранения .
Что вам понадобится для проведения этого практического занятия?
- Вам необходим доступ к проекту GCP.
- Для участия в проекте GCP вам должна быть назначена роль владельца.
- Медицинские данные в формате CSV, включая заголовок.
Если у вас нет проекта GCP, выполните следующие шаги для создания нового проекта GCP.
Данные о здравоохранении в формате CSV предварительно загружены в хранилище GCS по адресу gs://hcls_testing_data_fhir_10_patients/csv/ . Каждый CSV-файл ресурса имеет свою уникальную структуру схемы. Например, файл Patients.csv имеет другую схему, чем Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_patients/csv_schemas .
Если вам нужен новый набор данных, вы всегда можете сгенерировать его с помощью Synthea™ . Затем загрузите его в GCS вместо копирования из хранилища на этапе копирования входных данных.
2. Настройка проекта GCP
Инициализируйте переменные оболочки для вашей среды.
Чтобы найти PROJECT_ID , обратитесь к разделу «Идентификация проектов» .
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Создайте хранилище GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .
gsutil mb -l us gs://$BUCKET_NAME
Получите доступ к синтетическому набору данных.
- Отправьте письмо на адрес hcls-solutions-external+subscribe@google.com с того адреса электронной почты, который вы используете для входа в Cloud Console, с просьбой о присоединении.
- Вы получите электронное письмо с инструкциями по подтверждению действия.

- Воспользуйтесь возможностью ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте на кнопку.
- После получения подтверждающего письма вы можете перейти к следующему шагу в практическом задании.
Скопировать входные данные.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Создайте набор данных BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
3. Настройка среды облачного слияния данных
Выполните следующие шаги, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:
Включить API .
- Перейдите в библиотеку API консоли GCP .
- Выберите свой проект из списка проектов.
- В библиотеке API выберите API, который хотите включить. Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и/или фильтрами.
- На странице API нажмите «Включить».
Создайте экземпляр Cloud Data Fusion .
- В консоли GCP выберите свой идентификатор проекта (ProjectID).
- Выберите Data Fusion в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).


- Укажите имя экземпляра. Выберите «Предприятие».

- Нажмите кнопку СОЗДАТЬ.
Настройка прав доступа к экземпляру.
После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с экземпляром, разрешения в вашем проекте:
- Чтобы перейти на страницу с подробной информацией об экземпляре, щелкните по его названию.

- Скопируйте учетную запись службы.

- Перейдите на страницу IAM вашего проекта.
- На странице разрешений IAM мы добавим учетную запись службы в качестве нового участника и предоставим ей роль агента службы Cloud Data Fusion API . Нажмите кнопку «Добавить» , затем вставьте «учетную запись службы» в поле «Новые участники» и выберите «Управление службами» -> «Роль агента сервера Cloud Data Fusion API».

- Нажмите « Сохранить ».
После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку « Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или на странице с подробными сведениями об экземпляре.
Настройте правило брандмауэра.
- Перейдите в консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh.

- В противном случае добавьте правило брандмауэра, разрешающее весь входящий SSH-трафик в сеть по умолчанию.
Использование командной строки:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:


4. Создайте схему для преобразования.
Теперь, когда у нас есть среда Cloud Fusion в GCP, давайте создадим схему. Эта схема необходима для преобразования данных CSV.
- В окне Cloud Data Fusion в столбце «Действие» нажмите ссылку «Просмотреть экземпляр». Вы будете перенаправлены на другую страницу. Щелкните указанный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор нажмите кнопку «Начать ознакомительный тур» или «Нет, спасибо» во всплывающем окне приветствия.
- Разверните меню-гамбургер, выберите Pipeline -> Studio

- В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler, который появится в пользовательском интерфейсе Data Pipelines.

- Наведите указатель мыши на узел Wrangler и нажмите «Свойства» . Нажмите кнопку Wrangler , затем выберите исходный файл .csv (например, patients.csv), который должен содержать все поля данных для построения нужной схемы.
- Нажмите стрелку вниз (Преобразования столбцов) рядом с названием каждого столбца (например, body).

- По умолчанию при первоначальном импорте предполагается, что в вашем файле данных только один столбец. Чтобы преобразовать его в CSV-файл, выберите «Преобразовать в CSV» , затем выберите разделитель и установите флажок «Установить первую строку в качестве заголовка» при необходимости. Нажмите кнопку «Применить».
- Щелкните стрелку вниз рядом с полем «Тело», выберите «Удалить столбец», чтобы удалить поле «Тело». Кроме того, вы можете попробовать другие преобразования, такие как удаление столбцов, изменение типа данных для некоторых столбцов (по умолчанию — строковый тип), разделение столбцов, присвоение имен столбцам и т. д.

- На вкладках «Столбцы» и «Этапы преобразования» отображается схема вывода и алгоритм Wrangler. Нажмите «Применить» в правом верхнем углу. Нажмите кнопку «Проверить». Зеленое сообщение «Ошибок не обнаружено» указывает на успешное выполнение.

- В свойствах Wrangler щелкните раскрывающееся меню «Действия» , чтобы экспортировать нужную схему в локальное хранилище для последующего импорта при необходимости.
- Сохраните рецепт Wrangler для дальнейшего использования.
parse-as-csv :body ',' true drop body
- Чтобы закрыть окно «Свойства Wrangler», нажмите кнопку «X» .
5. Создайте узлы для конвейера.
В этом разделе мы будем создавать компоненты конвейера.
- В пользовательском интерфейсе «Конвейеры данных» в левом верхнем углу вы должны увидеть, что в качестве типа конвейера выбран «Конвейер данных — Пакетная обработка» .

- На левой панели расположены различные разделы: Фильтр, Источник, Преобразование, Аналитика, Приемник, Условия и действия, Обработчики ошибок и Оповещения, где можно выбрать узел или узлы для конвейера.

Исходный узел
- Выберите узел «Источник».
- В разделе «Источник» на панели плагинов слева дважды щелкните узел Google Cloud Storage , который появится в пользовательском интерфейсе Data Pipelines.
- Укажите на узел источника GCS и нажмите «Свойства» .

- Заполните обязательные поля. Установите следующие параметры:
- Метка = {любой текст}
- Название ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Путь = URL-адрес GCS для хранилища в вашем текущем проекте. Например, gs://$BUCKET_NAME/csv/
- Формат = текст
- Поле «Путь» = имя файла
- Путь только к имени файла = true
- Read Files Recursively = true
- Добавьте поле 'filename' в схему вывода GCS, нажав кнопку "+" .
- Для получения подробных пояснений нажмите «Документация» . Нажмите кнопку «Проверить». Зеленая надпись «Ошибок не обнаружено» означает успешное выполнение проверки.
- Чтобы закрыть окно свойств GCS, нажмите кнопку X.
Узел преобразования
- Выберите узел «Преобразование».
- В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler , который появится в пользовательском интерфейсе конвейеров данных. Соедините исходный узел GCS с узлом преобразования Wrangler.
- Наведите указатель мыши на узел Wrangler и щелкните «Свойства» .
- Нажмите на выпадающее меню «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema(Patients).json ), а затем вставьте сохраненный рецепт из предыдущего раздела.
- Или же используйте узел Wrangler из раздела: Создание схемы для преобразования .
- Заполните обязательные поля. Установите следующие параметры:
- Метка = {любой текст}
- Название поля ввода = {*}
- Precondition = {filename != "patients.csv"} для различения каждого входного файла ( например, patients.csv, providers.csv, allergys.csv и т. д. ) от узла Source.

- Добавьте узел JavaScript для выполнения предоставленного пользователем JavaScript-кода, который дополнительно преобразует записи. В этом практическом занятии мы используем узел JavaScript для получения метки времени для каждого обновления записи. Соедините узел преобразования Wrangler с узлом преобразования JavaScript. Откройте свойства JavaScript и добавьте следующую функцию:

function transform(input, emitter, context) {
input.TIMESTAMP = (new Date()).getTime()*1000;
emitter.emit(input);
}
- Добавьте поле с именем TIMESTAMP в схему вывода (если оно отсутствует), нажав знак «+» . Выберите тип данных «метка времени».

- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.
- Чтобы закрыть окно «Свойства преобразования», нажмите кнопку «X» .
Маскирование и обезличивание данных
- Вы можете выбрать отдельные столбцы данных, щелкнув стрелку вниз в столбце и применив правила маскирования в разделе «Маскировка выбранных данных» в соответствии с вашими требованиями (например, столбец SSN).

- В окне «Рецепт» узла Wrangler можно добавить дополнительные директивы. Например, можно использовать директиву `hash` с алгоритмом хеширования, следующим синтаксисом для целей деидентификации:
hash <column> <algorithm> <encode> <column>: name of the column <algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.) <encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

Узел-приемник
- Выберите узел-приемник.
- В разделе «Приемник» на панели плагинов слева дважды щелкните узел BigQuery, который появится в пользовательском интерфейсе конвейера данных.
- Наведите указатель мыши на узел приемника BigQuery и щелкните «Свойства».

- Заполните обязательные поля. Установите следующие параметры:
- Метка = {любой текст}
- Название ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Dataset = Набор данных BigQuery, используемый в текущем проекте (т.е. DATASET_ID)
- Таблица = {название таблицы}
- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

- Чтобы закрыть окно свойств BigQuery, нажмите кнопку X.
6. Создание конвейера пакетной обработки данных
Соединение всех узлов в конвейере
- Перетащите стрелку соединения > на правый край исходного узла и отпустите на левый край целевого узла.
- Конвейер обработки данных может иметь несколько ветвей, получающих входные файлы из одного и того же узла источника GCS.

- Назовите трубопровод.
Вот и все. Вы только что создали свой первый конвейер пакетной обработки данных и можете развернуть и запустить его.
Отправлять оповещения о ходе выполнения проекта по электронной почте (необязательно)
Для использования функции Pipeline Alert SendEmail требуется настроить почтовый сервер для отправки почты с виртуальной машины. Дополнительную информацию см. по ссылке ниже:
Отправка электронных писем из экземпляра | Документация Compute Engine
В этом практическом задании мы настроим службу пересылки почты с помощью Mailgun, выполнив следующие шаги:
- Следуйте инструкциям в разделе «Отправка электронных писем с помощью Mailgun | Документация Compute Engine» , чтобы создать учетную запись в Mailgun и настроить службу пересылки электронной почты. Дополнительные изменения описаны ниже.
- Добавьте все адреса электронной почты получателей в авторизованный список Mailgun. Этот список можно найти в меню Mailgun > Отправка > Обзор на левой панели.


После того как получатели нажмут кнопку «Я согласен» в электронном письме, отправленном с адреса support@mailgun.net , их адреса электронной почты будут сохранены в списке авторизованных получателей для получения уведомлений о ходе выполнения проекта.

- Шаг 3 раздела «Прежде чем начать» — создайте правило брандмауэра следующим образом:

- Шаг 3 раздела «Настройка Mailgun в качестве почтового ретранслятора с Postfix». Выберите «Интернет-сайт» или «Интернет с помощью smarthost» вместо «Только локальный» , как указано в инструкциях.

- Шаг 4 раздела «Настройка Mailgun в качестве почтового ретранслятора с Postfix». Отредактируйте файл /etc/postfix/main.cf , добавив 10.128.0.0/9 в конец строки mynetworks .

- Отредактируйте файл /etc/postfix/master.cf с помощью vi , чтобы изменить значение SMTP по умолчанию (25) на порт 587.

- В правом верхнем углу Data Fusion Studio нажмите «Настроить» . Нажмите «Оповещение конвейера» и нажмите кнопку «+» , чтобы открыть окно «Оповещения» . Выберите «Отправить по электронной почте» .

- Заполните форму настройки электронной почты . Для каждого типа оповещения выберите завершение, успех или неудачу из раскрывающегося списка «Условие выполнения» . Если Include Workflow Token = false , отправляется только информация из поля «Сообщение». Если Include Workflow Token = true , отправляется информация из поля «Сообщение» и подробная информация о токене рабочего процесса. Для поля «Протокол» необходимо использовать строчные буквы . В поле «Отправитель » используйте любой « фиктивный » адрес электронной почты, кроме корпоративного адреса.

7. Настройка, развертывание, запуск/планирование конвейера.

- В правом верхнем углу Data Fusion Studio нажмите «Настроить» . Выберите Spark в качестве конфигурации движка. В окне «Настройка» нажмите «Сохранить».

- Нажмите «Предварительный просмотр» , чтобы просмотреть данные, и нажмите «Предварительный просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра.

- Нажмите «Журналы» , чтобы просмотреть журналы.
- Нажмите «Сохранить» , чтобы сохранить все изменения.
- Нажмите «Импорт» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
- Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
- Нажмите «Развернуть» , чтобы развернуть конвейер.
- После развертывания нажмите кнопку «Запустить» и дождитесь завершения выполнения конвейера.

- Вы можете продублировать конвейер, выбрав пункт «Дублировать» под кнопкой «Действия» .
- Экспорт конфигурации конвейера можно осуществить, выбрав пункт «Экспорт» в меню « Действия» .
- Чтобы при необходимости настроить триггеры конвейера, щелкните «Входящие триггеры» или «Исходящие триггеры» в левой или правой части окна Studio.
- Нажмите «Расписание» , чтобы запланировать периодическое выполнение конвейера и загрузку данных.

- В сводке отображаются диаграммы истории выполнения, записи, журналы ошибок и предупреждения.
8. Валидация
- Процесс проверки был успешно выполнен.

- Проверьте, содержит ли набор данных BigQuery все таблицы.
bq ls $PROJECT_ID:$DATASET_ID
tableId Type Labels Time Partitioning
----------------- ------- -------- -------------------
Allergies TABLE
Careplans TABLE
Conditions TABLE
Encounters TABLE
Imaging_Studies TABLE
Immunizations TABLE
Medications TABLE
Observations TABLE
Organizations TABLE
Patients TABLE
Procedures TABLE
Providers TABLE
- Получайте оповещения по электронной почте (если настроено).
Просмотр результатов
Чтобы просмотреть результаты после выполнения конвейера:
- Выполните запрос к таблице в пользовательском интерфейсе BigQuery. ПЕРЕЙДИТЕ В ИНТЕРФЕЙС BIGQUERY
- Обновите приведенный ниже запрос, указав название вашего проекта, набор данных и таблицу.

9. Уборка
Чтобы избежать списания средств с вашего счета Google Cloud Platform за ресурсы, использованные в этом руководстве:
После завершения обучения вы можете очистить созданные вами ресурсы в GCP, чтобы они не занимали вашу квоту и вам не выставлялись за них счета в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.
Удаление набора данных BigQuery
Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.
Удаление сегмента GCS
Следуйте этим инструкциям, чтобы удалить созданный вами в рамках этого руководства сегмент GCS .
Удаление экземпляра Cloud Data Fusion
Следуйте этим инструкциям, чтобы удалить свой экземпляр Cloud Data Fusion .
Удаление проекта
Самый простой способ избежать выставления счетов — удалить проект, созданный для этого урока.
Чтобы удалить проект:
- В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙДИТЕ НА СТРАНИЦУ ПРОЕКТОВ
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
- В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.
10. Поздравляем!
Поздравляем, вы успешно завершили практическое задание по загрузке медицинских данных в BigQuery с помощью Cloud Data Fusion.
Вы импортировали данные в формате CSV из Google Cloud Storage в BigQuery.
Вы визуально создали конвейер интеграции данных для массовой загрузки, преобразования и маскирования медицинских данных.
Теперь вы знаете ключевые шаги, необходимые для начала работы с BigQuery на платформе Google Cloud Platform в сфере анализа данных в здравоохранении.