Dataproc Serverless

1. 개요 - Google Dataproc

Dataproc은 Apache Spark, Apache Flink, Presto 및 기타 여러 오픈소스 도구와 프레임워크를 실행하기 위한 고도로 확장 가능한 완전 관리형 서비스입니다. Dataproc을 전 세계적 규모의 데이터 레이크 현대화, ETL / ELT, 안전한 데이터 과학에 사용하세요. 또한 Dataproc은 BigQuery, Cloud Storage, Vertex AI, Dataplex를 비롯한 여러 Google Cloud 서비스와 완전히 통합됩니다.

Dataproc은 다음 세 가지 버전으로 제공됩니다.

  • Dataproc Serverless를 사용하면 인프라 및 자동 확장을 구성하지 않고도 PySpark 작업을 실행할 수 있습니다. Dataproc Serverless는 PySpark 일괄 워크로드 및 세션 / 노트북을 지원합니다.
  • Google Compute Engine의 Dataproc을 사용하면 Flink 및 Presto와 같은 오픈소스 도구뿐 아니라 YARN 기반 Spark 워크로드를 위한 Hadoop YARN 클러스터를 관리할 수 있습니다. 자동 확장을 포함하여 수직 또는 수평 확장을 원하는 만큼 사용하여 클라우드 기반 클러스터를 맞춤설정할 수 있습니다.
  • Google Kubernetes Engine의 Dataproc을 사용하면 Spark, PySpark, SparkR 또는 Spark SQL 작업을 제출하기 위해 GKE 인프라에 Dataproc 가상 클러스터를 구성할 수 있습니다.

이 Codelab에서는 Dataproc Serverless를 사용할 수 있는 여러 가지 방법을 알아봅니다.

Apache Spark는 원래 Hadoop 클러스터에서 실행되도록 빌드되었으며 YARN을 리소스 관리자로 사용했습니다. Hadoop 클러스터를 유지관리하려면 특정 전문 기술 세트가 필요하며 클러스터의 여러 노브가 올바르게 구성되었는지 확인해야 합니다. 이는 Spark에서 사용자가 설정해야 하는 별도의 노브 세트에 추가됩니다. 이로 인해 개발자가 Spark 코드 자체로 작업하는 대신 인프라를 구성하는 데 더 많은 시간을 소비하는 경우가 많습니다.

Dataproc Serverless를 사용하면 Hadoop 클러스터나 Spark를 수동으로 구성할 필요가 없습니다. Dataproc Serverless는 Hadoop에서 실행되지 않으며 자체 동적 리소스 할당을 사용해 자동 확장을 비롯한 리소스 요구사항을 결정합니다. 일부 Spark 속성은 Dataproc Serverless로 계속 맞춤설정할 수 있지만 대부분의 경우 조정할 필요가 없습니다.

2. 설정

먼저 이 Codelab에서 사용하는 환경과 리소스를 구성합니다.

Google Cloud 프로젝트를 만듭니다. 기존 제품을 사용할 수도 있습니다.

Cloud 콘솔 툴바에서 Cloud Shell을 클릭하여 엽니다.

ba0bb17945a73543.png

Cloud Shell은 이 Codelab에 즉시 사용할 수 있는 Shell 환경을 제공합니다.

68c4ebd2a8539764.png

Cloud Shell에서 기본적으로 프로젝트 이름을 설정합니다. echo $GOOGLE_CLOUD_PROJECT를 실행하여 다시 확인합니다. 출력에 프로젝트 ID가 표시되지 않으면 프로젝트 ID를 설정합니다.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

리소스의 Compute Engine 리전을 설정합니다(예: us-central1 또는 europe-west2).

export REGION=<your-region>

API 사용 설정

Codelab에서는 다음 API를 사용합니다.

  • BigQuery
  • Dataproc

필요한 API를 사용 설정합니다. 완료하는 데 1분 정도 걸리며 완료되면 성공 메시지가 표시됩니다.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

네트워크 액세스 구성

Dataproc Serverless를 사용하려면 Spark 드라이버 및 실행자에 비공개 IP만 있으므로 Spark 작업을 실행할 리전에 Google 비공개 액세스를 사용 설정해야 합니다. 다음을 실행하여 default 서브넷에서 사용 설정합니다.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

다음을 통해 Google 비공개 액세스가 사용 설정되어 있는지 확인할 수 있으며 True 또는 False가 출력됩니다.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

스토리지 버킷 만들기

이 Codelab에서 만든 애셋을 저장하는 데 사용할 스토리지 버킷을 만듭니다.

버킷의 이름을 선택합니다. 버킷 이름은 모든 사용자에 대해 전역적으로 고유해야 합니다.

export BUCKET=<your-bucket-name>

Spark 작업을 실행할 리전에 버킷을 만듭니다.

gsutil mb -l ${REGION} gs://${BUCKET}

Cloud Storage 콘솔에서 버킷을 사용할 수 있음을 확인할 수 있습니다. gsutil ls를 실행하여 버킷을 확인할 수도 있습니다.

영구 기록 서버 만들기

Spark UI는 다양한 디버깅 도구와 Spark 작업에 관한 유용한 정보를 제공합니다. 완료된 Dataproc 서버리스 작업의 Spark UI를 보려면 영구 기록 서버로 활용할 단일 노드 Dataproc 클러스터를 만들어야 합니다.

영구 기록 서버 이름을 설정합니다.

PHS_CLUSTER_NAME=my-phs

다음을 실행합니다.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Spark UI 및 영구 기록 서버는 Codelab의 후반부에서 자세히 살펴봅니다.

3. Dataproc 배치로 서버리스 Spark 작업 실행

이 샘플에서는 뉴욕시 (NYC) Citi Bike Trips 공개 데이터 세트의 데이터 세트를 사용합니다. NYC Citi Bikes는 NYC 내의 유료 자전거 공유 시스템입니다. 몇 가지 간단한 변환을 수행하여 가장 인기 있는 Citi Bike 정차장 ID 상위 10개를 출력합니다. 또한 이 샘플은 오픈소스 spark-bigquery-connector를 사용하여 Spark와 BigQuery 간에 데이터를 원활하게 읽고 씁니다.

다음 GitHub 저장소와 cdcitibike.py 파일이 포함된 디렉터리에 클론합니다.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

기본적으로 Cloud Shell에서 사용할 수 있는 Cloud SDK를 사용하여 서버리스 Spark에 작업을 제출합니다. Cloud SDK 및 Dataproc Batches API를 활용하는 셸에서 다음 명령어를 실행하여 서버리스 Spark 작업을 제출합니다.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

이를 나누는 방법은 다음과 같습니다.

  • gcloud dataproc batches submitDataproc Batches API를 참조합니다.
  • pyspark은 PySpark 작업을 제출함을 나타냅니다.
  • --batch은 작업의 이름입니다. 제공하지 않으면 무작위로 생성된 UUID가 사용됩니다.
  • --region=${REGION}은 작업이 처리될 지리적 리전입니다.
  • --deps-bucket=${BUCKET}은 서버리스 환경에서 실행하기 전에 로컬 Python 파일을 업로드하는 위치입니다.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar에는 Spark 런타임 환경의 spark-bigquery-connector에 대한 jar가 포함됩니다.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}는 영구 기록 서버의 정규화된 이름입니다. Spark 이벤트 데이터 (콘솔 출력과 별개)가 저장되고 Spark UI에서 볼 수 있는 위치입니다.
  • 후행 --는 이보다 작은 모든 것이 프로그램의 런타임 인수임을 나타냅니다. 이 경우 작업에 필요한 대로 버킷의 이름을 제출합니다.

배치가 제출되면 다음과 같은 출력이 표시됩니다.

Batch [citibike-job] submitted.

몇 분 후 작업의 메타데이터와 함께 다음 출력이 표시됩니다.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

다음 섹션에서는 이 작업의 로그를 찾는 방법을 알아봅니다.

추가 기능

Spark 서버리스를 사용하면 작업을 실행할 수 있는 추가 옵션이 있습니다.

  • 작업이 실행되는 커스텀 Docker 이미지를 만들 수 있습니다. 이는 Python 및 R 라이브러리를 비롯한 추가 종속 항목을 포함하는 좋은 방법입니다.
  • Dataproc Metastore 인스턴스를 작업에 연결하여 Hive 메타데이터에 액세스할 수 있습니다.
  • 추가 제어를 위해 Dataproc Serverless는 소규모의 Spark 속성 구성을 지원합니다.

4. Dataproc 측정항목 및 관측 가능성

Dataproc Batches Console에는 모든 Dataproc Serverless 작업이 나열됩니다. 콘솔에 각 작업의 일괄 ID, 위치, 상태, 생성 시간, 경과 시간유형이 표시됩니다. 작업의 일괄 ID를 클릭하여 자세한 정보를 확인합니다.

이 페이지에는 시간 경과에 따라 작업에서 사용한 Batch Spark Executor 수 (자동 확장된 정도를 나타냄)를 보여주는 Monitoring과 같은 정보가 표시됩니다.

세부정보 탭에는 작업과 함께 제출된 인수 및 매개변수를 비롯하여 작업에 대한 추가 메타데이터가 표시됩니다.

이 페이지에서 모든 로그에 액세스할 수도 있습니다. Dataproc 서버리스 작업이 실행되면 세 가지 로그 세트가 생성됩니다.

  • 서비스 수준
  • 콘솔 출력
  • Spark 이벤트 로깅

서비스 수준에는 Dataproc Serverless 서비스에서 생성한 로그가 포함됩니다. 여기에는 자동 확장을 위해 추가 CPU를 요청하는 Dataproc Serverless 등이 포함됩니다. 로그 보기를 클릭하면 Cloud Logging이 열립니다.

콘솔 출력출력에서 볼 수 있습니다. Spark가 작업을 시작할 때 인쇄하는 메타데이터 또는 작업에 통합된 print 문을 포함하여 작업에서 생성되는 출력입니다.

Spark UI에서 Spark 이벤트 로깅에 액세스할 수 있습니다. Spark 작업에 영구 기록 서버를 제공했으므로 이전에 실행한 Spark 작업에 대한 정보가 포함된 Spark 기록 서버 보기를 클릭하여 Spark UI에 액세스할 수 있습니다. 공식 Spark 문서에서 Spark UI에 대해 자세히 알아볼 수 있습니다.

5. Dataproc 템플릿: BQ -> GCS

Dataproc 템플릿은 클라우드 내 데이터 처리 작업을 더욱 간소화하는 데 도움이 되는 오픈소스 도구입니다. 이는 Dataproc Serverless의 래퍼 역할을 하며 다음과 같은 다양한 데이터 가져오기 및 내보내기 작업을 위한 템플릿을 포함합니다.

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

전체 목록은 리드미에서 확인할 수 있습니다.

이 섹션에서는 Dataproc 템플릿을 사용하여 BigQuery에서 GCS로 데이터를 내보냅니다.

저장소 클론하기

저장소를 클론하고 python 폴더로 변경합니다.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

환경 구성

이제 환경 변수를 설정합니다. Dataproc 템플릿은 프로젝트 ID에 환경 변수 GCP_PROJECT를 사용하므로 GOOGLE_CLOUD_PROJECT.와 동일하게 설정합니다.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

리전은 이전 환경에서 설정해야 합니다. 그렇지 않은 경우 여기에서 설정하세요.

export REGION=<region>

Dataproc 템플릿은 BigQuery 작업을 처리하는 데 spark-bigquery-conector를 사용하며 URI가 환경 변수 JARS에 포함되어야 합니다. JARS 변수를 설정합니다.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

템플릿 매개변수 구성

서비스에서 사용할 스테이징 버킷의 이름을 설정합니다.

export GCS_STAGING_LOCATION=gs://${BUCKET}

다음으로 몇 가지 작업별 변수를 설정합니다. 입력 테이블의 경우 BigQuery NYC Citibike 데이터 세트를 다시 참조합니다.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

csv, parquet, avro, json 중에서 선택할 수 있습니다. 이 Codelab에서는 CSV를 선택합니다. 다음 섹션에서는 Dataproc 템플릿을 사용하여 파일 유형을 변환하는 방법을 설명합니다.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

출력 모드를 overwrite로 설정합니다. overwrite, append, ignore 또는 errorifexists. 중에서 선택할 수 있습니다.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

GCS 출력 위치를 버킷의 경로로 설정합니다.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

템플릿 실행

아래에 템플릿을 지정하고 설정한 입력 매개변수를 제공하여 BIGQUERYTOGCS 템플릿을 실행합니다.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

출력에 노이즈가 상당히 많지만 약 1분 후에 다음과 같이 표시됩니다.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

다음을 실행하여 파일이 생성되었는지 확인할 수 있습니다.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark는 기본적으로 데이터 양에 따라 여러 파일에 씁니다. 이 경우 약 30개의 파일이 생성됩니다. Spark 출력 파일 이름은 part로 형식이 지정되며, 그 뒤에 5자리 숫자 (부품 번호 표시)와 해시 문자열이 옵니다. 대량의 데이터의 경우 Spark는 일반적으로 여러 파일에 씁니다. 파일 이름의 예는 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv입니다.

6. Dataproc 템플릿: CSV에서 Parquet로

이제 Dataproc 템플릿을 사용하여 GCSTOGCS를 사용하여 GCS의 데이터를 한 파일 형식에서 다른 파일 형식으로 변환해 보겠습니다. 이 템플릿은 SparkSQL을 사용하며 추가 처리를 위해 변환 중에 처리할 SparkSQL 쿼리를 제출하는 옵션도 제공합니다.

환경 변수 확인

GCP_PROJECT, REGION, GCS_STAGING_BUCKET가 이전 섹션에서 설정되어 있는지 확인합니다.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

템플릿 매개변수 설정

이제 GCStoGCS의 구성 매개변수를 설정합니다. 입력 파일의 위치로 시작합니다. 디렉터리에 있는 모든 파일이 처리되므로 이는 특정 파일이 아닌 디렉터리라는 점에 유의하세요. BIGQUERY_GCS_OUTPUT_LOCATION로 설정합니다.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

입력 파일의 형식을 설정합니다.

GCS_TO_GCS_INPUT_FORMAT=csv

원하는 출력 형식을 설정합니다. parquet, json, avro 또는 csv를 선택할 수 있습니다.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

출력 모드를 overwrite로 설정합니다. overwrite, append, ignore 또는 errorifexists. 중에서 선택할 수 있습니다.

GCS_TO_GCS_OUTPUT_MODE=overwrite

출력 위치를 설정합니다.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

템플릿 실행

GCStoGCS 템플릿을 실행합니다.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

출력에 노이즈가 상당히 많지만 약 1분 후에 아래와 같은 성공 메시지가 표시됩니다.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

다음을 실행하여 파일이 생성되었는지 확인할 수 있습니다.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

이 템플릿을 사용하면 gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query를 템플릿에 전달하여 SparkSQL 쿼리를 제공할 수 있으며, 이를 통해 GCS에 쓰기 전에 데이터에서 SparkSQL 쿼리를 실행할 수 있습니다.

7. 리소스 삭제

이 Codelab을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 하려면 다음 안내를 따르세요.

  1. 만든 환경의 Cloud Storage 버킷을 삭제합니다.
gsutil rm -r gs://${BUCKET}
  1. 영구 기록 서버에 사용된 Dataproc 클러스터를 삭제합니다.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Dataproc 서버리스 작업을 삭제합니다. Batches Console(일괄 처리 콘솔)로 이동하여 삭제하려는 각 작업 옆에 있는 상자를 클릭한 후 삭제를 클릭합니다.

이 Codelab만을 위해 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 후 '종료'를 클릭하여 프로젝트를 삭제합니다.

8. 다음 단계

다음 리소스는 서버리스 Spark를 활용할 수 있는 추가적인 방법을 제공합니다.