Cloud Data Fusion을 사용하여 BigQuery로 CSV 데이터 수집 - 일괄 수집

1. 소개

12fb66cc134b50ef.png

최종 업데이트: 2020년 2월 28일

이 Codelab에서는 CSV 형식의 의료 데이터를 BigQuery로 일괄 수집하는 데이터 수집 패턴을 보여줍니다. 이 실습에서는 Cloud Data Fusion 일괄 데이터 파이프라인을 사용합니다. 현실적인 의료 테스트 데이터가 생성되어 Google Cloud Storage 버킷 (gs://hcls_testing_data_fhir_10_patients/csv/)에 제공되었습니다.

이 Codelab에서 학습할 내용은 다음과 같습니다.

  • Cloud Data Fusion을 사용하여 GCS에서 BigQuery로 CSV 데이터 (일괄 예약 로드)를 수집하는 방법을 설명합니다.
  • 의료 데이터를 일괄 로드, 변환, 마스킹하기 위해 Cloud Data Fusion에서 데이터 통합 파이프라인을 시각적으로 빌드하는 방법

이 Codelab을 실행하려면 무엇이 필요한가요?

  • GCP 프로젝트에 액세스해야 합니다.
  • GCP 프로젝트의 소유자 역할을 할당받아야 합니다.
  • 헤더를 포함한 CSV 형식의 의료 데이터입니다.

GCP 프로젝트가 없으면 이 단계에 따라 새 GCP 프로젝트를 만듭니다.

CSV 형식의 의료 데이터는 GCS 버킷(gs://hcls_testing_data_fhir_10_patients/csv/)에 미리 로드되어 있습니다. 각 리소스 CSV 파일에는 고유한 스키마 구조가 있습니다. 예를 들어 Patients.csv는 Providers.csv와 다른 스키마를 사용합니다. 미리 로드된 스키마 파일은 gs://hcls_testing_data_fhir_10_patients/csv_schemas에서 확인할 수 있습니다.

새 데이터 세트가 필요한 경우 언제든지 SyntheaTM를 사용하여 데이터 세트를 생성할 수 있습니다. 그런 다음 입력 데이터 복사 단계에서 버킷의 파일을 복사하는 대신 GCS에 업로드합니다.

2. GCP 프로젝트 설정

환경에 맞게 셸 변수를 초기화합니다.

PROJECT_ID를 찾으려면 프로젝트 식별을 참고하세요.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

GCS 버킷을 만들어 gsutil 도구를 사용하여 입력 데이터와 오류 로그를 저장합니다.

gsutil mb -l us gs://$BUCKET_NAME

합성 데이터 세트에 대한 액세스 권한을 얻습니다.

  1. Cloud 콘솔에 로그인할 때 사용하는 이메일 주소에서 hcls-solutions-external+subscribe@google.com으로 가입을 요청하는 이메일을 전송합니다.
  2. 작업을 확인하는 방법에 대한 지침이 포함된 이메일이 전송됩니다. 525a0fa752e0acae.png
  3. 옵션을 사용하여 이메일에 답장하고 그룹에 참여하세요. 버튼을 클릭하지 마세요.
  4. 확인 이메일을 받으면 Codelab의 다음 단계로 진행할 수 있습니다.

입력 데이터를 복사합니다.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

BigQuery 데이터 세트를 만듭니다.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

3. Cloud Data Fusion 환경 설정

다음 단계에 따라 Cloud Data Fusion API를 사용 설정하고 필요한 권한을 부여합니다.

API를 사용 설정합니다.

  1. GCP Console API 라이브러리로 이동합니다.
  2. 프로젝트 목록에서 프로젝트를 선택합니다.
  3. API 라이브러리에서 사용 설정할 API를 선택합니다. API를 찾는 데 도움이 필요하면 검색 필드 또는 필터를 사용하세요.
  4. API 페이지에서 사용을 클릭합니다.

Cloud Data Fusion 인스턴스를 만듭니다.

  1. GCP 콘솔에서 프로젝트 ID를 선택합니다.
  2. 왼쪽 메뉴에서 Data Fusion을 선택한 후 페이지 중간에 있는 '인스턴스 만들기' 버튼을 클릭하거나 (첫 번째 생성), 상단 메뉴에서 '인스턴스 만들기' 버튼을 클릭합니다 (추가 생성).

a828690ff3bf3c46.png

8372c944c94737ea.png

  1. 인스턴스 이름을 입력합니다. Enterprise를 선택합니다.

5af91e46917260ff.png

  1. 만들기 버튼을 클릭합니다.

인스턴스 권한 설정

인스턴스를 만든 후 다음 단계에 따라 인스턴스와 연결된 서비스 계정에 프로젝트에 대한 권한을 부여합니다.

  1. 인스턴스 이름을 클릭하여 인스턴스 세부정보 페이지로 이동합니다.

76ad691f795e1ab3.png

  1. 서비스 계정 복사

6c91836afb72209d.png

  1. 프로젝트의 IAM 페이지로 이동합니다.
  2. 이제 IAM 권한 페이지에서 서비스 계정을 새 구성원으로 추가하고 Cloud Data Fusion API 서비스 에이전트 역할을 부여합니다. 추가 버튼을 클릭한 다음 '서비스 계정'을 붙여넣습니다. '새 구성원' 필드에서 서비스 관리 -> Cloud Data Fusion API 서버 에이전트 역할입니다.
  3. ea68b28d917a24b1.png
  4. 저장을 클릭합니다.

이 단계가 완료되면 Cloud Data Fusion 인스턴스 페이지 또는 인스턴스의 세부정보 페이지에서 인스턴스 보기 링크를 클릭하여 Cloud Data Fusion 사용을 시작할 수 있습니다.

방화벽 규칙을 설정합니다.

  1. GCP 콘솔로 이동합니다. -> VPC 네트워크 -> default-allow-ssh 규칙의 존재 여부를 확인하는 방화벽 규칙입니다.

102adef44bbe3a45.png

  1. 그렇지 않은 경우 기본 네트워크에 대한 모든 인그레스 SSH 트래픽을 허용하는 방화벽 규칙을 추가합니다.

명령줄 사용:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

UI 사용: 방화벽 규칙 만들기를 클릭하고 정보를 입력합니다.

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. 변환을 위한 스키마 빌드

이제 GCP에 Cloud Fusion 환경이 있으므로 스키마를 빌드해 보겠습니다. CSV 데이터를 변환하려면 이 스키마가 필요합니다.

  1. Cloud Data Fusion 창의 작업 열에서 인스턴스 보기 링크를 클릭합니다. 다른 페이지로 리디렉션됩니다. 제공된 url을 클릭하여 Cloud Data Fusion 인스턴스를 엽니다. '둘러보기 시작' 클릭 옵션 또는 '아니요'입니다. 버튼을 클릭합니다.
  2. '햄버거' 펼치기 메뉴에서 파이프라인을 선택합니다 -> 스튜디오

6561b13f30e36c3a.png

  1. 왼쪽에 있는 플러그인 팔레트의 Transform(변환) 섹션에서 Data Pipelines UI에 표시될 Wrangler 노드를 더블클릭합니다.

aa44a4db5fe6623a.png

  1. Wrangler 노드를 가리킨 다음 Properties(속성)를 클릭합니다. Wrangle 버튼을 클릭한 다음 원하는 스키마를 빌드하려면 모든 데이터 필드가 있어야 하는 .csv 소스 파일 (예: 환자.csv)을 선택합니다.
  2. 각 열 이름 (예: 본문) 옆에 있는 아래쪽 화살표 (열 변환)를 클릭합니다. 802edca8a97da18.png
  3. 기본적으로 처음 가져오기에서는 데이터 파일에 열이 하나만 있다고 가정합니다. CSV로 파싱하려면 파싱CSV를 선택한 다음 구분 기호를 선택하고 '첫 번째 행을 헤더로 설정'을 선택합니다. 체크박스를 선택합니다. 적용 버튼을 클릭합니다.
  4. 본문 필드 옆에 있는 아래쪽 화살표를 클릭하고 열 삭제를 선택하여 본문 필드를 삭제하세요. 또한 열 삭제, 일부 열의 데이터 유형 변경 (기본값은 '문자열' 유형), 열 분할, 열 이름 설정 등의 다른 변환을 시도해 볼 수 있습니다.

e6d2cda51ff298e7.png

  1. '열' '변환 단계'를 살펴보겠습니다 탭에는 출력 스키마와 Wrangler의 레시피가 표시됩니다. 오른쪽 상단에서 적용을 클릭합니다. 확인 버튼을 클릭합니다. 녹색 '발견된 오류 없음' 는 성공을 나타냅니다.

1add853c43f2abee.png

  1. 필요한 경우 Wrangler 속성에서 작업 드롭다운을 클릭하여 나중에 사용할 수 있도록 원하는 스키마를 로컬 스토리지로 내보내기합니다.가져오기
  2. 나중에 사용할 수 있도록 Wrangler 레시피를 저장합니다.
parse-as-csv :body ',' true
drop body
  1. Wrangler Properties(랭글러 속성) 창을 닫으려면 X 버튼을 클릭합니다.

5. 파이프라인용 노드 빌드

이 섹션에서는 파이프라인 구성요소를 빌드합니다.

  1. Data Pipelines UI의 왼쪽 상단에서 파이프라인 유형으로 Data Pipeline - Batch(데이터 파이프라인 - 일괄)가 선택되어 있습니다.

af67c42ce3d98529.png

  1. 왼쪽 패널에는 파이프라인의 노드를 하나 이상 선택할 수 있는 필터, 소스, 변환, 분석, 싱크, 조건 및 작업, 오류 핸들러, 알림과 같은 다양한 섹션이 있습니다.

c4438f7682f8b19b.png

소스 노드

  1. 소스 노드를 선택합니다.
  2. 왼쪽에 있는 플러그인 팔레트의 소스 섹션에서 데이터 파이프라인 UI에 표시되는 Google Cloud Storage 노드를 더블클릭합니다.
  3. GCS 소스 노드를 가리키고 속성을 클릭합니다.

87e51a3e8dae8b3f.png

  1. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • 라벨 = {모든 텍스트}
  • 참조 이름 = {모든 텍스트}
  • 프로젝트 ID = 자동 감지
  • Path = 현재 프로젝트의 버킷에 대한 GCS URL입니다. 예를 들면 gs://$BUCKET_NAME/csv/입니다.
  • 형식 = 텍스트
  • 경로 필드 = 파일 이름
  • 경로 파일 이름만 = true
  • 재귀적으로 파일 읽기 = true
  1. 'filename' 필드 추가 + 버튼을 클릭하여 GCS 출력 스키마로 변경합니다.
  2. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭합니다. 녹색 '발견된 오류 없음' 는 성공을 나타냅니다.
  3. GCS 속성을 닫으려면 X 버튼을 클릭합니다.

node 변환

  1. 변환 노드를 선택합니다.
  2. 왼쪽에 있는 플러그인 팔레트의 Transform(변환) 섹션에서 Data Pipelines UI에 표시되는 Wrangler 노드를 더블클릭합니다. GCS 소스 노드를 Wrangler 변환 노드에 연결합니다.
  3. Wrangler 노드를 가리킨 다음 Properties(속성)를 클릭합니다.
  4. 작업 드롭다운을 클릭하고 가져오기를 선택하여 저장된 스키마 (예: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ 스키마 (Patients).json)를 가져오고 이전 섹션의 저장된 레시피를 붙여넣습니다.
  5. 또는 변환을 위한 스키마 빌드 섹션의 Wrangler 노드를 재사용하세요.
  6. 필수 입력란을 기재합니다. 다음 필드를 설정합니다.
  • 라벨 = {모든 텍스트}
  • 입력란 이름 = {*}
  • Precondition = {filename != "patients.csv"}: 소스 노드에서 각 입력 파일 (예: 환자.csv, provider.csv, allergies.csv 등)을 구별합니다.

2426f8f0a6c4c670.png

  1. JavaScript 노드를 추가하여 레코드를 추가로 변환하는 사용자 제공 JavaScript를 실행합니다. 이 Codelab에서는 JavaScript 노드를 활용하여 각 레코드 업데이트의 타임스탬프를 가져옵니다. Wrangler 변환 노드를 JavaScript 변환 노드에 연결합니다. JavaScript 속성을 열고 다음 함수를 추가합니다.

75212f9ad98265a8.png

function transform(input, emitter, context) {
  input.TIMESTAMP = (new Date()).getTime()*1000;
  emitter.emit(input);
}
  1. + 기호를 클릭하여 TIMESTAMP라는 필드를 출력 스키마에 추가합니다 (없는 경우). 데이터 유형으로 타임스탬프를 선택합니다.

4227389b57661135.png

  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.
  2. 변환 속성 창을 닫으려면 X 버튼을 클릭합니다.

데이터 마스킹 및 익명화

  1. 열에서 아래쪽 화살표를 클릭하고 요구사항 (예: SSN 열)에 따라 데이터 마스킹에서 마스킹 규칙을 적용하여 개별 데이터 열을 선택할 수 있습니다.

bb1eb067dd6e0946.png

  1. Wrangler 노드의 Recipe 창에서 더 많은 지시문을 추가할 수 있습니다. 예를 들어 익명화를 위해 다음 구문을 따라 해싱 알고리즘과 함께 해시 지시문을 사용하는 경우입니다.
hash <column> <algorithm> <encode>

<column>: name of the column
<algorithm>: Hashing algorithm (i.e. MD5, SHA-1, etc.)
<encode>: default is true (hashed digest is encoded as hex with left-padding zeros). To disable hex encoding, set <encode> to false.

cbcc9a0932f53197.png

싱크 노드

  1. 싱크 노드를 선택합니다.
  2. 왼쪽에 있는 플러그인 팔레트의 Sink(싱크) 섹션에서 BigQuery 노드를 더블클릭합니다. 그러면 Data Pipelines UI가 표시됩니다.
  3. BigQuery 싱크 노드를 가리킨 다음 '속성'을 클릭합니다.

1be711152c92c692.png

  1. 필수 입력란을 작성합니다. 다음 필드를 설정합니다.
  • 라벨 = {모든 텍스트}
  • 참조 이름 = {모든 텍스트}
  • 프로젝트 ID = 자동 감지
  • 데이터 세트 = 현재 프로젝트에 사용되는 BigQuery 데이터 세트 (예: DATASET_ID)
  • 테이블 = {table name}
  1. 자세한 설명을 보려면 문서를 클릭하세요. 확인 버튼을 클릭하여 모든 입력 정보의 유효성을 검사합니다. 녹색 '오류가 없습니다.' 는 성공을 나타냅니다.

c5585747da2ef341.png

  1. BigQuery 속성을 닫으려면 X 버튼을 클릭합니다.

6. 일괄 데이터 파이프라인 빌드

파이프라인의 모든 노드 연결

  1. 연결 화살표를 드래그 > 소스 노드의 오른쪽 가장자리에 배치하고 대상 노드의 왼쪽 가장자리에 드롭합니다.
  2. 파이프라인에는 동일한 GCS 소스 노드에서 입력 파일을 가져오는 여러 브랜치가 있을 수 있습니다.

67510ab46bd44d36.png

  1. 파이프라인 이름을 지정합니다.

이상입니다. 첫 번째 일괄 데이터 파이프라인을 만들었으므로 이 파이프라인을 배포하고 실행할 수 있습니다.

이메일을 통해 파이프라인 알림 전송 (선택사항)

파이프라인 알림 SendEmail 기능을 활용하려면 가상 머신 인스턴스에서 메일을 전송할 수 있도록 메일 서버를 설정해야 합니다. 자세한 내용은 아래의 참조 링크를 확인하세요.

인스턴스에서 이메일 보내기 | Compute Engine 문서

이 Codelab에서는 다음 단계에 따라 Mailgun을 통해 메일 릴레이 서비스를 설정합니다.

  1. Mailgun으로 이메일 보내기 | Compute Engine 문서를 참조하여 Mailgun으로 계정을 설정하고 이메일 릴레이 서비스를 구성합니다. 추가 수정사항은 다음과 같습니다.
  2. 모든 수신자 추가 이메일 주소를 Mailgun의 승인된 목록에 추가할 수 있습니다. 이 목록은 왼쪽 패널의 Mailgun>Sending>Overview 옵션에서 찾을 수 있습니다.

7e6224cced3fa4e0.png fa78739f1ddf2dc2.png

수신자가 '동의'를 클릭하면 해당 이메일 주소는 파이프라인 알림 이메일을 수신하도록 승인된 목록에 저장됩니다.support@mailgun.net

72847c97fd5fce0f.png

  1. '시작하기 전에'의 3단계 섹션 - 다음과 같이 방화벽 규칙을 만듭니다.

75b063c165091912.png

  1. 'Postfix를 사용하여 Mailgun을 메일 릴레이로 구성'의 3단계 안내에 설명된 대로 로컬 전용 대신 인터넷 사이트 또는 스마트 호스트를 사용하는 인터넷을 선택합니다.

8fd8474a4ef18f16.png

  1. 'Postfix를 사용하여 Mailgun을 메일 릴레이로 구성'의 4단계 vi /etc/postfix/main.cf를 수정하여 mynetworks의 끝에 10.128.0.0/9를 추가합니다.

249fbf3edeff1ce8.png

  1. vi /etc/postfix/master.cf를 수정하여 기본 smtp (25)를 포트 587로 변경합니다.

86c82cf48c687e72.png

  1. Data Fusion 스튜디오의 오른쪽 상단에서 구성을 클릭합니다. 파이프라인 알림을 클릭하고 + 버튼을 클릭하여 알림 창을 엽니다. SendEmail를 선택합니다.

dc079a91f1b0da68.png

  1. 이메일 구성 양식을 작성합니다. 각 알림 유형의 실행 조건 드롭다운에서 완료, 성공 또는 실패를 선택합니다. 워크플로 토큰 포함 = false이면 메시지 필드의 정보만 전송됩니다. 워크플로 토큰 포함 = true인 경우, 메시지 필드와 워크플로 토큰 세부정보 정보가 전송됩니다. 프로토콜에는 소문자를 사용해야 합니다. 'fake'(가짜)를 사용합니다. 발신자의 회사 이메일 주소가 아닌 다른 이메일

1fa619b6ce28f5e5.png

7. 파이프라인 구성, 배포, 실행/예약

db612e62a1c7ab7e.png

  1. Data Fusion 스튜디오의 오른쪽 상단에서 구성을 클릭합니다. 엔진 구성용 Spark를 선택합니다. 구성 창에서 저장을 클릭합니다.

8ecf7c243c125882.png

  1. 미리보기를 클릭하여 데이터 미리보기**,** **미리보기** 를 다시 클릭하여 이전 창으로 전환합니다. 미리보기 모드에서 파이프라인을 **실행** 할 수도 있습니다.

b3c891e5e1aa20ae.png

  1. 로그를 클릭하여 로그를 확인합니다.
  2. 저장을 클릭하여 모든 변경사항을 저장합니다.
  3. 새 파이프라인을 빌드할 때 저장된 파이프라인 구성을 가져오려면 가져오기를 클릭합니다.
  4. 내보내기를 클릭하여 파이프라인 구성을 내보냅니다.
  5. 배포를 클릭하여 파이프라인을 배포합니다.
  6. 배포되면 실행을 클릭하고 파이프라인이 완료될 때까지 기다립니다.

bb06001d46a293db.png

  1. 작업 버튼에서 복제를 선택하여 파이프라인을 복제할 수 있습니다.
  2. 작업 버튼에서 내보내기를 선택하여 파이프라인 구성을 내보낼 수 있습니다.
  3. 원하는 경우 스튜디오 창의 왼쪽 또는 오른쪽 가장자리에서 인바운드 트리거 또는 아웃바운드 트리거를 클릭하여 파이프라인 트리거를 설정합니다.
  4. 예약을 클릭하여 파이프라인을 실행하고 데이터를 주기적으로 로드하도록 예약합니다.

4167fa67550a49d5.png

  1. 요약: 실행 기록, 레코드, 오류 로그, 경고 차트를 보여줍니다.

8. 유효성 검사

  1. Validate 파이프라인이 성공적으로 실행되었습니다.

7dee6e662c323f14.png

  1. BigQuery 데이터 세트에 모든 테이블이 있는지 확인합니다.
bq ls $PROJECT_ID:$DATASET_ID
     tableId       Type    Labels   Time Partitioning
----------------- ------- -------- -------------------
 Allergies         TABLE
 Careplans         TABLE
 Conditions        TABLE
 Encounters        TABLE
 Imaging_Studies   TABLE
 Immunizations     TABLE
 Medications       TABLE
 Observations      TABLE
 Organizations     TABLE
 Patients          TABLE
 Procedures        TABLE
 Providers         TABLE
  1. 알림 이메일 수신 (구성된 경우)

결과 보기

파이프라인을 실행한 후 다음 단계에 따라 결과를 확인합니다.

  1. BigQuery UI에서 테이블을 쿼리합니다. BigQuery UI로 이동
  2. 아래 쿼리를 자신의 프로젝트 이름, 데이터 세트, 테이블로 업데이트합니다.

e32bfd5d965a117f.png

9. 삭제

이 튜토리얼에서 사용한 리소스 비용이 Google Cloud Platform 계정에 청구되지 않도록 하는 방법은 다음과 같습니다.

이 가이드를 완료한 후에는 할당량을 차지하지 않고 이후에 요금이 청구되지 않도록 GCP에서 만든 리소스를 삭제할 수 있습니다. 다음 섹션에서는 이러한 리소스를 삭제 또는 해제하는 방법을 설명합니다.

BigQuery 데이터 세트 삭제

다음 안내에 따라 이 튜토리얼의 일부로 만든 BigQuery 데이터 세트를 삭제합니다.

GCS 버킷 삭제

다음 안내에 따라 이 튜토리얼의 일부로 만든 GCS 버킷을 삭제합니다.

Cloud Data Fusion 인스턴스 삭제

다음 안내에 따라 Cloud Data Fusion 인스턴스를 삭제합니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 튜토리얼에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다. 프로젝트 페이지로 이동
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력하고 종료를 클릭하여 프로젝트를 삭제합니다.

10. 축하합니다

수고하셨습니다. Cloud Data Fusion을 사용하여 BigQuery에서 의료 데이터를 수집하는 Codelab을 성공적으로 완료했습니다.

Google Cloud Storage에서 BigQuery로 CSV 데이터를 가져왔습니다.

의료 데이터를 일괄적으로 로드, 변환, 마스킹하기 위한 데이터 통합 파이프라인을 시각적으로 빌드했습니다.

지금까지 Google Cloud Platform에서 BigQuery로 의료 데이터 분석 여정을 시작하는 데 필요한 주요 단계를 알아봤습니다.