1. 개요
Dataflow란 무엇인가요?
Dataflow는 다양한 데이터 처리 패턴을 실행하는 관리형 서비스입니다. 이 사이트의 문서에서는 서비스 기능 사용에 대한 지침을 포함하여 Dataflow를 사용하여 일괄 및 스트리밍 데이터 처리 파이프라인을 배포하는 방법을 설명합니다.
Apache Beam SDK는 일괄 및 스트리밍 파이프라인을 모두 개발할 수 있는 오픈소스 프로그래밍 모델입니다. Apache Beam 프로그램을 사용하여 파이프라인을 만든 다음 Dataflow 서비스에서 파이프라인을 실행합니다. Apache Beam 문서는 Apache Beam 프로그래밍 모델, SDK, 기타 실행기에 대한 심층적인 개념 정보와 참조 자료를 제공합니다.
신속한 스트리밍 데이터 분석
Dataflow를 사용하면 데이터 지연 시간을 줄이면서 스트리밍 데이터 파이프라인을 빠르고 간편하게 개발할 수 있습니다.
운영 및 관리 간소화
Dataflow는 서버리스 방식으로 데이터 엔지니어링 워크로드에서 운영 오버헤드를 제거하므로 팀이 서버 클러스터를 관리하는 대신 프로그래밍에 집중할 수 있습니다.
총소유비용 절감
Dataflow는 리소스 자동 확장과 비용 최적화된 일괄 처리 기능을 결합해 무제한에 가까운 용량을 제공하므로 시즌성 워크로드와 급증하는 워크로드를 과다 지출 없이 관리할 수 있습니다.
주요 특징
자동화된 리소스 관리 및 동적 작업 재분배
Dataflow는 처리 리소스의 프로비저닝 및 관리를 자동화하여 지연 시간을 최소화하고 사용률을 극대화하므로 인스턴스를 직접 가동하거나 예약할 필요가 없습니다. 작업 파티션 나누기도 자동화 및 최적화되어 지연 작업이 동적으로 재조정됩니다. '핫키'를 뒤쫓을 필요가 없음 전처리할 수 있습니다
수평 자동 확장
처리량 최적화를 위해 작업자 리소스가 수평식으로 자동 확장되어 전반적인 가격 대비 성능이 개선됩니다.
일괄 처리를 위한 유연한 리소스 예약 가격 책정
심야 근무와 같이 작업 예약 시간을 유연하게 처리해야 하는 경우 유연한 리소스 예약 (FlexRS)으로 일괄 처리 비용을 낮춰줍니다. 이러한 유연한 작업은 6시간 내에 실행을 위해 검색되도록 보장되어 큐에 배치됩니다.
이 가이드는 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven에서 작성되었습니다.
학습할 내용
- Java SDK를 사용하여 Apache Beam으로 Maven 프로젝트를 만드는 방법
- Google Cloud Platform 콘솔을 사용하여 예시 파이프라인 실행
- 연결된 Cloud Storage 버킷 및 콘텐츠 삭제 방법
필요한 항목
본 가이드를 어떻게 사용하실 계획인가요?
귀하의 Google Cloud Platform 서비스 사용 경험을 평가해 주세요.
<ph type="x-smartling-placeholder">2. 설정 및 요구사항
자습형 환경 설정
- Cloud Console에 로그인하고 새 프로젝트를 만들거나 기존 프로젝트를 다시 사용합니다. (Gmail 또는 G Suite 계정이 없으면 만들어야 합니다.)
모든 Google Cloud 프로젝트에서 고유한 이름인 프로젝트 ID를 기억하세요(위의 이름은 이미 사용되었으므로 사용할 수 없습니다). 이 ID는 나중에 이 Codelab에서 PROJECT_ID
라고 부릅니다.
- 그런 후 Google Cloud 리소스를 사용할 수 있도록 Cloud Console에서 결제를 사용 설정해야 합니다.
이 Codelab 실행에는 많은 비용이 들지 않습니다. 이 가이드를 마친 후 비용이 결제되지 않도록 리소스 종료 방법을 알려주는 '삭제' 섹션의 안내를 따르세요. Google Cloud 새 사용자에게는 미화 $300 상당의 무료 체험판 프로그램에 참여할 수 있는 자격이 부여됩니다.
API 사용 설정
화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
API 및 서비스 > Dashboard를 선택합니다.
+ API 및 서비스 사용 설정을 선택합니다.
'Compute Engine'을 검색합니다. 을 입력합니다. 'Compute Engine API'를 클릭합니다. 를 입력합니다.
Google Compute Engine 페이지에서 사용 설정을 클릭합니다.
사용 설정되면 화살표를 클릭하여 돌아갑니다.
이제 다음 API를 검색하고 사용 설정합니다.
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 새 Cloud Storage 버킷 만들기
Google Cloud Platform Console에서 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
아래로 스크롤하여 Cloud Storage > 브라우저 Storage 하위 섹션:
이제 Cloud Storage 브라우저가 표시되고, 현재 Cloud Storage 버킷이 없는 프로젝트를 사용 중인 경우 새 버킷을 만들라는 초대가 표시됩니다. 버킷 만들기 버튼을 눌러 버킷을 만듭니다.
버킷 이름을 입력합니다. 대화상자에서 볼 수 있듯이 버킷 이름은 Cloud Storage 전체에서 고유해야 합니다. 따라서 'test'와 같이 명확한 이름을 선택하면 다른 사용자가 이미 해당 이름으로 버킷을 만들었을 수 있으므로 오류가 발생할 수 있습니다.
버킷 이름에 허용되는 문자에 관한 몇 가지 규칙도 있습니다. 버킷 이름의 시작과 끝에 문자 또는 숫자로 되어 있고, 중간에 대시만 사용하는 경우에는 괜찮습니다. 특수문자를 사용하거나 문자나 숫자가 아닌 다른 이름으로 버킷 이름의 시작 또는 끝을 시도하면 대화상자에서 규칙을 알려줍니다.
버킷의 고유한 이름을 입력하고 만들기를 누릅니다. 이미 사용 중인 항목을 선택하면 위에 표시된 오류 메시지가 표시됩니다. 버킷을 성공적으로 만들면 브라우저에서 비어있는 새 버킷으로 이동합니다.
물론 표시되는 버킷 이름은 모든 프로젝트에서 고유해야 하므로 다릅니다.
4. Cloud Shell 시작
Cloud Shell 활성화
- Cloud Console에서 Cloud Shell 활성화를 클릭합니다.
이전에 Cloud Shell을 시작하지 않았으면 설명이 포함된 중간 화면(스크롤해야 볼 수 있는 부분)이 제공됩니다. 이 경우 계속을 클릭합니다(이후 다시 표시되지 않음). 이 일회성 화면은 다음과 같습니다.
Cloud Shell을 프로비저닝하고 연결하는 데 몇 분 정도만 걸립니다.
가상 머신은 필요한 모든 개발 도구와 함께 로드됩니다. 영구적인 5GB 홈 디렉터리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 Codelab에서 대부분의 작업은 브라우저나 Chromebook만 사용하여 수행할 수 있습니다.
Cloud Shell에 연결되면 인증이 완료되었고 프로젝트가 해당 프로젝트 ID로 이미 설정된 것을 볼 수 있습니다.
- Cloud Shell에서 다음 명령어를 실행하여 인증되었는지 확인합니다.
gcloud auth list
명령어 결과
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
명령어 결과
[core] project = <PROJECT_ID>
또는 다음 명령어로 설정할 수 있습니다.
gcloud config set project <PROJECT_ID>
명령어 결과
Updated property [core/project].
5. Maven 프로젝트 만들기
Cloud Shell이 시작된 후 Apache Beam용 Java SDK를 사용하여 Maven 프로젝트를 만들어 시작해 보겠습니다.
Apache Beam은 데이터 파이프라인용 오픈소스 프로그래밍 모델입니다. Apache Beam 프로그램으로 이러한 파이프라인을 정의하고 Dataflow와 같은 실행기를 선택하여 파이프라인을 실행할 수 있습니다.
다음과 같이 셸에서 mvn archetype:generate
명령어를 실행합니다.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
명령어를 실행하면 현재 디렉터리 아래에 first-dataflow
라는 새 디렉터리가 표시됩니다. first-dataflow
에는 자바용 Cloud Dataflow SDK와 예시 파이프라인이 포함된 Maven 프로젝트가 포함되어 있습니다.
6. Cloud Dataflow에서 텍스트 처리 파이프라인 실행
먼저 프로젝트 ID와 Cloud Storage 버킷 이름을 환경 변수로 저장해 보겠습니다. Cloud Shell에서 이 작업을 수행할 수 있습니다. <your_project_id>
를 자신의 프로젝트 ID로 바꿔야 합니다.
export PROJECT_ID=<your_project_id>
이제 Cloud Storage 버킷에 대해서도 동일한 작업을 수행합니다. <your_bucket_name>
을 이전 단계에서 버킷을 만들 때 사용한 고유한 이름으로 바꿔야 합니다.
export BUCKET_NAME=<your_bucket_name>
first-dataflow/
디렉터리로 변경합니다.
cd first-dataflow
텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어의 출현 빈도를 세는 WordCount라는 파이프라인을 실행하겠습니다. 먼저 파이프라인을 실행하고 실행되는 동안 각 단계에서 어떤 일이 일어나는지 살펴보겠습니다.
셸 또는 터미널 창에서 mvn compile exec:java
명령어를 실행하여 파이프라인을 시작합니다. --project, --stagingLocation,
및 --output
인수의 경우 아래 명령어는 이 단계의 앞부분에서 설정한 환경 변수를 참조합니다.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
작업이 실행되는 동안 작업 목록에서 작업을 찾아보겠습니다.
Google Cloud Platform 콘솔에서 Cloud Dataflow 웹 UI를 엽니다. 워드카운트 작업의 상태가 실행 중으로 표시됩니다.
이제 파이프라인 매개변수를 살펴보겠습니다. 작업 이름을 클릭하여 시작합니다.
작업을 선택하면 실행 그래프를 볼 수 있습니다. 파이프라인의 실행 그래프는 파이프라인의 각 변환을 변환 이름과 일부 상태 정보가 포함된 상자로 나타냅니다. 각 단계의 오른쪽 상단에 있는 캐럿을 클릭하여 세부정보를 확인할 수 있습니다.
파이프라인이 각 단계에서 데이터를 변환하는 방법을 살펴보겠습니다.
- 읽기: 이 단계에서 파이프라인은 입력 소스에서 읽습니다. 이 경우에는 셰익스피어 희곡 리어왕의 전체 텍스트가 포함된 Cloud Storage의 텍스트 파일입니다. 파이프라인은 파일을 한 줄씩 읽고 각각
PCollection
를 출력합니다. 여기서 텍스트 파일의 각 줄은 컬렉션의 요소입니다. - CountWords:
CountWords
단계는 두 부분으로 구성됩니다. 먼저ExtractWords
라는 병렬 수행 함수 (ParDo)를 사용하여 각 줄을 개별 단어로 토큰화합니다. ExtractWords의 출력은 각 요소가 단어인 새로운 PCollection입니다. 다음 단계인Count
에서는 Java SDK에서 제공하는 변환을 활용하여 키가 고유한 단어이고 값은 발생하는 횟수를 나타내는 값 쌍을 반환합니다. 다음은CountWords
를 구현하는 메서드입니다. GitHub에서 전체 WordCount.java 파일을 확인할 수 있습니다.
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: 아래에 복사된
FormatAsTextFn
를 호출하여 각 키-값 쌍의 형식을 인쇄 가능한 문자열로 지정합니다.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: 이 단계에서는 인쇄 가능한 문자열을 샤딩된 여러 텍스트 파일에 씁니다.
몇 분 후에 파이프라인의 결과 출력을 살펴보겠습니다.
이제 그래프 오른쪽에 있는 작업 정보 페이지를 살펴보겠습니다. 여기에는 mvn compile exec:java
명령어에 포함된 파이프라인 매개변수가 있습니다.
파이프라인의 맞춤 카운터도 확인할 수 있습니다. 이 경우에는 실행 중에 지금까지 발생한 빈 줄 수를 보여줍니다. 파이프라인에 새 카운터를 추가하여 애플리케이션별 측정항목을 추적할 수 있습니다.
콘솔 하단의 로그 아이콘을 클릭하면 특정 오류 메시지를 볼 수 있습니다.
이 패널에는 기본적으로 작업 상태를 전체적으로 보고하는 작업 로그 메시지가 표시됩니다. 최소 심각도 선택기를 사용하여 작업 진행 상황 및 상태 메시지를 필터링할 수 있습니다.
그래프에서 파이프라인 단계를 선택하면 사용자 코드 및 파이프라인 단계에서 실행되는 생성된 코드로 생성된 로그로 보기가 변경됩니다.
작업 로그로 돌아가려면 그래프 외부를 클릭하거나 오른쪽 측면 패널에 있는 닫기 버튼을 사용하여 단계를 선택 해제하세요.
로그 탭에서 작업자 로그 버튼을 사용하여 파이프라인을 실행하는 Compute Engine 인스턴스의 작업자 로그를 볼 수 있습니다. 작업자 로그는 코드에서 생성된 로그 줄과 이를 실행하는 Dataflow에서 생성된 코드로 구성됩니다.
파이프라인에서 장애를 디버그하려는 경우 작업자 로그에 추가 로깅이 있어 문제를 해결하는 데 도움이 되는 경우가 많습니다. 이러한 로그는 모든 작업자에 걸쳐 집계되며 필터링 및 검색될 수 있습니다.
Dataflow 모니터링 인터페이스는 최신 로그 메시지만 표시합니다. 로그 창 오른쪽에 있는 Google Cloud Observability 링크를 클릭하여 모든 로그를 볼 수 있습니다.
다음은 모니터링→로그 페이지에서 볼 수 있는 여러 가지 로그 유형을 요약한 것입니다.
- job-message 로그에는 Dataflow의 다양한 구성요소가 생성하는 작업 수준 메시지가 포함됩니다. 자동 확장 구성, 작업자 시작 또는 종료 시점, 작업 단계의 진행 상황, 작업 오류를 예로 들 수 있습니다. 사용자 코드 비정상 종료로 인해 발생했으며 worker 로그에 있는 작업자 수준 오류도 job-message 로그에 전파됩니다.
- worker 로그는 Dataflow 작업자가 생성합니다. 작업자는 대부분의 파이프라인 작업을 수행합니다 (예: 데이터에 ParDo 적용). Worker 로그에는 사용자 코드와 Dataflow에서 로깅한 메시지가 포함됩니다.
- worker-startup 로그는 대부분의 Dataflow 작업에 포함되며 시작 프로세스와 관련된 메시지를 캡처할 수 있습니다. 시작 프로세스에는 Cloud Storage에서 작업의 jar를 다운로드한 다음 작업자 시작이 포함됩니다. 작업자를 시작하는 데 문제가 있으면 이러한 로그를 확인하는 것이 좋습니다.
- shuffler 로그에는 병렬 파이프라인 작업 결과를 통합하는 작업자의 메시지가 포함됩니다.
- docker 및 kubelet 로그에는 Dataflow 작업자에서 사용되는 이러한 공개 기술과 관련된 메시지가 포함됩니다.
다음 단계에서는 작업이 성공했는지 확인합니다.
7. 작업이 성공했는지 확인
Google Cloud Platform 콘솔에서 Cloud Dataflow 웹 UI를 엽니다.
처음에는 워드카운트 작업의 상태가 실행 중으로 표시된 후 성공으로 표시됩니다.
작업을 실행하는 데 약 3~4분 정도 걸립니다.
파이프라인을 실행하고 출력 버킷을 지정한 것을 기억하시나요? 결과를 살펴보겠습니다. 리어왕에서 각 단어가 몇 번 나오는지 확인하고 싶지 않으니까요. Google Cloud Platform Console에서 Cloud Storage 브라우저로 다시 이동합니다. 버킷에서 작업을 통해 만든 출력 파일 및 스테이징 파일을 확인할 수 있습니다.
8. 리소스 종료
Google Cloud Platform 콘솔에서 리소스를 종료할 수 있습니다.
Google Cloud Platform Console에서 Cloud Storage 브라우저를 엽니다.
앞서 만든 버킷 옆에 있는 체크박스를 선택하고 삭제를 클릭하여 버킷과 콘텐츠를 영구 삭제합니다.
9. 축하합니다.
Cloud Dataflow SDK로 Maven 프로젝트를 만들고, Google Cloud Platform 콘솔을 사용하여 예시 파이프라인을 실행하고, 연결된 Cloud Storage 버킷과 콘텐츠를 삭제하는 방법을 알아봤습니다.
자세히 알아보기
- Dataflow 문서: https://cloud.google.com/dataflow/docs/
라이선스
이 작업물은 Creative Commons Attribution 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.