Cloud Data Fusion을 사용하여 BigQuery로 CSV 데이터 수집 - 일괄 수집

1. 소개

12fb66cc134b50ef.png

최종 업데이트: 2020년 2월 28일

이 Codelab에서는 CSV 형식의 의료 데이터를 BigQuery로 대량 수집하는 데이터 수집 패턴을 보여줍니다. 이 실습에서는 Cloud Data Fusion 일괄 데이터 파이프라인을 사용합니다. 사실적인 의료 테스트 데이터가 생성되어 Google Cloud Storage 버킷 (gs://hcls_testing_data_fhir_10_patients/csv/)에서 사용할 수 있습니다.

이 Codelab에서는 다음 내용을 알아봅니다.

  • Cloud Data Fusion을 사용하여 GCS에서 BigQuery로 CSV 데이터를 수집 (일괄 예약 로드)하는 방법
  • Cloud Data Fusion에서 의료 데이터를 대량으로 로드, 변환, 마스킹하기 위한 데이터 통합 파이프라인을 시각적으로 빌드하는 방법

이 Codelab을 실행하려면 무엇이 필요한가요?

  • GCP 프로젝트에 액세스할 수 있어야 합니다.
  • GCP 프로젝트에 소유자 역할이 할당되어 있어야 합니다.
  • 헤더를 포함한 CSV 형식의 의료 데이터

GCP 프로젝트가 없는 경우 이 단계에 따라 새 GCP 프로젝트를 만드세요.

CSV 형식의 의료 데이터가 gs://hcls_testing_data_fhir_10_patients/csv/의 GCS 버킷에 미리 로드되어 있습니다. 각 리소스 CSV 파일에는 고유한 스키마 구조가 있습니다. 예를 들어 Patients.csv의 스키마는 Providers.csv의 스키마와 다릅니다. 사전 로드된 스키마 파일은 gs://hcls_testing_data_fhir_10_patients/csv_schemas에서 확인할 수 있습니다.

새 데이터 세트가 필요한 경우 언제든지 SyntheaTM를 사용하여 생성할 수 있습니다. 그런 다음 '입력 데이터 복사' 단계에서 버킷에서 복사하는 대신 GCS에 업로드합니다.

2. GCP 프로젝트 설정

환경의 셸 변수를 초기화합니다.

PROJECT_ID를 찾으려면 프로젝트 식별을 참고하세요.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

gsutil 도구를 사용하여 입력 데이터와 오류 로그를 저장할 GCS 버킷을 만듭니다.

gsutil mb -l us gs://$BUCKET_NAME

합성 데이터 세트에 액세스합니다.

  1. Cloud 콘솔에 로그인할 때 사용하는 이메일 주소에서 hcls-solutions-external+subscribe@google.com으로 가입을 요청하는 이메일을 보냅니다.
  2. 작업을 확인하는 방법에 대한 안내가 포함된 이메일이 전송됩니다. 525a0fa752e0acae.png
  3. 이메일에 답장하여 그룹에 가입하는 옵션을 사용합니다. 버튼을 클릭하지 마세요.
  4. 확인 이메일을 받으면 Codelab의 다음 단계로 진행할 수 있습니다.

입력 데이터 복사

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery 데이터 세트를 만듭니다.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 환경 설정

다음 단계에 따라 Cloud Data Fusion API를 사용 설정하고 필요한 권한을 부여하세요.

API 사용 설정

  1. GCP Console API 라이브러리로 이동합니다.
  2. 프로젝트 목록에서 프로젝트를 선택합니다.
  3. API 라이브러리에서 사용 설정할 API를 선택합니다. API를 찾는 데 도움이 필요하면 검색 필드 또는 필터를 사용하세요.
  4. API 페이지에서 사용 설정을 클릭합니다.

Cloud Data Fusion 인스턴스를 만듭니다.

  1. GCP 콘솔에서 ProjectID를 선택합니다.
  2. 왼쪽 메뉴에서 Data Fusion을 선택한 다음 페이지 중앙에 있는 인스턴스 만들기 버튼 (첫 번째 생성)을 클릭하거나 상단 메뉴에 있는 인스턴스 만들기 버튼 (추가 생성)을 클릭합니다.

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. 인스턴스 이름을 제공합니다. Enterprise를 선택합니다.

5af91e46917260ff.png

  1. '만들기' 버튼을 클릭합니다.

인스턴스 권한을 설정합니다.

인스턴스를 만든 후 다음 단계에 따라 인스턴스와 연결된 서비스 계정에 프로젝트에 대한 권한을 부여합니다.

  1. 인스턴스 이름을 클릭하여 인스턴스 세부정보 페이지로 이동합니다.

76ad691f795e1ab3.png

  1. 서비스 계정 복사

6c91836afb72209d.png

  1. 프로젝트의 IAM 페이지로 이동합니다.
  2. 이제 IAM 권한 페이지에서 서비스 계정을 새 구성원으로 추가하고 Cloud Data Fusion API 서비스 에이전트 역할을 부여합니다. 추가 버튼을 클릭한 다음 '서비스 계정'을 새 구성원 필드에 붙여넣고 서비스 관리 -> Cloud Data Fusion API 서버 에이전트 역할을 선택합니다.
  3. ea68b28d917a24b1.png
  4. 저장을 클릭합니다.

이 단계가 완료되면 이제부터 Cloud Data Fusion 인스턴스 페이지 또는 인스턴스의 세부정보 페이지에서 인스턴스 보기 링크를 클릭하여 Cloud Data Fusion을 사용할 수 있습니다.

방화벽 규칙을 설정합니다.

  1. GCP 콘솔 -> VPC 네트워크 -> 방화벽 규칙으로 이동하여 default-allow-ssh 규칙이 있는지 확인합니다.

102adef44bbe3a45.png

  1. 그렇지 않은 경우 모든 인그레스 SSH 트래픽을 기본 네트워크로 허용하는 방화벽 규칙을 추가합니다.

명령줄 사용:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI 사용: '방화벽 규칙 만들기'를 클릭하고 정보를 입력합니다.

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 변환을 위한 스키마 빌드

이제 GCP에 Cloud Fusion 환경이 있으므로 스키마를 빌드해 보겠습니다. CSV 데이터를 변환하려면 이 스키마가 필요합니다.

  1. Cloud Data Fusion 창의 작업 열에서 인스턴스 보기 링크를 클릭합니다. 다른 페이지로 리디렉션됩니다. 제공된 URL을 클릭하여 Cloud Data Fusion 인스턴스를 엽니다. 시작 팝업에서 '둘러보기 시작' 또는 '아니요' 버튼을 클릭한 선택사항
  2. '햄버거' 메뉴를 펼치고 파이프라인 -> 스튜디오를 선택합니다.

6561b13f30e36c3a.png

  1. 왼쪽의 플러그인 팔레트에 있는 변환 섹션에서 Wrangler 노드를 더블클릭합니다. 그러면 데이터 파이프라인 UI에 Wrangler 노드가 표시됩니다.

aa44a4db5fe6623a.png

  1. Wrangler 노드를 가리키고 속성을 클릭합니다. Wrangle 버튼을 클릭한 다음 원하는 스키마를 빌드하는 데 필요한 모든 데이터 필드가 포함된 .csv 소스 파일 (예: patients.csv)을 선택합니다.
  2. 각 열 이름 (예: 본문) 옆에 있는 아래쪽 화살표 (열 변환)를 클릭합니다. 802edca8a97da18.png
  3. 기본적으로 초기 가져오기는 데이터 파일에 열이 하나만 있다고 가정합니다. CSV로 파싱하려면 파싱CSV를 선택한 다음 구분자를 선택하고 필요에 따라 '첫 번째 행을 헤더로 설정' 체크박스를 선택합니다. '적용' 버튼을 클릭합니다.
  4. 본문 필드 옆에 있는 아래쪽 화살표를 클릭하고 열 삭제를 선택하여 본문 필드를 삭제합니다. 또한 열 삭제, 일부 열의 데이터 유형 변경 (기본값은 '문자열' 유형), 열 분할, 열 이름 설정 등 다른 변환을 시도할 수 있습니다.

e6d2cda51ff298e7.png

  1. '열' 및 '변환 단계' 탭에는 출력 스키마와 Wrangler의 레시피가 표시됩니다. 오른쪽 상단에서 적용을 클릭합니다. '유효성 검사' 버튼을 클릭합니다. 녹색 '오류 없음'은 성공을 나타냅니다.

1add853c43f2abee.png

  1. Wrangler Properties에서 Actions 드롭다운을 클릭하여 원하는 스키마를 로컬 저장소로 Export합니다. 나중에 필요한 경우 Import할 수 있습니다.
  2. 나중에 사용할 수 있도록 Wrangler 레시피를 저장합니다.
parse-as-csv :body ',' true
drop body
  1. 랭글러 속성 창을 닫으려면 X 버튼을 클릭합니다.

5. 파이프라인의 노드 빌드

이 섹션에서는 파이프라인 구성요소를 빌드합니다.

  1. 데이터 파이프라인 UI의 왼쪽 상단에서 데이터 파이프라인 - 일괄이 파이프라인 유형으로 선택된 것을 볼 수 있습니다.

af67c42ce3d98529.png

  1. 왼쪽 패널에는 파이프라인의 노드를 선택할 수 있는 필터, 소스, 변환, 분석, 싱크, 조건 및 작업, 오류 핸들러, 알림과 같은 다양한 섹션이 있습니다.

c4438f7682f8b19b.png

소스 노드

  1. 소스 노드를 선택합니다.
  2. 왼쪽의 플러그인 팔레트의 소스 섹션에서 데이터 파이프라인 UI에 표시되는 Google Cloud Storage 노드를 더블클릭합니다.
  3. GCS 소스 노드를 가리키고 속성을 클릭합니다.

87e51a3e8dae8b3f.png

  1. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • 라벨 = {텍스트}
  • 참조 이름 = {텍스트}
  • 프로젝트 ID = 자동 감지
  • 경로 = 현재 프로젝트의 버킷에 대한 GCS URL입니다. 예: gs://$BUCKET_NAME/csv/
  • 형식 = text
  • 경로 필드 = filename
  • Path Filename Only = true
  • Read Files Recursively = true
  1. + 버튼을 클릭하여 GCS 출력 스키마에 'filename' 필드를 추가합니다.
  2. 자세한 설명은 문서를 클릭하세요. '유효성 검사' 버튼을 클릭합니다. 녹색 '오류 없음'은 성공을 나타냅니다.
  3. GCS 속성을 닫으려면 X 버튼을 클릭합니다.

변환 노드

  1. 변환 노드를 선택합니다.
  2. 왼쪽의 플러그인 팔레트에서 변환 섹션 아래에 있는 Wrangler 노드를 더블클릭합니다. 이 노드는 데이터 파이프라인 UI에 표시됩니다. GCS 소스 노드를 Wrangler 변환 노드에 연결합니다.
  3. Wrangler 노드를 가리키고 속성을 클릭합니다.
  4. 작업 드롭다운을 클릭하고 가져오기를 선택하여 저장된 스키마 (예: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json)를 가져오고 이전 섹션에서 저장한 레시피를 붙여넣기합니다.
  5. 또는 변환을 위한 스키마 빌드 섹션의 Wrangler 노드를 재사용합니다.
  6. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • Label = {텍스트}
  • 입력란 이름 = {*}
  • 사전 조건 = {filename != "patients.csv"}를 사용하여 각 입력 파일 (예: patients.csv, providers.csv, allergies.csv 등)을 소스 노드와 구분합니다.

2426f8f0a6c4c670.png

  1. 사용자가 제공한 JavaScript를 실행하여 레코드를 추가로 변환하는 JavaScript 노드를 추가합니다. 이 Codelab에서는 JavaScript 노드를 활용하여 각 레코드 업데이트의 타임스탬프를 가져옵니다. Wrangler 변환 노드를 JavaScript 변환 노드에 연결합니다. JavaScript 속성을 열고 다음 함수를 추가합니다.

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. + 기호를 클릭하여 출력 스키마에 TIMESTAMP라는 필드를 추가합니다 (없는 경우). 타임스탬프를 데이터 유형으로 선택합니다.

4227389b57661135.png

  1. 자세한 설명은 문서를 클릭하세요. '유효성 검사' 버튼을 클릭하여 모든 입력 정보를 검사합니다. 녹색 '오류가 발견되지 않음'은 성공을 나타냅니다.
  2. 변환 속성 창을 닫으려면 X 버튼을 클릭합니다.

데이터 마스킹 및 익명화

  1. 열에서 아래쪽 화살표를 클릭하고 요구사항에 따라 데이터 마스크 선택에서 마스킹 규칙을 적용하여 개별 데이터 열을 선택할 수 있습니다 (예: 주민등록번호 열).

bb1eb067dd6e0946.png

  1. Wrangler 노드의 Recipe 창에서 지시어를 추가할 수 있습니다. 예를 들어 익명화 목적으로 다음 구문에 따라 해싱 알고리즘과 함께 해시 지시문을 사용합니다.
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

싱크 노드

  1. 싱크 노드를 선택합니다.
  2. 왼쪽의 플러그인 팔레트의 싱크 섹션에서 BigQuery 노드를 더블클릭하면 데이터 파이프라인 UI에 표시됩니다.
  3. BigQuery 싱크 노드를 가리키고 속성을 클릭합니다.

1be711152c92c692.png

  1. 필수 입력란을 작성합니다. 다음 필드를 설정합니다.
  • 라벨 = {텍스트}
  • 참조 이름 = {텍스트}
  • 프로젝트 ID = 자동 감지
  • 데이터 세트 = 현재 프로젝트에서 사용되는 BigQuery 데이터 세트 (예: DATASET_ID)
  • Table = {table name}
  1. 자세한 설명은 문서를 클릭하세요. '유효성 검사' 버튼을 클릭하여 모든 입력 정보를 검사합니다. 녹색 '오류가 발견되지 않음'은 성공을 나타냅니다.

c5585747da2ef341.png

  1. BigQuery 속성을 닫으려면 X 버튼을 클릭합니다.

6. 일괄 데이터 파이프라인 빌드

파이프라인의 모든 노드 연결

  1. 소스 노드의 오른쪽 가장자리에 있는 연결 화살표(>)를 드래그하여 대상 노드의 왼쪽 가장자리에 놓습니다.
  2. 파이프라인에는 동일한 GCS 소스 노드에서 입력 파일을 가져오는 여러 분기가 있을 수 있습니다.

67510ab46bd44d36.png

  1. 파이프라인 이름을 지정합니다.

이상입니다. 첫 번째 일괄 데이터 파이프라인을 만들었으므로 파이프라인을 배포하고 실행할 수 있습니다.

이메일을 통해 파이프라인 알림 보내기 (선택사항)

파이프라인 알림 SendEmail 기능을 활용하려면 가상 머신 인스턴스에서 메일을 보내도록 메일 서버를 설정해야 합니다. 자세한 내용은 아래 참조 링크를 확인하세요.

인스턴스에서 이메일 보내기 | Compute Engine 문서

이 Codelab에서는 다음 단계에 따라 Mailgun을 통해 메일 릴레이 서비스를 설정합니다.

  1. Mailgun으로 이메일 보내기 | Compute Engine 문서의 안내에 따라 Mailgun 계정을 설정하고 이메일 릴레이 서비스를 구성합니다. 추가 수정사항은 아래를 참고하세요.
  2. 모든 수신자의 이메일 주소를 Mailgun의 승인된 목록에 추가합니다. 이 목록은 왼쪽 패널의 Mailgun> Sending> Overview 옵션에서 확인할 수 있습니다.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

수신자가 support@mailgun.net에서 보낸 이메일에서 '동의'를 클릭하면 파이프라인 알림 이메일을 수신할 수 있도록 이메일 주소가 승인된 목록에 저장됩니다.

72847c97fd5fce0f.png

  1. '시작하기 전에' 섹션의 3단계 - 다음과 같이 방화벽 규칙을 만듭니다.

75b063c165091912.png

  1. 'Mailgun을 Postfix 관련 메일 릴레이로 구성'의 3단계 안내에 설명된 대로 로컬 전용 대신 인터넷 사이트 또는 스마트호스트가 있는 인터넷을 선택합니다.

8fd8474a4ef18f16.png

  1. 'Postfix를 사용하여 Mailgun을 메일 릴레이로 구성'의 4단계 vi /etc/postfix/main.cf를 수정하여 mynetworks 끝에 10.128.0.0/9를 추가합니다.

249fbf3edeff1ce8.png

  1. vi /etc/postfix/master.cf를 편집하여 기본 smtp (25)를 포트 587로 변경합니다.

86c82cf48c687e72.png

  1. Data Fusion 스튜디오의 오른쪽 상단에서 구성을 클릭합니다. 파이프라인 알림을 클릭하고 + 버튼을 클릭하여 알림 창을 엽니다. SendEmail을 선택합니다.

dc079a91f1b0da68.png

  1. 이메일 구성 양식을 작성합니다. 각 알림 유형의 실행 조건 드롭다운에서 완료, 성공 또는 실패를 선택합니다. 워크플로 토큰 포함 = false인 경우 메시지 필드의 정보만 전송됩니다. 워크플로 토큰 포함 = true인 경우 메시지 필드의 정보와 워크플로 토큰 세부정보가 전송됩니다. 프로토콜에는 소문자를 사용해야 합니다. 발신자에 회사 이메일 주소가 아닌 '가짜' 이메일을 사용합니다.

1fa619b6ce28f5e5.png

7. 파이프라인 구성, 배포, 실행/예약

db612e62a1c7ab7e.png

  1. Data Fusion 스튜디오의 오른쪽 상단에서 구성을 클릭합니다. 엔진 구성에서 Spark를 선택합니다. 구성 창에서 저장을 클릭합니다.

8ecf7c243c125882.png

  1. 미리보기를 클릭하여 데이터를 미리 보고**,** 다시 **미리보기** 를 클릭하여 이전 창으로 전환합니다. 미리보기 모드에서 파이프라인을 **실행** 할 수도 있습니다.

b3c891e5e1aa20ae.png

  1. 로그를 클릭하여 로그를 확인합니다.
  2. 저장을 클릭하여 모든 변경사항을 저장합니다.
  3. 새 파이프라인을 빌드할 때 저장된 파이프라인 구성을 가져오려면 가져오기를 클릭합니다.
  4. 내보내기를 클릭하여 파이프라인 구성을 내보냅니다.
  5. 배포를 클릭하여 파이프라인을 배포합니다.
  6. 배포되면 실행을 클릭하고 파이프라인 실행이 완료될 때까지 기다립니다.

bb06001d46a293db.png

  1. 작업 버튼에서 '복제'를 선택하여 파이프라인을 복제할 수 있습니다.
  2. 작업 버튼에서 내보내기를 선택하여 파이프라인 구성을 내보낼 수 있습니다.
  3. 원하는 경우 스튜디오 창의 왼쪽 또는 오른쪽 가장자리에서 인바운드 트리거 또는 아웃바운드 트리거를 클릭하여 파이프라인 트리거를 설정합니다.
  4. 예약을 클릭하여 파이프라인이 주기적으로 실행되고 데이터를 로드하도록 예약합니다.

4167fa67550a49d5.png

  1. 요약에는 실행 기록, 기록, 오류 로그, 경고의 차트가 표시됩니다.

8. 유효성 검사

  1. 유효성 검사 파이프라인이 성공적으로 실행되었습니다.

7dee6e662c323f14.png

  1. BigQuery 데이터 세트에 모든 테이블이 있는지 확인합니다.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 알림 이메일을 수신합니다 (구성된 경우).

결과 보기

파이프라인을 실행한 후 다음 단계에 따라 결과를 확인합니다.

  1. BigQuery UI에서 테이블을 쿼리합니다. BigQuery UI로 이동
  2. 아래 쿼리를 자신의 프로젝트 이름, 데이터 세트, 테이블로 업데이트합니다.

e32bfd5d965a117f.png

9. 삭제

이 튜토리얼에서 사용한 리소스 비용이 Google Cloud Platform 계정에 청구되지 않도록 하는 방법은 다음과 같습니다.

튜토리얼을 완료한 후에는 할당량을 차지하지 않고 이후에 요금이 청구되지 않도록 GCP에서 만든 리소스를 삭제할 수 있습니다. 다음 섹션에서는 이러한 리소스를 삭제 또는 해제하는 방법을 설명합니다.

BigQuery 데이터 세트 삭제

이 튜토리얼에서 만든 BigQuery 데이터 세트를 삭제하려면 다음 안내를 따르세요.

GCS 버킷 삭제

이 튜토리얼의 일부로 만든 GCS 버킷을 삭제하려면 다음 안내를 따르세요.

Cloud Data Fusion 인스턴스 삭제

다음 안내에 따라 Cloud Data Fusion 인스턴스를 삭제합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다. 프로젝트 페이지로 이동
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력하고 종료를 클릭하여 프로젝트를 삭제합니다.

10. 축하합니다

축하합니다. Cloud Data Fusion을 사용하여 BigQuery에 의료 데이터를 수집하는 코드 랩을 성공적으로 완료했습니다.

Google Cloud Storage에서 BigQuery로 CSV 데이터를 가져왔습니다.

의료 데이터를 일괄적으로 로드, 변환, 마스킹하기 위한 데이터 통합 파이프라인을 시각적으로 빌드했습니다.

이제 Google Cloud Platform에서 BigQuery를 사용하여 의료 데이터 분석 여정을 시작하는 데 필요한 주요 단계를 알게 되었습니다.