Cloud Dataproc의 Apache Spark 및 Jupyter 노트북

1. 개요

이 실습에서는 Cloud Dataproc에서 Apache SparkJupyter 노트북을 설정하고 사용하는 방법을 알아봅니다.

Jupyter 노트북은 코드를 대화형으로 실행하고 결과를 즉시 확인할 수 있으므로 실험적 데이터 분석 및 머신러닝 모델 빌드에 널리 사용됩니다.

하지만 Apache Spark 및 Jupyter Notebook을 설정하고 사용하는 것은 복잡할 수 있습니다.

b9ed855863c57d6.png

Cloud Dataproc을 사용하면 Apache Spark, Jupyter 구성요소, 구성요소 게이트웨이를 사용하여 약 90초 만에 Dataproc 클러스터를 만들어 빠르고 쉽게 시작할 수 있습니다.

학습할 내용

이 Codelab에서는 다음 내용을 학습합니다.

  • 클러스터용 Google Cloud Storage 버킷 만들기
  • Jupyter 및 구성요소 게이트웨이를 사용하여 Dataproc 클러스터를 만듭니다.
  • Dataproc에서 JupyterLab 웹 UI 액세스
  • Spark BigQuery Storage 커넥터를 사용하는 노트북 만들기
  • Spark 작업을 실행하고 결과를 플롯합니다.

Google Cloud에서 이 실습을 진행하는 데 드는 총 비용은 약 $1입니다. Cloud Dataproc 가격 책정에 대한 자세한 내용은 여기에서 확인할 수 있습니다.

2. 프로젝트 만들기

console.cloud.google.com에서 Google Cloud Platform 콘솔에 로그인하고 새 프로젝트를 만듭니다.

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

그런 후 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.

이 Codelab을 실행하는 과정에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하려고 하거나 실행 중일 경우 비용이 더 들 수 있습니다. 이 Codelab의 마지막 섹션에서는 프로젝트를 정리하는 방법을 안내합니다.

Google Cloud Platform 신규 사용자는 $300 상당의 무료 체험판을 사용할 수 있습니다.

3. 환경 설정

먼저 클라우드 콘솔의 오른쪽 상단에 있는 버튼을 클릭하여 Cloud Shell을 엽니다.

a10c47ee6ca41c54.png

Cloud Shell이 로드되면 다음 명령어를 실행하여 이전 단계에서 프로젝트 ID를 설정합니다.

gcloud config set project <project_id>

프로젝트 ID는 클라우드 콘솔의 왼쪽 상단에 있는 프로젝트를 클릭하여 확인할 수도 있습니다.

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

다음으로 Dataproc, Compute Engine, BigQuery Storage API를 사용 설정합니다.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

또는 Cloud 콘솔에서 이 작업을 수행할 수 있습니다. 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.

2bfc27ef9ba2ec7d.png

드롭다운에서 API 관리자를 선택합니다.

408af5f32c4b7c25.png

API 및 서비스 사용 설정을 클릭합니다.

a9c0e84296a7ba5b.png

다음 API를 검색하여 사용 설정합니다.

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS 버킷 만들기

데이터와 가장 가까운 리전에 Google Cloud Storage 버킷을 만들고 고유한 이름을 지정합니다.

이는 Dataproc 클러스터에 사용됩니다.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

다음 출력이 표시될 것입니다.

Creating gs://<your-bucket-name>/...

5. Jupyter 및 구성요소 게이트웨이를 사용하여 Dataproc 클러스터 만들기

클러스터 만들기

클러스터의 환경 변수 설정

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

그런 다음 다음 gcloud 명령어를 실행하여 클러스터에서 Jupyter를 사용하는 데 필요한 모든 구성요소가 포함된 클러스터를 만듭니다.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

클러스터가 생성되는 동안 다음 출력이 표시됩니다.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

클러스터를 만드는 데 약 90초가 걸리며, 클러스터가 준비되면 Dataproc Cloud 콘솔 UI에서 클러스터에 액세스할 수 있습니다.

기다리는 동안 아래를 계속 읽고 gcloud 명령어에 사용되는 플래그에 대해 자세히 알아보세요.

클러스터가 생성되면 다음 출력이 표시됩니다.

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create 명령어에 사용되는 플래그

gcloud dataproc create 명령어에 사용되는 플래그의 분류는 다음과 같습니다.

--region=${REGION}

클러스터가 생성될 리전과 영역을 지정합니다. 사용 가능한 지역 목록은 여기에서 확인할 수 있습니다.

--image-version=1.4

클러스터에서 사용할 이미지 버전입니다. 사용 가능한 버전 목록은 여기에서 확인할 수 있습니다.

--bucket=${BUCKET_NAME}

클러스터에 사용할 이전에 만든 Google Cloud Storage 버킷을 지정합니다. GCS 버킷을 제공하지 않으면 버킷이 생성됩니다.

GCS 버킷은 삭제되지 않으므로 클러스터를 삭제해도 노트북이 저장되는 위치이기도 합니다.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc 클러스터에 사용할 머신 유형입니다. 사용 가능한 머신 유형 목록은 여기에서 확인할 수 있습니다.

--num-workers 플래그를 설정하지 않으면 기본적으로 마스터 노드 1개와 워커 노드 2개가 생성됩니다.

--optional-components=ANACONDA,JUPYTER

선택적 구성요소에 이러한 값을 설정하면 클러스터에 Jupyter 및 Anaconda (Jupyter 노트북에 필요)에 필요한 모든 라이브러리가 설치됩니다.

--enable-component-gateway

구성요소 게이트웨이를 사용 설정하면 Apache Knox 및 역방향 프록시를 사용하여 App Engine 링크가 생성되므로 Jupyter 및 JupyterLab 웹 인터페이스에 쉽고 안전하며 인증된 액세스가 제공되어 더 이상 SSH 터널을 만들 필요가 없습니다.

또한 작업의 성능과 클러스터 사용 패턴을 확인하는 데 유용한 Yarn 리소스 관리자 및 Spark 기록 서버를 비롯한 클러스터의 다른 도구 링크도 생성됩니다.

6. Apache Spark 노트북 만들기

JupyterLab 웹 인터페이스 액세스

클러스터가 준비되면 Dataproc 클러스터 - Cloud 콘솔로 이동하고, 생성한 클러스터를 클릭하고, 웹 인터페이스 탭으로 이동하여 JupyterLab 웹 인터페이스의 구성요소 게이트웨이 링크를 찾을 수 있습니다.

afc40202d555de47.png

클래식 노트북 인터페이스인 Jupyter 또는 Project Jupyter의 차세대 UI로 설명되는 JupyterLab에 액세스할 수 있습니다.

JupyterLab에는 다양한 새로운 UI 기능이 있으므로 노트북을 처음 사용하거나 최신 개선사항을 찾고 있다면 JupyterLab을 사용하는 것이 좋습니다. 공식 문서에 따르면 JupyterLab이 결국 기존 Jupyter 인터페이스를 대체할 예정이기 때문입니다.

Python 3 커널로 노트북 만들기

a463623f2ebf0518.png

런처 탭에서 Python 3 노트북 아이콘을 클릭하여 Python 3 커널 (PySpark 커널 아님)이 있는 노트북을 만듭니다. 이렇게 하면 노트북에서 SparkSession을 구성하고 BigQuery Storage API를 사용하는 데 필요한 spark-bigquery-connector를 포함할 수 있습니다.

노트북 이름 변경

196a3276ed07e1f3.png

왼쪽 사이드바 또는 상단 탐색에서 노트북 이름을 마우스 오른쪽 버튼으로 클릭하고 노트북 이름을 'BigQuery Storage & Spark DataFrames.ipynb'로 바꿉니다.

노트북에서 Spark 코드 실행

fbac38062e5bb9cf.png

이 노트북에서는 BigQuery Storage API를 사용하여 BigQuery와 Spark 간에 데이터를 읽고 쓰는 도구인 spark-bigquery-connector를 사용합니다.

BigQuery Storage API는 RPC 기반 프로토콜을 사용하여 BigQuery의 데이터 액세스를 크게 개선합니다. Apache AvroApache Arrow와 같은 다양한 직렬화 형식은 물론 병렬 데이터 읽기 및 쓰기를 지원합니다. 개괄적으로 말하자면 이 기능은 특히 대규모 데이터 세트에서 성능을 크게 향상시킵니다.

첫 번째 셀에서 클러스터의 Scala 버전을 확인하여 올바른 버전의 spark-bigquery-connector jar를 포함할 수 있습니다.

입력 [1]:

!scala -version

출력 [1]:f580e442576b8b1f.png Spark 세션을 만들고 spark-bigquery-connector 패키지를 포함합니다.

Scala 버전이 2.11인 경우 다음 패키지를 사용하세요.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala 버전이 2.12인 경우 다음 패키지를 사용하세요.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

입력 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval 사용 설정

이렇게 하면 각 단계에서 DataFrame의 결과가 출력되며 df.show()를 새로 표시할 필요가 없고 출력의 형식이 개선됩니다.

입력 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery 테이블을 Spark DataFrame으로 읽기

공개 BigQuery 데이터 세트에서 데이터를 읽어 Spark DataFrame을 만듭니다. 이렇게 하면 spark-bigquery-connector 및 BigQuery Storage API를 사용하여 데이터를 Spark 클러스터에 로드할 수 있습니다.

Spark DataFrame을 만들고 Wikipedia 페이지 조회수를 위한 BigQuery 공개 데이터 세트에서 데이터를 로드합니다. spark-bigquery-connector를 사용하여 데이터를 Spark에 로드하면 데이터 처리가 발생하는 Spark에서 데이터에 대한 쿼리가 실행되지 않습니다. 이 코드를 실행하면 실제로 표가 로드되지 않습니다. Spark의 지연 평가이기 때문이며 실행은 다음 단계에서 발생합니다.

입력 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

출력 [4]:

c107a33f6fc30ca.png

필요한 열을 선택하고 filter()의 별칭인 where()를 사용하여 필터를 적용합니다.

이 코드를 실행하면 Spark 작업이 트리거되고 이 시점에서 BigQuery Storage에서 데이터가 읽힙니다.

입력 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

출력 [5]:

ad363cbe510d625a.png

제목별로 그룹화하고 페이지 조회수별로 정렬하여 인기 페이지 확인

입력 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

출력 [6]:f718abd05afc0f4.png

7. 노트북에서 Python 플로팅 라이브러리 사용

Python에서 사용할 수 있는 다양한 플로팅 라이브러리를 사용하여 Spark 작업의 출력을 플롯할 수 있습니다.

Spark DataFrame을 Pandas DataFrame으로 변환

Spark DataFrame을 Pandas DataFrame으로 변환하고 datehour를 색인으로 설정합니다. Python에서 직접 데이터를 처리하고 사용 가능한 다양한 Python 플로팅 라이브러리를 사용하여 데이터를 플롯하려는 경우에 유용합니다.

입력 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

출력 [7]:

3df2aaa2351f028d.png

Pandas DataFrame 플로팅

노트북에 플롯을 표시하는 데 필요한 matplotlib 라이브러리를 가져옵니다.

입력 [8]:

import matplotlib.pyplot as plt

Pandas 플롯 함수를 사용하여 Pandas DataFrame에서 선 차트를 만듭니다.

입력 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

출력 [9]:bade7042c3033594.png

노트북이 GCS에 저장되었는지 확인

이제 Dataproc 클러스터에서 첫 번째 Jupyter 노트북이 실행됩니다. 노트북에 이름을 지정하면 클러스터를 만들 때 사용된 GCS 버킷에 자동으로 저장됩니다.

Cloud Shell에서 다음 gsutil 명령어를 사용하여 이를 확인할 수 있습니다.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

다음 출력이 표시될 것입니다.

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 최적화 팁 - 메모리에 데이터 캐시

매번 BigQuery 스토리지에서 데이터를 읽는 대신 메모리에 데이터를 저장해야 하는 시나리오가 있을 수 있습니다.

이 작업은 BigQuery에서 데이터를 읽고 필터를 BigQuery로 푸시합니다. 그러면 집계가 Apache Spark에서 계산됩니다.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

위의 작업을 수정하여 테이블의 캐시를 포함할 수 있으며 이제 Apache Spark에서 메모리에 wiki 열의 필터가 적용됩니다.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

그런 다음 BigQuery 스토리지에서 데이터를 다시 읽는 대신 캐시된 데이터를 사용하여 다른 위키 언어를 필터링할 수 있으므로 훨씬 빠르게 실행됩니다.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

다음 명령어를 실행하여 캐시를 삭제할 수 있습니다.

df_wiki_all.unpersist()

9. 다양한 사용 사례의 예시 노트북

Cloud Dataproc GitHub 저장소에는 다양한 Google Cloud Platform 제품 및 오픈소스 도구를 사용하여 데이터를 로드하고, 데이터를 저장하고, 데이터를 플롯하는 일반적인 Apache Spark 패턴이 포함된 Jupyter 노트북이 있습니다.

10. 삭제

이 빠른 시작을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 다음 단계를 따르세요.

  1. 환경 및 사용자가 만든 환경의 Cloud Storage 버킷을 삭제합니다.
  2. Dataproc 환경을 삭제합니다.

이 Codelab만을 위한 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

라이선스

이 작업물은 크리에이티브 커먼즈 저작자 표시 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.