1. Введение

Последнее обновление: 28.02.2020
В этом практическом занятии демонстрируется шаблон загрузки данных в формате CSV из медицинских учреждений в BigQuery в режиме реального времени. Для этого занятия мы будем использовать конвейер обработки данных Cloud Data Fusion Real time Data Pipeline. Реалистичные данные медицинских анализов были сгенерированы и предоставлены вам в хранилище Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).
В этой практической работе по программированию вы узнаете:
- Как загрузить CSV-данные (в режиме реального времени) из системы Pub/Sub в BigQuery с помощью Cloud Data Fusion .
- Как визуально построить конвейер интеграции данных в Cloud Data Fusion для загрузки, преобразования и маскирования медицинских данных в режиме реального времени .
Что вам понадобится для запуска этой демонстрации?
- Вам необходим доступ к проекту GCP.
- Для участия в проекте GCP вам должна быть назначена роль владельца.
- Медицинские данные в формате CSV, включая заголовок.
Если у вас нет проекта GCP, выполните следующие шаги для создания нового проекта GCP.
Медицинские данные в формате CSV предварительно загружены в хранилище GCS по адресу gs://hcls_testing_data_fhir_10_patients/csv/ . Каждый файл ресурсов CSV имеет уникальную структуру схемы. Например, файл Patients.csv имеет другую схему, чем Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_patients/csv_schemas .
Если вам нужен новый набор данных, вы всегда можете сгенерировать его с помощью Synthea™ . Затем загрузите его в GCS вместо копирования из хранилища на этапе копирования входных данных.
2. Настройка проекта GCP
Инициализируйте переменные оболочки для вашей среды.
Чтобы найти PROJECT_ID , обратитесь к разделу «Идентификация проектов» .
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Создайте хранилище GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .
gsutil mb -l us gs://$BUCKET_NAME
Получите доступ к синтетическому набору данных.
- Отправьте письмо на адрес hcls-solutions-external+subscribe@google.com с того адреса электронной почты, который вы используете для входа в Cloud Console, с просьбой о присоединении.
- Вы получите электронное письмо с инструкциями по подтверждению действия.
- Воспользуйтесь возможностью ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте на кнопку.
кнопка. - После получения подтверждающего письма вы можете перейти к следующему шагу в практическом задании.
Скопировать входные данные.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Создайте набор данных BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Установите и инициализируйте Google Cloud SDK , создайте публикации (Pub или Sub Topic) и подписки (Subscriptions).
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Настройка среды облачного слияния данных
Выполните следующие шаги, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:
Включить API .
- Перейдите в библиотеку API консоли GCP .
- Выберите свой проект из списка проектов.
- В библиотеке API выберите API, который хотите включить ( Cloud Data Fusion API, Cloud Pub/Sub API). Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и фильтрами.
- На странице API нажмите «Включить» .
Создайте экземпляр Cloud Data Fusion .
- В консоли GCP выберите свой идентификатор проекта (ProjectID).
- Выберите Data Fusion в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).


- Укажите имя экземпляра. Выберите «Предприятие» .

- Нажмите кнопку СОЗДАТЬ.
Настройте права доступа к экземпляру.
После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с экземпляром, разрешения в вашем проекте:
- Чтобы перейти на страницу с подробной информацией об экземпляре, щелкните по его названию.

- Скопируйте учетную запись службы.

- Перейдите на страницу IAM вашего проекта.
- На странице разрешений IAM предоставьте учетной записи службы роль агента службы Cloud Data Fusion API, нажав кнопку «Добавить ». Вставьте «учетную запись службы» в поле «Новые участники» и выберите «Управление службами» -> «Роль агента сервера Cloud Data Fusion API».

- Нажмите кнопку «+ Добавить еще одну роль» (или «Редактировать агент службы API Cloud Data Fusion»), чтобы добавить роль подписчика Pub/Sub.

- Нажмите « Сохранить ».
После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку « Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или на странице с подробными сведениями об экземпляре.
Настройте правило брандмауэра.
- Перейдите в консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh.

- В противном случае добавьте правило брандмауэра, разрешающее весь входящий SSH-трафик в сеть по умолчанию.
Использование командной строки:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:


4. Создайте узлы для конвейера.
Теперь, когда у нас есть среда Cloud Data Fusion в GCP, давайте начнем создавать конвейеры обработки данных в Cloud Data Fusion, выполнив следующие шаги:
- В окне Cloud Data Fusion в столбце «Действие» нажмите ссылку «Просмотреть экземпляр». Вы будете перенаправлены на другую страницу. Щелкните указанный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор нажмите кнопку «Начать ознакомительный тур» или «Нет, спасибо» во всплывающем окне приветствия.
- Разверните меню-гамбургер, выберите Pipeline -> List

- Нажмите зеленую кнопку «+» в правом верхнем углу, затем выберите «Создать конвейер». Или нажмите ссылку «Создать» для создания конвейера.


- После появления окна «Студия конвейера» в левом верхнем углу выберите в выпадающем списке «Конвейер данных — в реальном времени» .

- В пользовательском интерфейсе конвейеров данных на левой панели вы увидите различные разделы: Фильтр, Источник, Преобразование, Аналитика, Приемник, Обработчики ошибок и Оповещения, где вы можете выбрать узел или узлы для конвейера.

Выберите узел «Источник» .
- В разделе «Источник» на панели плагинов слева дважды щелкните узел Google Cloud PubSub , который появится в пользовательском интерфейсе Data Pipelines.
- Укажите на узел источника PubSub и нажмите «Свойства» .

- Заполните обязательные поля. Установите следующие параметры:
- Метка = {любой текст}
- Название ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Подписка = Подписка, созданная в разделе «Создать тему публикации/подписки» (например, your-sub ).
- Тема = Тема, созданная в разделе «Создать тему публикации/подтемы» (например, ваша тема ).
- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

- Чтобы закрыть окно «Свойства публикации/подписки», нажмите кнопку «X» .
Выберите узел «Преобразование» .
- В разделе «Преобразование» на панели плагинов слева дважды щелкните узел «Проекция» , который появится в пользовательском интерфейсе конвейеров данных. Соедините узел источника Pub/Sub с узлом преобразования «Проекция».
- Наведите указатель мыши на узел «Проекция» и щелкните «Свойства» .

- Заполните обязательные поля. Установите следующие параметры:
- Convert = преобразовать сообщение из байтового типа в строковый.
- Поля для удаления = {любое поле}
- Поля для сохранения : { сообщение , метка времени и атрибуты } ( например, атрибуты: ключ='имя файла':значение='пациенты', отправленные из Pub/Sub)
- Поля для переименования = { сообщение , метка времени }
- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

- В разделе «Преобразование» на панели плагинов слева дважды щелкните узел Wrangler , который появится в пользовательском интерфейсе Data Pipelines. Соедините узел преобразования Projection с узлом преобразования Wrangler. Наведите указатель мыши на узел Wrangler и щелкните «Свойства» .

- Нажмите на выпадающее меню «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json ).
- Добавьте поле TIMESTAMP в схему вывода (если оно отсутствует), нажав кнопку «+» рядом с последним полем и установив флажок «Null».
- Заполните обязательные поля. Установите следующие параметры:
- Метка = {любой текст}
- Название поля ввода = {*}
- Предварительное условие = { attributes.get("filename") != "patients" } для различения каждого типа записей или сообщений ( например, пациенты, поставщики услуг, аллергии и т. д. ), отправляемых из исходного узла PubSub.
- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

- Укажите желаемый порядок названий столбцов и удалите ненужные поля. Скопируйте следующий фрагмент кода и вставьте его в поле «Рецепт».
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####

- Для маскирования и обезличивания данных обратитесь к руководству Batch-Codelab - CSV to BigQuery via CDF . Или добавьте этот фрагмент кода mask-number SSN xxxxxxx#### в поле «Рецепт».
- Чтобы закрыть окно «Свойства преобразования», нажмите кнопку «X» .
Выберите узел «Приёмник».
- В разделе «Приемник» на панели плагинов слева дважды щелкните узел BigQuery , который появится в пользовательском интерфейсе конвейера данных. Соедините узел преобразования Wrangler с узлом приемника BigQuery.
- Наведите указатель мыши на узел приемника BigQuery и щелкните «Свойства».

- Заполните обязательные поля:
- Метка = {любой текст}
- Название ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Dataset = Набор данных BigQuery, используемый в текущем проекте (например, DATASET_ID)
- Таблица = {название таблицы}
- Для получения подробного объяснения нажмите «Документация» . Нажмите кнопку «Проверить», чтобы проверить всю введенную информацию. Зеленая надпись «Ошибок не обнаружено» означает успешную проверку.

- Чтобы закрыть окно свойств BigQuery, нажмите кнопку X.
5. Создайте конвейер обработки данных в реальном времени.
В предыдущем разделе мы создали узлы, необходимые для построения конвейера обработки данных в Cloud Data Fusion. В этом разделе мы соединим узлы для построения самого конвейера.
Соединение всех узлов в конвейере
- Перетащите стрелку соединения > на правый край исходного узла и отпустите на левый край целевого узла.
- Конвейер может иметь несколько ветвей, которые получают опубликованные сообщения от одного и того же узла источника PubSub.

- Назовите трубопровод.
Вот и всё. Вы только что создали свой первый конвейер обработки данных в реальном времени, который можно развернуть и запустить.
Отправляйте сообщения через Cloud Pub/Sub.
Использование пользовательского интерфейса публикации/подписки :
- Перейдите в консоль GCP -> Публикация/Подписка -> Темы, выберите свою тему , затем нажмите ОПУБЛИКОВАТЬ СООБЩЕНИЕ в верхнем меню.

- В поле «Сообщение» указывайте только одну строку записи за раз. Нажмите кнопку «+ДОБАВИТЬ АТРИБУТ» . Укажите ключ = имя файла , значение = < тип записи > ( например, пациенты, поставщики услуг, аллергии и т. д. ).
- Нажмите кнопку «Опубликовать», чтобы отправить сообщение.
Используя команду gcloud :
- Введите сообщение вручную.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- Полуавтоматически передавайте сообщение с помощью команд Unix cat и sed . Эту команду можно запускать многократно с различными параметрами.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. Настройка, развертывание и запуск конвейера.
Теперь, когда мы разработали конвейер обработки данных, мы можем развернуть и запустить его в Cloud Data Fusion.

- Оставьте значения по умолчанию в настройках .
- Нажмите «Предварительный просмотр» , чтобы просмотреть данные. Нажмите «Предварительный просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра, нажав «Запустить».

- Нажмите «Журналы» , чтобы просмотреть журналы.
- Нажмите «Сохранить» , чтобы сохранить все изменения.
- Нажмите «Импорт» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
- Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
- Нажмите «Развернуть» , чтобы развернуть конвейер.
- После развертывания нажмите кнопку «Запустить» и дождитесь завершения выполнения конвейера.

- Нажмите «Стоп» , чтобы остановить выполнение конвейера в любое время.
- Вы можете продублировать конвейер, выбрав пункт «Дублировать» под кнопкой «Действия» .
- Вы можете экспортировать конфигурацию конвейера, выбрав пункт «Экспорт» в разделе « Действия» .

- Нажмите «Сводка» , чтобы отобразить диаграммы истории выполнения, записи, журналы ошибок и предупреждения.
7. Валидация
В этом разделе мы проверяем выполнение конвейера обработки данных.
- Убедитесь, что конвейер был успешно выполнен и работает непрерывно.

- Убедитесь, что таблицы BigQuery загружены обновленными записями на основе временной метки (TIMESTAMP). В этом примере две записи или сообщения о пациентах и одна запись или сообщение об аллергии были опубликованы в тему Pub/Sub 25 июня 2019 года.
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- Убедитесь, что сообщения, опубликованные в теме <your-topic>, были получены подписчиком <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

Просмотр результатов
Чтобы просмотреть результаты после публикации сообщений в тему Pub/Sub во время работы конвейера обработки в реальном времени:
- Выполните запрос к таблице в пользовательском интерфейсе BigQuery. ПЕРЕЙДИТЕ В ИНТЕРФЕЙС BIGQUERY
- Обновите приведенный ниже запрос, указав название вашего проекта, набор данных и таблицу.

8. Уборка
Чтобы избежать списания средств с вашего счета Google Cloud Platform за ресурсы, использованные в этом руководстве:
После завершения обучения вы можете очистить созданные вами ресурсы в GCP, чтобы они не занимали квоты и вам не выставлялись за них счета в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.
Удаление набора данных BigQuery
Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.
Удаление сегмента GCS
Следуйте этим инструкциям, чтобы удалить созданный вами в рамках этого руководства сегмент GCS .
Удаление экземпляра Cloud Data Fusion
Следуйте этим инструкциям, чтобы удалить свой экземпляр Cloud Data Fusion .
Удаление проекта
Самый простой способ избежать выставления счетов — удалить проект, созданный для этого урока.
Чтобы удалить проект:
- В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙДИТЕ НА СТРАНИЦУ ПРОЕКТОВ
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
- В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.
9. Поздравляем!
Поздравляем, вы успешно завершили практическое задание по загрузке медицинских данных в BigQuery с помощью Cloud Data Fusion.
Вы опубликовали данные в формате CSV в топик Pub/Sub, а затем загрузили их в BigQuery.
Вы визуально создали конвейер интеграции данных для загрузки, преобразования и маскирования медицинских данных в режиме реального времени.
Теперь вы знаете ключевые шаги, необходимые для начала работы с BigQuery на платформе Google Cloud Platform в сфере анализа данных в здравоохранении.