Cloud Data Fusion을 사용하여 CSV (쉼표로 구분된 값) 데이터를 BigQuery로 수집 - 실시간 수집

1. 소개

509db33558ae025.png

최종 업데이트: 2020년 2월 28일

이 Codelab에서는 CSV 형식의 의료 데이터를 BigQuery로 실시간으로 수집하는 데이터 수집 패턴을 보여줍니다. 이 실습에서는 Cloud Data Fusion 실시간 데이터 파이프라인을 사용합니다. 현실적인 의료 테스트 데이터가 생성되어 Google Cloud Storage 버킷 (gs://hcls_testing_data_fhir_10_patients/csv/)에 제공되었습니다.

이 Codelab에서 학습할 내용은 다음과 같습니다.

  • Cloud Data Fusion을 사용하여 Pub/Sub에서 BigQuery로 CSV 데이터 (실시간 로드)를 수집하는 방법입니다.
  • 의료 데이터를 실시간으로 로드, 변환, 마스킹하기 위해 Cloud Data Fusion에서 데이터 통합 파이프라인을 시각적으로 빌드하는 방법

이 데모를 실행하려면 무엇이 필요한가요?

  • GCP 프로젝트에 액세스해야 합니다.
  • GCP 프로젝트의 소유자 역할을 할당받아야 합니다.
  • 헤더를 포함한 CSV 형식의 의료 데이터입니다.

GCP 프로젝트가 없으면 이 단계에 따라 새 GCP 프로젝트를 만듭니다.

CSV 형식의 의료 데이터는 GCS 버킷(gs://hcls_testing_data_fhir_10_patients/csv/)에 미리 로드되어 있습니다. 각 CSV 리소스 파일에는 고유한 스키마 구조가 있습니다. 예를 들어 Patients.csv는 Providers.csv와 다른 스키마를 사용합니다. 미리 로드된 스키마 파일은 gs://hcls_testing_data_fhir_10_patients/csv_schemas에서 확인할 수 있습니다.

새 데이터 세트가 필요한 경우 언제든지 SyntheaTM를 사용하여 데이터 세트를 생성할 수 있습니다. 그런 다음 입력 데이터 복사 단계에서 버킷의 파일을 복사하는 대신 GCS에 업로드합니다.

2. GCP 프로젝트 설정

환경에 맞게 셸 변수를 초기화합니다.

PROJECT_ID를 찾으려면 프로젝트 식별을 참고하세요.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil 도구를 사용하여 입력 데이터와 오류 로그를 저장할 GCS 버킷을 만듭니다.

gsutil mb -l us gs://$BUCKET_NAME

합성 데이터 세트에 대한 액세스 권한을 얻습니다.

  1. Cloud 콘솔에 로그인할 때 사용하는 이메일 주소에서 hcls-solutions-external+subscribe@google.com으로 가입을 요청하는 이메일을 전송합니다.
  2. 작업을 확인하는 방법에 대한 지침이 포함된 이메일이 전송됩니다.
  3. 옵션을 사용하여 이메일에 답장하고 그룹에 참여하세요. 525a0fa752e0acae.png 버튼을 클릭하지 마세요.
  4. 확인 이메일을 받으면 Codelab의 다음 단계로 진행할 수 있습니다.

입력 데이터를 복사합니다.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery 데이터 세트를 만듭니다.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

Google Cloud SDK를 설치 및 초기화하고 Pub 또는 Sub 주제 및 구독을 만듭니다.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. Cloud Data Fusion 환경 설정

다음 단계에 따라 Cloud Data Fusion API를 사용 설정하고 필요한 권한을 부여합니다.

API 사용 설정

  1. GCP Console API 라이브러리로 이동합니다.
  2. 프로젝트 목록에서 프로젝트를 선택합니다.
  3. API 라이브러리에서 사용 설정할 API ( Cloud Data Fusion API, Cloud Pub/Sub API)를 선택합니다. API를 찾는 데 도움이 필요하면 검색 필드 및 필터를 사용하세요.
  4. API 페이지에서 사용 설정을 클릭합니다.

Cloud Data Fusion 인스턴스 만들기

  1. GCP 콘솔에서 프로젝트 ID를 선택합니다.
  2. 왼쪽 메뉴에서 Data Fusion을 선택한 후 페이지 중간에 있는 '인스턴스 만들기' 버튼을 클릭하거나 (첫 번째 생성), 상단 메뉴에서 '인스턴스 만들기' 버튼을 클릭합니다 (추가 생성).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. 인스턴스 이름을 입력합니다. Enterprise를 선택합니다.

5af91e46917260ff.png

  1. 만들기 버튼을 클릭합니다.

인스턴스 권한 설정

인스턴스를 만든 후 다음 단계에 따라 인스턴스와 연결된 서비스 계정에 프로젝트에 대한 권한을 부여합니다.

  1. 인스턴스 이름을 클릭하여 인스턴스 세부정보 페이지로 이동합니다.

76ad691f795e1ab3.png

  1. 서비스 계정 복사

6c91836afb72209d.png

  1. 프로젝트의 IAM 페이지로 이동합니다.
  2. IAM 권한 페이지에서 추가 버튼을 클릭하여 서비스 계정에 Cloud Data Fusion API 서비스 에이전트 역할을 부여합니다. '서비스 계정'을 붙여넣습니다. '새 구성원' 필드에서 서비스 관리 -> Cloud Data Fusion API 서버 에이전트 역할입니다.

36f03d11c2a4ce0.png

  1. + 다른 역할 추가 (또는 Cloud Data Fusion API 서비스 에이전트 수정)를 클릭하여 Pub/Sub 구독자 역할을 추가합니다.

b4bf5500b8cbe5f9.png

  1. 저장을 클릭합니다.

이 단계가 완료되면 Cloud Data Fusion 인스턴스 페이지 또는 인스턴스의 세부정보 페이지에서 인스턴스 보기 링크를 클릭하여 Cloud Data Fusion 사용을 시작할 수 있습니다.

방화벽 규칙을 설정합니다.

  1. GCP 콘솔로 이동합니다. -> VPC 네트워크 -> default-allow-ssh 규칙의 존재 여부를 확인하는 방화벽 규칙입니다.

102adef44bbe3a45.png

  1. 그렇지 않은 경우 기본 네트워크에 대한 모든 인그레스 SSH 트래픽을 허용하는 방화벽 규칙을 추가합니다.

명령줄 사용:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI 사용: 방화벽 규칙 만들기를 클릭하고 정보를 입력합니다.

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 파이프라인용 노드 빌드

GCP에 Cloud Data Fusion 환경이 마련되었으니 이제 다음 단계에 따라 Cloud Data Fusion에서 데이터 파이프라인 빌드를 시작해 보겠습니다.

  1. Cloud Data Fusion 창의 작업 열에서 인스턴스 보기 링크를 클릭합니다. 다른 페이지로 리디렉션됩니다. 제공된 url을 클릭하여 Cloud Data Fusion 인스턴스를 엽니다. '둘러보기 시작' 클릭 옵션 또는 '아니요'입니다. 버튼을 클릭합니다.
  2. '햄버거' 펼치기 메뉴에서 파이프라인을 선택합니다 -> 목록

317820def934a00a.png

  1. 오른쪽 상단의 녹색 + 버튼을 클릭한 다음 파이프라인 만들기를 선택합니다. 또는 '만들기' 클릭 파이프라인 링크를 생성합니다

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. 파이프라인 스튜디오가 나타나면 왼쪽 상단의 드롭다운에서 데이터 파이프라인 - 실시간을 선택합니다.

372a889a81da5e66.png

  1. Data Pipelines UI의 왼쪽 패널에는 파이프라인의 노드를 하나 이상 선택할 수 있는 필터, 소스, 변환, 분석, 싱크, 오류 핸들러, 알림과 같은 다양한 섹션이 표시됩니다.

c63de071d4580f2f.png

소스 노드를 선택합니다.

  1. 왼쪽 플러그인 팔레트의 소스 섹션 아래에서 Data Pipelines UI에 표시되는 Google Cloud PubSub 노드를 더블클릭합니다.
  2. PubSub 소스 노드를 가리키고 속성을 클릭합니다.

ed857a5134148d7b.png

  1. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • 라벨 = {모든 텍스트}
  • 참조 이름 = {모든 텍스트}
  • 프로젝트 ID = 자동 감지
  • 구독 = Pub/Sub 주제 만들기 섹션에서 생성된 구독 (예: your-sub)
  • 주제 = Pub/Sub 주제 만들기 섹션에서 생성된 주제 (예: your-topic)
  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.

5c2774338b66bebe.png

  1. Pub/Sub 속성을 닫으려면 X 버튼을 클릭합니다.

변환 노드를 선택합니다.

  1. 왼쪽에 있는 플러그인 팔레트의 변환 섹션에서 데이터 파이프라인 UI에 표시되는 프로젝션 노드를 더블클릭합니다. Pub/Sub 소스 노드를 Projection 변환 노드에 연결합니다.
  2. 투영 노드를 가리키고 속성을 클릭합니다.

b3a9a3878879bfd7.png

  1. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • Convert = message를 바이트 유형에서 문자열 유형으로 변환합니다.
  • 삭제할 필드 = {모든 필드}
  • 유지할 필드 = {message, timestamp, attributes} (예: Pub/Sub에서 전송한 속성: key=‘filename':value=‘patients’)
  • 이름을 바꿀 필드 = {message, timestamp}
  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.

b8c2f8efe18234ff.png

  1. 왼쪽에 있는 플러그인 팔레트의 Transform(변환) 섹션에서 Data Pipelines UI에 표시되는 Wrangler 노드를 더블클릭합니다. Projection 변환 노드를 Wrangler 변환 노드에 연결합니다. Wrangler 노드를 가리킨 다음 Properties(속성)를 클릭합니다.

aa44a4db5fe6623a.png

  1. 작업 드롭다운을 클릭하고 가져오기를 선택하여 저장된 스키마 (예: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ 스키마 (Patients).json)를 가져옵니다.
  2. 마지막 필드 옆에 있는 + 버튼을 클릭하고 'Null'을 선택하여 출력 스키마에 TIMESTAMP 필드가 없는 경우 이를 추가합니다. 체크박스를 선택합니다.
  3. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • 라벨 = {모든 텍스트}
  • 입력란 이름 = {*}
  • Precondition = {attributes.get("filename") != "patients"} PubSub 소스 노드에서 전송된 각 기록 또는 메시지 유형 (예: 환자, 제공업체, 알레르기 등)을 구별합니다.
  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.

3b8e552cd2e3442c.png

  1. 원하는 순서로 열 이름을 설정하고 필요하지 않은 필드는 삭제합니다. 다음 코드 스니펫을 복사하여 레시피 상자에 붙여넣습니다.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. 데이터 마스킹 및 익명화는 Batch-Codelab - CDF를 통해 BigQuery로 CSV를 참조하세요. 또는 레시피 상자에 이 코드 스니펫 mask-number SSN xxxxxxx####을 추가합니다.
  2. 변환 속성 창을 닫으려면 X 버튼을 클릭합니다.

싱크 노드를 선택합니다.

  1. 왼쪽에 있는 플러그인 팔레트의 싱크 섹션에서 Data Pipeline UI에 표시되는 BigQuery 노드를 더블클릭합니다. Wrangler 변환 노드를 BigQuery 싱크 노드에 연결합니다.
  2. BigQuery 싱크 노드를 가리킨 다음 '속성'을 클릭합니다.

1be711152c92c692.png

  1. 필수 입력란을 작성합니다.
  • 라벨 = {모든 텍스트}
  • 참조 이름 = {모든 텍스트}
  • 프로젝트 ID = 자동 감지
  • 데이터 세트 = 현재 프로젝트에서 사용되는 BigQuery 데이터 세트 (예: DATASET_ID)
  • 테이블 = {table name}
  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.

bba71de9f31e842a.png

  1. BigQuery 속성을 닫으려면 X 버튼을 클릭합니다.

5. 실시간 데이터 파이프라인 빌드

이전 섹션에서는 Cloud Data Fusion에서 데이터 파이프라인을 빌드하는 데 필요한 노드를 만들었습니다. 이 섹션에서는 노드를 연결하여 실제 파이프라인을 빌드합니다.

파이프라인의 모든 노드 연결

  1. 연결 화살표를 드래그 > 소스 노드의 오른쪽 가장자리에 배치하고 대상 노드의 왼쪽 가장자리에 드롭합니다.
  2. 파이프라인에는 동일한 PubSub 소스 노드에서 게시된 메시지를 가져오는 여러 브랜치가 있을 수 있습니다.

b22908cc35364cdd.png

  1. 파이프라인 이름을 지정합니다.

이상입니다. 배포 및 실행할 첫 번째 실시간 데이터 파이프라인을 만들었습니다.

Cloud Pub/Sub를 통해 메시지 전송

Pub/Sub UI 사용:

  1. GCP 콘솔로 이동합니다. -> Pub/Sub -> 주제를 선택하고 내 주제를 선택한 후 상단 메뉴에서 '메시지 게시'를 클릭합니다.

d65b2a6af1668ecd.png

  1. Message 입력란에는 한 번에 하나의 레코드 행만 배치합니다. +속성 추가 버튼을 클릭합니다. 키 = filename, 값 = <레코드 유형> 제공 (예: 환자, 의료 서비스 제공자, 알레르기 등)
  2. 게시 버튼을 클릭하여 메시지를 보냅니다.

gcloud 명령어 사용:

  1. 메시지를 수동으로 제공합니다.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. catsed Unix 명령어를 사용하여 메시지를 반자동 제공합니다. 이 명령어는 여러 매개변수를 사용하여 반복적으로 실행할 수 있습니다.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. 파이프라인 구성, 배포, 실행

이제 데이터 파이프라인을 개발했으므로 Cloud Data Fusion에서 데이터 파이프라인을 배포하고 실행할 수 있습니다.

1bb5b0b8e2953ffa.png

  1. 구성 기본값을 유지합니다.
  2. 미리보기를 클릭하여 데이터를 미리 봅니다**.** 이전 창으로 다시 전환하려면 **미리보기** 를 다시 클릭합니다. **실행**을 클릭하여 미리보기 모드에서 파이프라인을 실행할 수도 있습니다.

b3c891e5e1aa20ae.png

  1. 로그를 클릭하여 로그를 확인합니다.
  2. 저장을 클릭하여 모든 변경사항을 저장합니다.
  3. 새 파이프라인을 빌드할 때 저장된 파이프라인 구성을 가져오려면 가져오기를 클릭합니다.
  4. 내보내기를 클릭하여 파이프라인 구성을 내보냅니다.
  5. 배포를 클릭하여 파이프라인을 배포합니다.
  6. 배포되면 실행을 클릭하고 파이프라인이 완료될 때까지 기다립니다.

f01ba6b746ba53a.png

  1. 언제든지 중지를 클릭하여 파이프라인 실행을 중지할 수 있습니다.
  2. 작업 버튼에서 복제를 선택하여 파이프라인을 복제할 수 있습니다.
  3. 작업 버튼에서 내보내기를 선택하여 파이프라인 구성을 내보낼 수 있습니다.

28ea4fc79445fad2.png

  1. 실행 기록, 레코드, 오류 로그, 경고 차트를 표시하려면 요약을 클릭합니다.

7. 유효성 검사

이 섹션에서는 데이터 파이프라인의 실행을 검증합니다.

  1. 파이프라인이 성공적으로 실행되었고 지속적으로 실행되었는지 검증합니다.

1644DFAc4a2d819d.png

  1. BigQuery 테이블이 TIMESTAMP를 기준으로 업데이트된 레코드로 로드되었는지 확인합니다. 이 예시에서는 2019년 6월 25일에 2개의 환자 레코드 또는 메시지와 1개의 알레르기 레코드 또는 메시지가 Pub/Sub 주제에 게시되었습니다.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. <your-topic>에 게시된 메시지 확인 수신: <your-sub> 있습니다.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

결과 보기

실시간 파이프라인이 실행되는 동안 메시지가 Pub/Sub 주제에 게시된 후 결과를 보려면 다음 안내를 따르세요.

  1. BigQuery UI에서 테이블을 쿼리합니다. BigQuery UI로 이동
  2. 아래 쿼리를 자신의 프로젝트 이름, 데이터 세트, 테이블로 업데이트합니다.

6a1fb85bd868abc9.png

8. 삭제

이 튜토리얼에서 사용한 리소스 비용이 Google Cloud Platform 계정에 청구되지 않도록 하는 방법은 다음과 같습니다.

이 가이드를 완료한 후에는 할당량을 차지하지 않고 이후에 요금이 청구되지 않도록 GCP에서 만든 리소스를 삭제할 수 있습니다. 다음 섹션에서는 이러한 리소스를 삭제 또는 해제하는 방법을 설명합니다.

BigQuery 데이터 세트 삭제

다음 안내에 따라 이 튜토리얼의 일부로 만든 BigQuery 데이터 세트를 삭제합니다.

GCS 버킷 삭제

다음 안내에 따라 이 튜토리얼의 일부로 만든 GCS 버킷을 삭제합니다.

Cloud Data Fusion 인스턴스 삭제

다음 안내에 따라 Cloud Data Fusion 인스턴스를 삭제합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다. 프로젝트 페이지로 이동
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력하고 종료를 클릭하여 프로젝트를 삭제합니다.

9. 축하합니다

수고하셨습니다. Cloud Data Fusion을 사용하여 BigQuery에서 의료 데이터를 수집하는 Codelab을 성공적으로 완료했습니다.

Pub/Sub 주제에 CSV 데이터를 게시한 후 BigQuery에 로드했습니다.

의료 데이터를 실시간으로 로드, 변환, 마스킹하기 위한 데이터 통합 파이프라인을 시각적으로 빌드해 보았습니다.

지금까지 Google Cloud Platform에서 BigQuery로 의료 데이터 분석 여정을 시작하는 데 필요한 주요 단계를 알아봤습니다.