1. Введение
Последнее обновление: 28 февраля 2020 г.
В этой кодовой лаборатории демонстрируется шаблон приема данных для приема данных здравоохранения в формате CSV в BigQuery в режиме реального времени. Для этой лабораторной работы мы будем использовать конвейер данных Cloud Data Fusion в реальном времени. Реалистичные данные медицинских тестов были созданы и доступны для вас в сегменте Google Cloud Storage (gs://hcls_testing_data_fhir_10_phases/csv/).
В этой лабораторной работе по кодированию вы узнаете:
- Как импортировать данные CSV (загрузка в реальном времени) из Pub/Sub в BigQuery с помощью Cloud Data Fusion .
- Как визуально построить конвейер интеграции данных в Cloud Data Fusion для загрузки, преобразования и маскировки медицинских данных в режиме реального времени .
Что вам нужно для запуска этой демонстрации?
- Вам нужен доступ к проекту GCP.
- Вам должна быть назначена роль владельца проекта GCP.
- Данные о здравоохранении в формате CSV, включая заголовок.
Если у вас нет проекта GCP, выполните следующие действия , чтобы создать новый проект GCP.
Медицинские данные в формате CSV были предварительно загружены в корзину GCS по адресу gs://hcls_testing_data_fhir_10_phases/csv/ . Каждый файл ресурсов CSV имеет уникальную структуру схемы. Например, в файле Patients.csv используется другая схема, чем в файле Providers.csv. Предварительно загруженные файлы схем можно найти по адресу gs://hcls_testing_data_fhir_10_phases/csv_schemas .
Если вам понадобится новый набор данных, вы всегда можете сгенерировать его с помощью SyntheaTM . Затем загрузите его в GCS вместо копирования из корзины на этапе копирования входных данных.
2. Настройка проекта GCP
Инициализируйте переменные оболочки для вашей среды.
Чтобы найти PROJECT_ID , обратитесь к разделу Идентификация проектов .
<!-- CODELAB: Initialize shell variables -> <!-- Your current GCP Project ID -> export PROJECT_ID=<PROJECT_ID> <!-- A new GCS Bucket in your current Project - INPUT -> export BUCKET_NAME=<BUCKET_NAME> <!-- A new BQ Dataset ID - OUTPUT -> export DATASET_ID=<DATASET_ID>
Создайте сегмент GCS для хранения входных данных и журналов ошибок с помощью инструмента gsutil .
gsutil mb -l us gs://$BUCKET_NAME
Получите доступ к синтетическому набору данных.
- С адреса электронной почты, который вы используете для входа в Cloud Console, отправьте электронное письмо на адрес hcls-solutions-external+subscribe@google.com с просьбой присоединиться.
- Вы получите электронное письмо с инструкциями о том, как подтвердить действие.
- Используйте возможность ответить на электронное письмо, чтобы присоединиться к группе. НЕ нажимайте кнопку кнопка.
- Как только вы получите электронное письмо с подтверждением, вы сможете перейти к следующему шагу в кодовой лаборатории.
Скопируйте входные данные.
gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME
Создайте набор данных BigQuery.
bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID
Установите и инициализируйте Google Cloud SDK , а также создайте публикацию или подтему и подписки.
gcloud init gcloud pubsub topics create your-topic gcloud pubsub subscriptions create --topic your-topic your-sub
3. Настройка среды Cloud Data Fusion
Выполните следующие действия, чтобы включить API Cloud Data Fusion и предоставить необходимые разрешения:
Включите API .
- Перейдите в библиотеку API консоли GCP .
- В списке проектов выберите свой проект.
- В библиотеке API выберите API, который вы хотите включить ( Cloud Data Fusion API, Cloud Pub/Sub API). Если вам нужна помощь в поиске API, воспользуйтесь полем поиска и фильтрами.
- На странице API нажмите ВКЛЮЧИТЬ .
Создайте экземпляр Cloud Data Fusion .
- В консоли GCP выберите свой ProjectID.
- Выберите «Data Fusion» в левом меню, затем нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в середине страницы (первое создание) или нажмите кнопку «СОЗДАТЬ ЭКЗЕМПЛЯР» в верхнем меню (дополнительное создание).
- Укажите имя экземпляра. Выберите Предприятие .
- Нажмите кнопку СОЗДАТЬ.
Настройте разрешения экземпляра.
После создания экземпляра выполните следующие действия, чтобы предоставить учетной записи службы, связанной с разрешениями экземпляра, в вашем проекте:
- Перейдите на страницу сведений об экземпляре, щелкнув имя экземпляра.
- Скопируйте учетную запись службы.
- Перейдите на страницу IAM вашего проекта.
- На странице разрешений IAM предоставьте сервисному аккаунту роль агента службы Cloud Data Fusion API, нажав кнопку «Добавить» . Вставьте «служебный аккаунт» в поле «Новые участники» и выберите «Управление услугами» -> «Роль агента сервера Cloud Data Fusion API».
- Нажмите + Добавить другую роль (или Изменить агент службы Cloud Data Fusion API), чтобы добавить роль подписчика Pub/Sub.
- Нажмите Сохранить .
После выполнения этих шагов вы можете начать использовать Cloud Data Fusion, щелкнув ссылку «Просмотреть экземпляр» на странице экземпляров Cloud Data Fusion или странице сведений об экземпляре.
Настройте правило брандмауэра.
- Перейдите в Консоль GCP -> Сеть VPC -> Правила брандмауэра, чтобы проверить, существует ли правило default-allow-ssh или нет.
- Если нет, добавьте правило брандмауэра, которое разрешает весь входящий SSH-трафик в сеть по умолчанию.
Используя командную строку:
gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging
Использование пользовательского интерфейса: нажмите «Создать правило брандмауэра» и заполните информацию:
4. Создайте узлы для конвейера
Теперь, когда у нас есть среда Cloud Data Fusion в GCP, давайте начнем создавать конвейеры данных в Cloud Data Fusion, выполнив следующие шаги:
- В окне Cloud Data Fusion нажмите ссылку «Просмотреть экземпляр» в столбце «Действие». Вы будете перенаправлены на другую страницу. Нажмите предоставленный URL-адрес , чтобы открыть экземпляр Cloud Data Fusion. На ваш выбор: нажать кнопку «Начать тур» или кнопку «Нет, спасибо» во всплывающем окне приветствия.
- Разверните меню «гамбургер», выберите Pipeline -> List.
- Нажмите зеленую кнопку + в правом верхнем углу, затем выберите «Создать конвейер». Или нажмите «Создать» ссылку на конвейер.
- Когда появится студия конвейера, в левом верхнем углу выберите «Конвейер данных — в реальном времени» в раскрывающемся списке.
- В пользовательском интерфейсе конвейеров данных на левой панели вы увидите различные разделы: «Фильтр», «Источник», «Преобразование», «Аналитика», «Приемник», «Обработчики ошибок» и «Предупреждения», где вы можете выбрать узел или узлы для конвейера.
Выберите узел источника .
- В разделе «Источник» на палитре плагинов слева дважды щелкните узел Google Cloud PubSub , который появляется в пользовательском интерфейсе конвейеров данных.
- Укажите на исходный узел PubSub и нажмите «Свойства» .
- Заполните необходимые поля. Задайте следующие поля:
- Метка = {любой текст}
- Имя ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Подписка = подписка, созданная в разделе «Создать тему Pub/Sub» (например, your-sub ).
- Тема = тема, созданная в разделе «Создать публикацию/подтему» (например, your-topic ).
- Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.
- Чтобы закрыть свойства публикации/подписки, нажмите кнопку X.
Выберите узел Преобразование .
- В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел «Проекция» , который появляется в пользовательском интерфейсе конвейеров данных. Подключите исходный узел Pub/Sub к узлу преобразования проекции.
- Укажите узел «Проекция» и нажмите «Свойства» .
- Заполните необходимые поля. Задайте следующие поля:
- Convert = преобразовать сообщение из байтового типа в строковый.
- Поля для удаления = {любое поле}
- Поля для сохранения = { message , timestamp и атрибуты } ( например, атрибуты: key='filename':value='peoples', отправленные из Pub/Sub)
- Поля для переименования = { сообщение , метка времени }
- Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.
- В разделе «Преобразование» на палитре плагинов слева дважды щелкните узел Wrangler , который появляется в пользовательском интерфейсе конвейеров данных. Соедините узел преобразования проекции с узлом преобразования Wrangler. Наведите указатель мыши на узел Wrangler и нажмите «Свойства» .
- Нажмите раскрывающийся список «Действия» и выберите «Импорт» , чтобы импортировать сохраненную схему (например: gs://hcls_testing_data_fhir_10_phases/csv_schemas/ schema (Patients).json ).
- Добавьте поле TIMESTAMP в схему вывода (если оно не существует), нажав кнопку + рядом с последним полем и установите флажок «Null».
- Заполните необходимые поля. Задайте следующие поля:
- Метка = {любой текст}
- Имя поля ввода = {*}
- Предварительное условие = { атрибуты.get("filename") != "пациенты" }, чтобы различать каждый тип записи или сообщения ( например, пациенты, поставщики услуг, аллергии и т. д. ), отправленные из исходного узла PubSub.
- Нажмите «Документация» для получения подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.
- Установите имена столбцов в предпочтительном порядке и удалите ненужные поля. Скопируйте следующий фрагмент кода и вставьте его в поле «Рецепт».
drop attributes parse-as-csv :body ',' false drop body set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP mask-number SSN xxxxxxx####
- См. Batch-Codelab — CSV в BigQuery через CDF для маскировки и деидентификации данных. Или добавьте этот фрагмент кода с номером маски SSN xxxxxxx#### в поле «Рецепт».
- Чтобы закрыть окно «Свойства преобразования», нажмите кнопку X.
Выберите узел Раковина.
- В разделе «Приёмник» на палитре плагинов слева дважды щелкните узел BigQuery , который появляется в пользовательском интерфейсе конвейера данных. Подключите узел преобразования Wrangler к узлу приемника BigQuery.
- Наведите указатель мыши на узел приемника BigQuery и нажмите «Свойства».
- Заполните необходимые поля:
- Метка = {любой текст}
- Имя ссылки = {любой текст}
- Идентификатор проекта = автоматическое определение
- Набор данных = набор данных BigQuery, используемый в текущем проекте (например, DATASET_ID).
- Таблица = {имя таблицы}
- Нажмите «Документация» для подробного объяснения. Нажмите кнопку «Проверить», чтобы проверить всю входную информацию. Зеленый цвет «Ошибок не обнаружено» указывает на успех.
- Чтобы закрыть свойства BigQuery, нажмите кнопку X.
5. Создайте конвейер данных в реальном времени.
В предыдущем разделе мы создали узлы, необходимые для построения конвейера данных в Cloud Data Fusion. В этом разделе мы соединяем узлы для построения настоящего конвейера.
Соединение всех узлов в конвейере
- Перетащите стрелку соединения > на правый край исходного узла и опустите на левый край узла назначения.
- Конвейер может иметь несколько ветвей, которые получают опубликованные сообщения из одного и того же узла источника PubSub.
- Назовите трубопровод.
Вот и все. Вы только что создали свой первый конвейер данных в реальном времени, который нужно развернуть и запустить.
Отправка сообщений через Cloud Pub/Sub
Использование пользовательского интерфейса Pub/Sub :
- Перейдите в консоль GCP -> Pub/Sub -> Topics, выберите свою тему , затем нажмите «ОПУБЛИКОВАТЬ СООБЩЕНИЕ» в верхнем меню.
- Помещайте в поле «Сообщение» только одну строку записи за раз. Нажмите кнопку +ДОБАВИТЬ АТРИБУТ . Укажите Key = имя файла , Value = < тип записи > ( например, пациенты, поставщики услуг, аллергия и т. д. ).
- Нажмите кнопку «Опубликовать», чтобы отправить сообщение.
Используя команду gcloud :
- Введите сообщение вручную.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "paste one record row here"
- Полуавтоматически предоставьте сообщение с помощью команд unix cat и sed . Эту команду можно запускать неоднократно с разными параметрами.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \ "$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"
6. Настройка, развертывание и запуск конвейера
Теперь, когда мы разработали конвейер данных, мы можем развернуть и запустить его в Cloud Data Fusion.
- Сохраните настройки по умолчанию.
- Нажмите «Просмотр» , чтобы просмотреть данные**.** Нажмите «Просмотр» еще раз, чтобы вернуться к предыдущему окну. Вы также можете запустить конвейер в режиме предварительного просмотра, нажав **ЗАПУСК**.
- Нажмите Журналы , чтобы просмотреть журналы.
- Нажмите «Сохранить» , чтобы сохранить все изменения.
- Нажмите «Импортировать» , чтобы импортировать сохраненную конфигурацию конвейера при создании нового конвейера.
- Нажмите «Экспорт» , чтобы экспортировать конфигурацию конвейера.
- Нажмите «Развернуть» , чтобы развернуть конвейер.
- После развертывания нажмите «Выполнить» и дождитесь завершения работы конвейера.
- Нажмите «Стоп» , чтобы в любой момент остановить работу конвейера.
- Вы можете дублировать конвейер, выбрав «Дублировать» под кнопкой «Действия» .
- Вы можете экспортировать конфигурацию конвейера, выбрав «Экспорт» под кнопкой «Действия» .
- Нажмите «Сводка» , чтобы отобразить диаграммы истории запусков, записей, журналов ошибок и предупреждений.
7. Проверка
В этом разделе мы проверяем работу конвейера данных.
- Убедитесь, что конвейер выполнен успешно и работает непрерывно.
- Убедитесь, что таблицы BigQuery загружены обновленными записями на основе TIMESTAMP. В этом примере две записи или сообщения о пациентах и одна запись или сообщение об аллергии были опубликованы в теме Pub/Sub 25 июня 2019 г.
bq query --nouse_legacy_sql 'select (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Patients' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC" ) as Patients, (select count(*) from \ '$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \ 01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
| 2 | 1 |
+----------+-----------+
- Убедитесь, что сообщения, опубликованные в <your-topic>, были получены подписчиком <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>
Просмотр результатов
Чтобы просмотреть результаты после публикации сообщений в теме Pub/Sub во время работы конвейера Realtime:
- Запросите таблицу в пользовательском интерфейсе BigQuery. ПЕРЕХОДИТЬ В пользовательский интерфейс BIGQUERY
- Обновите запрос ниже, указав собственное имя проекта, набор данных и таблицу.
8. Уборка
Чтобы избежать списания средств с вашей учетной записи Google Cloud Platform за ресурсы, используемые в этом руководстве:
После завершения обучения вы можете очистить ресурсы, созданные вами в GCP, чтобы они не занимали квоту и вам не выставлялись счета за них в будущем. В следующих разделах описано, как удалить или отключить эти ресурсы.
Удаление набора данных BigQuery
Следуйте этим инструкциям, чтобы удалить набор данных BigQuery, созданный вами в рамках этого руководства.
Удаление сегмента GCS
Следуйте этим инструкциям, чтобы удалить сегмент GCS, созданный вами в рамках этого руководства.
Удаление экземпляра Cloud Data Fusion
Следуйте этим инструкциям, чтобы удалить экземпляр Cloud Data Fusion .
Удаление проекта
Самый простой способ избавиться от выставления счетов — удалить проект, созданный вами для этого руководства.
Чтобы удалить проект:
- В консоли GCP перейдите на страницу «Проекты» . ПЕРЕЙТИ НА СТРАНИЦУ ПРОЕКТОВ
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить» .
- В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить работу» , чтобы удалить проект.
9. Поздравления
Поздравляем, вы успешно завершили лабораторную работу по кодированию для приема медицинских данных в BigQuery с помощью Cloud Data Fusion.
Вы опубликовали данные CSV в теме Pub/Sub, а затем загрузили их в BigQuery.
Вы визуально построили конвейер интеграции данных для загрузки, преобразования и маскировки медицинских данных в режиме реального времени.
Теперь вы знаете ключевые шаги, необходимые для начала работы в области анализа данных здравоохранения с помощью BigQuery на Google Cloud Platform.