Прием данных CSV в BigQuery с помощью Cloud Data Fusion – пакетный прием

1. Введение

12fb66cc134b50ef.png

Последнее обновление: 28 февраля 2020 г.

В этой кодовой лаборатории демонстрируется шаблон приема данных для массовой загрузки медицинских данных в формате CSV в BigQuery. Для этой лабораторной работы мы будем использовать конвейер пакетных данных Cloud Data Fusion. Реалистичные данные медицинских тестов были созданы и доступны для вас в сегменте Google Cloud Storage ( gs://hcls_testing_data_fhir_10_phases/csv/ ).

В этой лабораторной работе по кодированию вы узнаете:

  • Как импортировать данные CSV (пакетная загрузка по расписанию) из GCS в BigQuery с помощью Cloud Data Fusion .
  • Как визуально построить конвейер интеграции данных в Cloud Data Fusion для массовой загрузки, преобразования и маскировки медицинских данных .

Что вам нужно для запуска этой лаборатории кода?

  • Вам нужен доступ к проекту GCP.
  • Вам должна быть назначена роль владельца проекта GCP.
  • Данные о здравоохранении в формате CSV, включая заголовок.

Если у вас нет проекта GCP, выполните следующие действия , чтобы создать новый проект GCP.

Медицинские данные в формате CSV были предварительно загружены в корзину GCS по адресу gs://hcls_testing_data_fhir_10_phases/csv/ . Каждый CSV-файл ресурса имеет свою уникальную структуру схемы. Например, в файле Patients.csv используется другая схема, чем в файле Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_phases/csv_schemas .

Если вам понадобится новый набор данных, вы всегда можете сгенерировать его с помощью SyntheaTM . Затем загрузите его в GCS вместо копирования из корзины на этапе копирования входных данных.

2. Настройка проекта GCP

Инициализируйте переменные оболочки для вашей среды.

Чтобы найти PROJECT_ID , обратитесь к разделу Идентификация проектов .

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

Создайте сегмент GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .

gsutil mb -l us gs://$BUCKET_NAME

Получите доступ к синтетическому набору данных.

  1. С адреса электронной почты, который вы используете для входа в Cloud Console, отправьте электронное письмо на адрес hcls-solutions-external+subscribe@google.com с просьбой присоединиться.
  2. Вы получите электронное письмо с инструкциями о том, как подтвердить действие. 525a0fa752e0acae.png
  3. Используйте возможность ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте кнопку.
  4. Как только вы получите электронное письмо с подтверждением, вы сможете перейти к следующему шагу в кодовой лаборатории.

Скопируйте входные данные.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

Создайте набор данных BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Настройка среды Cloud Data Fusion

Выполните следующие действия, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:

Включите API .

  1. Перейдите в библиотеку API консоли GCP .
  2. В списке проектов выберите свой проект.
  3. В библиотеке API выберите API, который вы хотите включить. Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и/или фильтрами.
  4. На странице API нажмите ВКЛЮЧИТЬ.

Создайте экземпляр Cloud Data Fusion .

  1. В консоли GCP выберите свой ProjectID.
  2. Выберите Data Fusion в левом меню, затем нажмите кнопку СОЗДАТЬ ЭКЗЕМПЛЯР в середине страницы (первое создание) или нажмите кнопку СОЗДАТЬ ЭКЗЕМПЛЯР в верхнем меню (дополнительное создание).

а828690ff3bf3c46.png

8372c944c94737ea.png

  1. Укажите имя экземпляра. Выберите Предприятие.

5af91e46917260ff.png

  1. Нажмите кнопку СОЗДАТЬ.

Разрешения экземпляра установки.

После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с разрешениями экземпляра, в вашем проекте:

  1. Перейдите на страницу сведений об экземпляре, щелкнув имя экземпляра.

76ad691f795e1ab3.png

  1. Скопируйте учетную запись службы.

6c91836afb72209d.png

  1. Перейдите на страницу IAM вашего проекта.
  2. На странице разрешений IAM мы теперь добавим сервисный аккаунт в качестве нового участника и предоставим ему роль агента службы Cloud Data Fusion API . Нажмите кнопку «Добавить» , затем вставьте «служебный аккаунт» в поле «Новые участники» и выберите «Управление услугами» -> Роль агента сервера Cloud Data Fusion API.
  3. ea68b28d917a24b1.png
  4. Нажмите Сохранить .

После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку «Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или странице сведений об экземпляре.

Настройте правило брандмауэра.

  1. Перейдите в Консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh или нет.

102adef44bbe3a45.png

  1. Если нет, добавьте правило брандмауэра, которое разрешает весь входящий SSH-трафик в сеть по умолчанию.

Использование командной строки:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. Постройте схему трансформации

Теперь, когда у нас есть среда Cloud Fusion в GCP, давайте построим схему. Эта схема нам нужна для преобразования данных CSV.

  1. В окне Cloud Data Fusion нажмите ссылку «Просмотреть экземпляр» в столбце «Действие». Вы будете перенаправлены на другую страницу. Нажмите предоставленный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. По вашему выбору нажмите кнопку «Начать тур» или кнопку «Нет, спасибо» во всплывающем окне приветствия.
  2. Разверните меню «гамбургер», выберите Pipeline -> Studio.

6561b13f30e36c3a.png

  1. В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел Wrangler, который появится в пользовательском интерфейсе конвейеров данных.

аа44а4db5fe6623a.png

  1. Наведите указатель мыши на узел Wrangler и нажмите «Свойства» . Нажмите кнопку «Сборка» , затем выберите исходный файл .csv (например, Patients.csv), который должен содержать все поля данных для построения нужной схемы.
  2. Нажмите стрелку вниз («Преобразования столбцов») рядом с именем каждого столбца (например, «тело»). 802edca8a97da18.png
  3. По умолчанию при первоначальном импорте предполагается, что в вашем файле данных есть только один столбец. Чтобы проанализировать его как CSV, выберите «РазборCSV» , затем выберите разделитель и установите соответствующий флажок «Установить первую строку как заголовок». Нажмите кнопку «Применить».
  4. Нажмите стрелку вниз рядом с полем «Тело» и выберите «Удалить столбец», чтобы удалить поле «Тело». Кроме того, вы можете попробовать другие преобразования, такие как удаление столбцов, изменение типа данных для некоторых столбцов (по умолчанию используется тип «строка»), разделение столбцов, установка имен столбцов и т. д.

e6d2cda51ff298e7.png

  1. На вкладках «Столбцы» и «Шаги преобразования» показана схема вывода и рецепт Wrangler. Нажмите Применить в правом верхнем углу. Нажмите кнопку «Подтвердить». Зеленый цвет «Ошибок не обнаружено» указывает на успех.

1add853c43f2abee.png

  1. В свойствах Wrangler щелкните раскрывающийся список «Действия» , чтобы экспортировать нужную схему в локальное хранилище для будущего импорта , если это необходимо.
  2. Сохраните рецепт Wrangler для дальнейшего использования.
parse-as-csv :body ',' true
drop body
  1. Чтобы закрыть окно свойств Wrangler, нажмите кнопку X.

5. Создайте узлы для конвейера

В этом разделе мы построим компоненты конвейера.

  1. В пользовательском интерфейсе конвейеров данных в левом верхнем углу вы должны увидеть, что в качестве типа конвейера выбран «Конвейер данных — пакет» .

af67c42ce3d98529.png

  1. На левой панели есть различные разделы: «Фильтр», «Источник», «Преобразование», «Аналитика», «Приемник», «Условия и действия», «Обработчики ошибок» и «Предупреждения», где вы можете выбрать узел или узлы для конвейера.

c4438f7682f8b19b.png

Исходный узел

  1. Выберите узел Источник.
  2. В разделе «Источник» на палитре плагинов слева дважды щелкните узел Google Cloud Storage , который отображается в пользовательском интерфейсе конвейеров данных.
  3. Укажите исходный узел GCS и нажмите «Свойства» .

87e51a3e8dae8b3f.png

  1. Заполните необходимые поля. Установите следующие поля:
  • Метка = {любой текст}
  • Имя ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Путь = URL-адрес GCS для сегмента в текущем проекте. Например, gs://$BUCKET_NAME/csv/
  • Формат = текст
  • Поле пути = имя файла
  • Только имя файла пути = true
  • Читать файлы рекурсивно = true
  1. Добавьте поле «имя файла» в схему вывода GCS, нажав кнопку «+» .
  2. Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Подтвердить». Зеленый цвет «Ошибок не обнаружено» указывает на успех.
  3. Чтобы закрыть свойства GCS, нажмите кнопку X.

Узел преобразования

  1. Выберите узел Преобразование.
  2. В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел Wrangler , который появляется в пользовательском интерфейсе конвейеров данных. Подключите исходный узел GCS к узлу преобразования Wrangler.
  3. Наведите указатель мыши на узел Wrangler и нажмите «Свойства» .
  4. Нажмите раскрывающийся список «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_phases/csv_schemas/ schema (Patients).json ), и вставьте сохраненный рецепт из предыдущего раздела.
  5. Или повторно используйте узел Wrangler из раздела: Построение схемы для трансформации .
  6. Заполните необходимые поля. Установите следующие поля:
  • Метка = {любой текст}
  • Имя поля ввода = {*}
  • Предварительное условие = {имя_файла != "пациенты.csv"}, чтобы отличать каждый входной файл ( например, Patients.csv, поставщики.csv, аллергии.csv и т. д. ) от исходного узла.

2426f8f0a6c4c670.png

  1. Добавьте узел JavaScript для выполнения предоставленного пользователем кода JavaScript, который дополнительно преобразует записи. В этой кодовой лаборатории мы используем узел JavaScript для получения отметки времени для каждого обновления записи. Подключите узел преобразования Wrangler к узлу преобразования JavaScript. Откройте « Свойства JavaScript» и добавьте следующую функцию:

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. Добавьте поле с именем TIMESTAMP в схему вывода (если оно не существует), щелкнув знак + . Выберите временную метку в качестве типа данных.

4227389b57661135.png

  1. Нажмите «Документация» для получения подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.
  2. Чтобы закрыть окно «Свойства преобразования», нажмите кнопку X.

Маскирование и деидентификация данных

  1. Вы можете выбрать отдельные столбцы данных, щелкнув стрелку вниз в столбце и применив правила маскирования в разделе «Выбор данных маски» в соответствии с вашими требованиями (например, столбец SSN).

bb1eb067dd6e0946.png

  1. Вы можете добавить больше директив в окне «Рецепт» узла Wrangler . Например, используя директиву hash с алгоритмом хеширования, следующим синтаксисом, в целях деидентификации:
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

Приемный узел

  1. Выберите узел приемника.
  2. В разделе «Приемник» на палитре плагинов слева дважды щелкните узел BigQuery, который появится в пользовательском интерфейсе конвейера данных.
  3. Наведите указатель мыши на узел приемника BigQuery и нажмите «Свойства».

1be711152c92c692.png

  1. Заполните необходимые поля. Установите следующие поля:
  • Метка = {любой текст}
  • Имя ссылки = {любой текст}
  • Идентификатор проекта = автоматическое определение
  • Набор данных = набор данных BigQuery, используемый в текущем проекте (т. е. DATASET_ID).
  • Таблица = {имя таблицы}
  1. Нажмите «Документация» для получения подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.

c5585747da2ef341.png

  1. Чтобы закрыть свойства BigQuery, нажмите кнопку X.

6. Создайте конвейер пакетных данных

Соединение всех узлов в конвейере

  1. Перетащите стрелку соединения > на правый край исходного узла и опустите на левый край узла назначения.
  2. Конвейер может иметь несколько ветвей, которые получают входные файлы из одного и того же узла источника GCS.

67510ab46bd44d36.png

  1. Назовите трубопровод.

Вот и все. Вы только что создали свой первый конвейер пакетных данных и можете его развернуть и запустить.

Отправка оповещений конвейера по электронной почте (необязательно)

Чтобы использовать функцию Pipeline Alert SendEmail, конфигурация требует, чтобы почтовый сервер был настроен для отправки почты с экземпляра виртуальной машины. Для получения дополнительной информации см. ссылку ниже:

Отправка электронной почты из экземпляра | Документация по вычислительному движку

В этой кодовой лаборатории мы настраиваем службу ретрансляции почты через Mailgun, выполнив следующие шаги:

  1. Следуйте инструкциям в разделе «Отправка электронной почты с помощью Mailgun | Документация Compute Engine , чтобы настроить учетную запись в Mailgun и настроить службу ретрансляции электронной почты. Дополнительные модификации ниже.
  2. Добавьте адреса электронной почты всех получателей в авторизованный список Mailgun. Этот список можно найти в разделе Mailgun>Отправка>Обзор на левой панели.

7e6224cced3fa4e0.pngfa78739f1ddf2dc2.png

Как только получатели нажимают «Я согласен» в электронном письме, отправленном с адреса support@mailgun.net , их адреса электронной почты сохраняются в авторизованном списке для получения электронных писем с оповещениями по конвейеру.

72847c97fd5fce0f.png

  1. Шаг 3 из раздела «Перед началом работы» — создайте правило брандмауэра следующим образом:

75b063c165091912.png

  1. Шаг 3 «Настройка Mailgun в качестве ретранслятора почты с помощью Postfix». Выберите «Интернет-сайт» или «Интернет со смартхостом» вместо «Только локально» , как указано в инструкциях.

8fd8474a4ef18f16.png

  1. Шаг 4 «Настройка Mailgun в качестве ретранслятора почты с помощью Postfix». Отредактируйте vi /etc/postfix/main.cf, добавив 10.128.0.0/9 в конец mynetworks .

249fbf3edeff1ce8.png

  1. Отредактируйте vi /etc/postfix/master.cf, чтобы изменить smtp по умолчанию (25) на порт 587.

86c82cf48c687e72.png

  1. В правом верхнем углу студии Data Fusion нажмите «Настроить» . Нажмите «Предупреждение конвейера» и нажмите кнопку «+» , чтобы открыть окно «Предупреждения» . Выберите Отправить электронную почту .

dc079a91f1b0da68.png

  1. Заполните форму настройки электронной почты . Выберите завершение, успех или неудачу в раскрывающемся списке «Условие выполнения» для каждого типа оповещения. Если Include Workflow Token = false , отправляется только информация из поля «Сообщение». Если включить токен рабочего процесса = true , информация из поля «Сообщение» и подробная информация о токене рабочего процесса не будут отправлены. Вы должны использовать строчные буквы для протокола . В качестве отправителя используйте любой « поддельный » адрес электронной почты, отличный от адреса электронной почты вашей компании.

1fa619b6ce28f5e5.png

7. Настройка, развертывание, запуск/планирование конвейера

db612e62a1c7ab7e.png

  1. В правом верхнем углу студии Data Fusion нажмите «Настроить» . Выберите Spark для конфигурации двигателя. Нажмите «Сохранить» в окне «Настроить».

8ecf7c243c125882.png

  1. Нажмите «Просмотр для просмотра данных»**,** и снова нажмите «Просмотр**», чтобы вернуться к предыдущему окну. Вы также можете **Запустить** конвейер в режиме предварительного просмотра.

b3c891e5e1aa20ae.png

  1. Нажмите Журналы , чтобы просмотреть журналы.
  2. Нажмите «Сохранить» , чтобы сохранить все изменения.
  3. Нажмите «Импортировать» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
  4. Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
  5. Нажмите «Развернуть» , чтобы развернуть конвейер.
  6. После развертывания нажмите «Выполнить» и дождитесь завершения работы конвейера.

bb06001d46a293db.png

  1. Вы можете дублировать конвейер, выбрав «Дублировать» под кнопкой «Действия» .
  2. Вы можете экспортировать конфигурацию конвейера, выбрав «Экспорт» под кнопкой «Действия» .
  3. Нажмите «Входящие триггеры» или «Исходящие триггеры» на левом или правом краю окна Studio, чтобы при необходимости настроить триггеры конвейера.
  4. Нажмите «Расписание» , чтобы запланировать периодический запуск конвейера и загрузку данных.

4167fa67550a49d5.png

  1. Сводка показывает диаграммы истории запусков, записей, журналов ошибок и предупреждений.

8. Проверка

  1. Конвейер проверки выполнен успешно.

7dee6e662c323f14.png

  1. Проверьте, есть ли в наборе данных BigQuery все таблицы.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. Получать оповещения по электронной почте (если настроено).

Просмотр результатов

Чтобы просмотреть результаты после запуска конвейера:

  1. Запросите таблицу в пользовательском интерфейсе BigQuery. ПЕРЕХОДИТЬ В пользовательский интерфейс BIGQUERY
  2. Обновите запрос ниже, указав собственное имя проекта, набор данных и таблицу.

e32bfd5d965a117f.png

9. Уборка

Чтобы избежать списания средств с вашей учетной записи Google Cloud Platform за ресурсы, используемые в этом руководстве:

После завершения обучения вы можете очистить ресурсы, созданные вами в GCP, чтобы они не занимали вашу квоту и вам не выставлялись счета за них в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.

Удаление набора данных BigQuery

Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.

Удаление сегмента GCS

Следуйте этим инструкциям, чтобы удалить сегмент GCS, созданный вами в рамках этого руководства.

Удаление экземпляра Cloud Data Fusion

Следуйте этим инструкциям, чтобы удалить экземпляр Cloud Data Fusion .

Удаление проекта

Самый простой способ избавиться от выставления счетов — удалить проект, созданный вами для этого руководства.

Чтобы удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙТИ НА СТРАНИЦУ ПРОЕКТОВ
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить работу» , чтобы удалить проект.

10. Поздравления

Поздравляем, вы успешно завершили лабораторную работу по кодированию для приема медицинских данных в BigQuery с помощью Cloud Data Fusion.

Вы импортировали данные CSV из Google Cloud Storage в BigQuery.

Вы визуально построили конвейер интеграции данных для массовой загрузки, преобразования и маскировки медицинских данных.

Теперь вы знаете ключевые шаги, необходимые для начала работы в области анализа данных здравоохранения с помощью BigQuery на Google Cloud Platform.