Dataproc Serverless

1. 개요 - Google Dataproc

Dataproc은 Apache Spark, Apache Flink, Presto, 기타 많은 오픈소스 도구 및 프레임워크를 실행하기 위한 확장성이 뛰어난 완전 관리형 서비스입니다. Dataproc을 전 세계에서 데이터 레이크 현대화, ETL / ELT, 안전한 데이터 과학에 사용할 수 있습니다. Dataproc은 BigQuery, Cloud Storage, Vertex AI, Dataplex를 비롯한 여러 Google Cloud 서비스와도 완전히 통합됩니다.

Dataproc은 다음 세 가지 버전으로 제공됩니다.

  • Dataproc Serverless를 사용하면 인프라와 자동 확장을 구성하지 않고도 PySpark 작업을 실행할 수 있습니다. Dataproc Serverless는 PySpark 일괄 워크로드 및 세션 / 노트북을 지원합니다.
  • Google Compute Engine의 Dataproc을 사용하면 Flink, Presto와 같은 오픈소스 도구 외에도 YARN 기반 Spark 워크로드용 Hadoop YARN 클러스터를 관리할 수 있습니다. 자동 확장을 비롯해 원하는 만큼 수직 또는 수평 확장을 사용하여 클라우드 기반 클러스터를 맞춤설정할 수 있습니다.
  • Google Kubernetes Engine의 Dataproc을 사용하면 Spark, PySpark, SparkR 또는 Spark SQL 작업을 제출하기 위해 GKE 인프라에 Dataproc 가상 클러스터를 구성할 수 있습니다.

이 Codelab에서는 Dataproc 서버리스를 사용할 수 있는 여러 가지 방법을 알아봅니다.

Apache Spark는 원래 Hadoop 클러스터에서 실행되도록 빌드되었으며 YARN을 리소스 관리자로 사용했습니다. Hadoop 클러스터를 유지하려면 특정 전문 지식이 필요하며 클러스터의 다양한 노브가 올바르게 구성되어야 합니다. 이는 Spark에서 사용자가 설정해야 하는 별도의 노브 세트 외에 추가로 설정해야 하는 것입니다. 이로 인해 개발자가 Spark 코드 자체에서 작업하는 대신 인프라를 구성하는 데 더 많은 시간을 소비하는 시나리오가 많이 발생합니다.

Dataproc Serverless를 사용하면 Hadoop 클러스터나 Spark를 수동으로 구성할 필요가 없습니다. 서버리스 Dataproc은 Hadoop에서 실행되지 않으며 자체 동적 리소스 할당을 사용하여 자동 확장을 비롯한 리소스 요구사항을 결정합니다. 소규모의 Spark 속성 하위 집합은 서버리스 Dataproc을 통해 맞춤설정할 수 있지만 대부분의 경우 이러한 속성을 조정할 필요가 없습니다.

2. 설정

먼저 이 Codelab에서 사용되는 환경과 리소스를 구성합니다.

Google Cloud 프로젝트를 만듭니다. 기존 버킷을 사용해도 됩니다.

Cloud 콘솔 툴바에서 Cloud Shell을 클릭하여 엽니다.

ba0bb17945a73543.png

Cloud Shell은 이 Codelab에서 사용할 수 있는 즉시 사용 가능한 셸 환경을 제공합니다.

68c4ebd2a8539764.png

Cloud Shell은 기본적으로 프로젝트 이름을 설정합니다. echo $GOOGLE_CLOUD_PROJECT을 실행하여 다시 확인합니다. 출력에 프로젝트 ID가 표시되지 않으면 설정합니다.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

리소스의 Compute Engine 리전(예: us-central1 또는 europe-west2)을 설정합니다.

export REGION=<your-region>

API 사용 설정

이 Codelab에서는 다음 API를 사용합니다.

  • BigQuery
  • Dataproc

필요한 API를 사용 설정합니다. 이 작업은 약 1분 정도 걸리며 완료되면 성공 메시지가 표시됩니다.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

네트워크 액세스 구성

Dataproc Serverless를 사용하려면 Spark 드라이버와 실행자에는 비공개 IP만 있으므로 Spark 작업을 실행할 리전에 Google 비공개 액세스를 사용 설정해야 합니다. 다음을 실행하여 default 서브넷에서 사용 설정합니다.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

True 또는 False를 출력하는 다음을 통해 Google 비공개 액세스가 사용 설정되었는지 확인할 수 있습니다.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

스토리지 버킷 만들기

이 Codelab에서 만든 애셋을 저장하는 데 사용할 스토리지 버킷을 만듭니다.

버킷 이름을 선택합니다. 버킷 이름은 모든 사용자 간에 전역적으로 고유해야 합니다.

export BUCKET=<your-bucket-name>

Spark 작업을 실행할 리전에 버킷을 만듭니다.

gsutil mb -l ${REGION} gs://${BUCKET}

Cloud Storage 콘솔에서 버킷을 사용할 수 있습니다. gsutil ls을 실행하여 버킷을 확인할 수도 있습니다.

영구 기록 서버 만들기

Spark UI는 다양한 디버깅 도구와 Spark 작업에 대한 통계를 제공합니다. 완료된 Dataproc 서버리스 작업의 Spark UI를 보려면 영구 기록 서버로 사용할 단일 노드 Dataproc 클러스터를 만들어야 합니다.

영구 기록 서버 이름을 설정합니다.

PHS_CLUSTER_NAME=my-phs

다음을 실행합니다.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Spark UI 및 영구 기록 서버는 Codelab 후반부에서 자세히 살펴봅니다.

3. Dataproc Batches로 서버리스 Spark 작업 실행

이 샘플에서는 뉴욕시 (NYC) Citi Bike Trips 공개 데이터 세트의 데이터를 사용합니다. NYC Citi Bikes는 뉴욕시 내에서 운영되는 유료 자전거 공유 시스템입니다. 간단한 변환을 실행하고 가장 인기 있는 Citi Bike 스테이션 ID 10개를 출력합니다. 이 샘플에서는 오픈소스 spark-bigquery-connector를 사용하여 Spark와 BigQuery 간에 데이터를 원활하게 읽고 쓰는 것도 확인할 수 있습니다.

다음 GitHub 저장소를 클론하고 citibike.py 파일이 포함된 디렉터리로 cd를 수행합니다.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

기본적으로 Cloud Shell에서 사용할 수 있는 Cloud SDK를 사용하여 작업을 서버리스 Spark에 제출합니다. Cloud SDK와 Dataproc Batches API를 사용하여 서버리스 Spark 작업을 제출하는 셸에서 다음 명령어를 실행합니다.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

자세히 살펴보면 다음과 같습니다.

  • gcloud dataproc batches submitDataproc Batches API를 참조합니다.
  • pyspark는 PySpark 작업을 제출함을 나타냅니다.
  • --batch은 작업의 이름입니다. 제공하지 않으면 무작위로 생성된 UUID가 사용됩니다.
  • --region=${REGION}은 작업이 처리될 지리적 리전입니다.
  • --deps-bucket=${BUCKET}은 서버리스 환경에서 실행하기 전에 로컬 Python 파일이 업로드되는 위치입니다.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar에는 Spark 런타임 환경의 spark-bigquery-connector용 jar가 포함되어 있습니다.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}은 영구 기록 서버의 정규화된 이름입니다. Spark 이벤트 데이터 (콘솔 출력과 별개)가 저장되고 Spark UI에서 볼 수 있는 위치입니다.
  • 뒤에 오는 --는 이 뒤에 오는 모든 것이 프로그램의 런타임 인수임을 나타냅니다. 이 경우 작업에 필요한 버킷 이름을 제출합니다.

배치가 제출되면 다음과 같은 출력이 표시됩니다.

Batch [citibike-job] submitted.

몇 분 후 작업의 메타데이터와 함께 다음 출력이 표시됩니다.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

다음 섹션에서는 이 작업의 로그를 찾는 방법을 알아봅니다.

추가 기능

서버리스 Spark를 사용하면 작업을 실행할 수 있는 추가 옵션이 제공됩니다.

  • 작업이 실행되는 맞춤 Docker 이미지를 만들 수 있습니다. Python 및 R 라이브러리를 비롯한 추가 종속 항목을 포함하는 데 유용합니다.
  • Dataproc Metastore 인스턴스를 작업에 연결하여 Hive 메타데이터에 액세스할 수 있습니다.
  • 추가적인 제어를 위해 서버리스 Dataproc은 소규모 Spark 속성의 구성을 지원합니다.

4. Dataproc 측정항목 및 관측 가능성

Dataproc Batches Console에는 모든 Dataproc Serverless 작업이 나열됩니다. 콘솔에 각 작업의 일괄 ID, 위치, 상태, 생성 시간, 경과 시간, 유형이 표시됩니다. 작업의 일괄 ID를 클릭하여 자세한 정보를 확인합니다.

이 페이지에는 시간 경과에 따라 작업에서 사용한 일괄 Spark 실행자 수를 보여주는 모니터링과 같은 정보가 표시됩니다 (자동 확장된 정도를 나타냄).

세부정보 탭에는 작업과 함께 제출된 인수 및 매개변수를 비롯한 작업에 관한 메타데이터가 표시됩니다.

이 페이지에서 모든 로그에 액세스할 수도 있습니다. Dataproc Serverless 작업이 실행되면 다음과 같은 세 가지 로그가 생성됩니다.

  • 서비스 수준
  • 콘솔 출력
  • Spark 이벤트 로깅

서비스 수준: Dataproc Serverless 서비스에서 생성한 로그가 포함됩니다. 여기에는 자동 확장을 위해 추가 CPU를 요청하는 Dataproc Serverless와 같은 항목이 포함됩니다. 로그 보기를 클릭하여 이러한 로그를 볼 수 있습니다. 그러면 Cloud Logging이 열립니다.

콘솔 출력출력에서 확인할 수 있습니다. 작업에서 생성된 출력으로, 작업을 시작할 때 Spark에서 출력하는 메타데이터 또는 작업에 통합된 print 문이 포함됩니다.

Spark 이벤트 로깅은 Spark UI에서 액세스할 수 있습니다. Spark 작업에 영구 기록 서버를 제공했으므로 이전에 실행된 Spark 작업의 정보가 포함된 Spark 기록 서버 보기를 클릭하여 Spark UI에 액세스할 수 있습니다. 공식 Spark 문서에서 Spark UI에 대해 자세히 알아볼 수 있습니다.

5. Dataproc 템플릿: BQ -> GCS

Dataproc 템플릿은 클라우드 내 데이터 처리 작업을 더욱 간소화하는 데 도움이 되는 오픈소스 도구입니다. 이는 Dataproc Serverless의 래퍼 역할을 하며 다음을 비롯한 여러 데이터 가져오기 및 내보내기 작업의 템플릿을 포함합니다.

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

전체 목록은 README에서 확인할 수 있습니다.

이 섹션에서는 Dataproc 템플릿을 사용하여 BigQuery에서 GCS로 데이터를 내보냅니다.

저장소 클론하기

저장소를 클론하고 python 폴더로 변경합니다.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

환경 구성

이제 환경 변수를 설정합니다. Dataproc 템플릿은 프로젝트 ID에 환경 변수 GCP_PROJECT를 사용하므로 이를 GOOGLE_CLOUD_PROJECT.와 동일하게 설정합니다.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

이전 환경에서 리전을 설정해야 합니다. 그렇지 않으면 여기에서 설정합니다.

export REGION=<region>

Dataproc 템플릿은 spark-bigquery-connector를 사용하여 BigQuery 작업을 처리하며 URI가 환경 변수 JARS에 포함되어야 합니다. JARS 변수를 설정합니다.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

템플릿 매개변수 구성

서비스에서 사용할 스테이징 버킷의 이름을 설정합니다.

export GCS_STAGING_LOCATION=gs://${BUCKET}

다음으로 작업별 변수를 설정합니다. 입력 테이블의 경우 BigQuery NYC Citibike 데이터 세트를 다시 참조합니다.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

csv, parquet, avro 또는 json를 선택할 수 있습니다. 이 Codelab에서는 CSV를 선택합니다. 다음 섹션에서는 Dataproc 템플릿을 사용하여 파일 형식을 변환하는 방법을 알아봅니다.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

출력 모드를 overwrite로 설정합니다. overwrite, append, ignore 또는 errorifexists. 중에서 선택할 수 있습니다.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

GCS 출력 위치를 버킷의 경로로 설정합니다.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

템플릿 실행

아래에 BIGQUERYTOGCS 템플릿을 지정하고 설정한 입력 매개변수를 제공하여 템플릿을 실행합니다.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

출력은 다소 노이즈가 많지만 약 1분 후에 다음이 표시됩니다.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

다음을 실행하여 파일이 생성되었는지 확인할 수 있습니다.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark는 데이터 양에 따라 기본적으로 여러 파일에 작성합니다. 이 경우 생성된 파일이 약 30개 표시됩니다. Spark 출력 파일 이름은 part- 다음에 5자리 숫자 (부품 번호 표시)와 해시 문자열의 형식으로 구성됩니다. 대규모 데이터의 경우 Spark에서는 일반적으로 여러 개의 파일에 씁니다. 파일 이름의 예시는 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv입니다.

6. Dataproc 템플릿: CSV에서 Parquet으로

이제 Dataproc 템플릿을 사용하여 GCSTOGCS를 사용하여 GCS의 데이터를 한 파일 형식에서 다른 파일 형식으로 변환합니다. 이 템플릿은 SparkSQL을 사용하며 변환 중에 처리할 SparkSQL 쿼리를 제출하여 추가 처리를 수행하는 옵션도 제공합니다.

환경 변수 확인

이전 섹션에서 GCP_PROJECT, REGION, GCS_STAGING_BUCKET이 설정되었는지 확인합니다.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

템플릿 파라미터 설정

이제 GCStoGCS의 구성 매개변수를 설정합니다. 입력 파일의 위치부터 시작합니다. 디렉터리의 모든 파일이 처리되므로 특정 파일이 아닌 디렉터리입니다. BIGQUERY_GCS_OUTPUT_LOCATION로 설정합니다.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

입력 파일의 형식을 설정합니다.

GCS_TO_GCS_INPUT_FORMAT=csv

원하는 출력 형식을 설정합니다. parquet, json, avro 또는 csv를 선택할 수 있습니다.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

출력 모드를 overwrite로 설정합니다. overwrite, append, ignore 또는 errorifexists. 중에서 선택할 수 있습니다.

GCS_TO_GCS_OUTPUT_MODE=overwrite

출력 위치를 설정합니다.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

템플릿 실행

GCStoGCS 템플릿을 실행합니다.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

출력은 다소 많지만 약 1분 후에 다음과 같은 성공 메시지가 표시됩니다.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

다음을 실행하여 파일이 생성되었는지 확인할 수 있습니다.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

이 템플릿을 사용하면 gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query를 템플릿에 전달하여 SparkSQL 쿼리를 제공할 수도 있으므로 GCS에 쓰기 전에 데이터에 SparkSQL 쿼리를 실행할 수 있습니다.

7. 리소스 삭제

이 Codelab을 완료한 후 GCP 계정에 불필요한 요금이 청구되지 않도록 다음을 수행하세요.

  1. 자신이 만든 환경의 Cloud Storage 버킷을 삭제합니다.
gsutil rm -r gs://${BUCKET}
  1. 영구 기록 서버에 사용된 Dataproc 클러스터를 삭제합니다.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Dataproc Serverless 작업을 삭제합니다. Batches Console로 이동하여 삭제하려는 각 작업 옆의 체크박스를 클릭하고 DELETE를 클릭합니다.

이 Codelab만을 위한 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

8. 다음 단계

다음 리소스에서는 서버리스 Spark를 활용할 수 있는 추가 방법을 제공합니다.