Cloud Dataproc의 Apache Spark 및 Jupyter 노트북

1. 개요

이 실습에서는 Cloud Dataproc에서 Apache SparkJupyter 노트북을 설정하고 사용하는 방법을 다룹니다.

Jupyter Notebook은 코드를 대화형으로 실행하고 즉시 결과를 확인할 수 있기 때문에 탐색적 데이터 분석 및 머신러닝 모델 빌드에 널리 사용됩니다.

그러나 Apache Spark 및 Jupyter 노트북을 설정하고 사용하는 것은 복잡할 수 있습니다.

b9ed855863c57d6.png

Cloud Dataproc을 사용하면 Apache Spark, Jupyter 구성요소, 구성요소 게이트웨이가 있는 Dataproc 클러스터를 약 90초 내에 만들 수 있어 작업을 빠르고 쉽게 수행할 수 있습니다.

학습할 내용

이 Codelab에서는 다음 내용을 학습합니다.

  • 클러스터에 사용할 Google Cloud Storage 버킷 만들기
  • Jupyter 및 구성요소 게이트웨이로 Dataproc 클러스터 만들기
  • Dataproc에서 JupyterLab 웹 UI 액세스
  • Spark BigQuery 스토리지 커넥터를 사용하는 노트북 만들기
  • Spark 작업 실행 및 결과 표시

Google Cloud에서 이 실습을 진행하는 데 드는 총 비용은 약 $1입니다. Cloud Dataproc 가격 책정에 대한 자세한 내용은 여기를 참조하세요.

2. 프로젝트 만들기

console.cloud.google.com에서 Google Cloud Platform 콘솔에 로그인하고 새 프로젝트를 만듭니다.

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

다음으로 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.

이 Codelab을 실행하는 데에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하기로 결정하거나 계속 실행하면 비용이 더 많이 들 수 있습니다. 이 Codelab의 마지막 섹션에서는 프로젝트를 삭제하는 방법을 안내합니다.

Google Cloud Platform의 신규 사용자에게는 $300의 무료 체험판이 제공됩니다.

3. 환경 설정

먼저 Cloud 콘솔의 오른쪽 상단에 있는 버튼을 클릭하여 Cloud Shell을 엽니다.

a10c47ee6ca41c54.png

Cloud Shell이 로드된 후 다음 명령어를 실행하여 이전 단계의 프로젝트 ID를 설정합니다**:**

gcloud config set project <project_id>

Cloud 콘솔의 왼쪽 상단에 있는 프로젝트를 클릭하여 프로젝트 ID를 확인할 수도 있습니다.

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

다음으로 Dataproc, Compute Engine, BigQuery Storage API를 사용 설정합니다.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

또는 Cloud 콘솔에서 이 작업을 수행할 수도 있습니다. 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.

2bfc27ef9ba2ec7d.png

드롭다운에서 API 관리자를 선택합니다.

408af5f32c4b7c25.png

API 및 서비스 사용 설정을 클릭합니다.

a9c0e84296a7ba5b.png

다음 API를 검색하고 사용 설정합니다.

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS 버킷 만들기

데이터와 가장 가까운 리전에 Google Cloud Storage 버킷을 만들고 고유한 이름을 지정합니다.

이는 Dataproc 클러스터에 사용됩니다.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

다음과 같은 출력이 표시됩니다.

Creating gs://<your-bucket-name>/...

5. Jupyter를 사용하여 Dataproc 클러스터 만들기 및 구성요소 게이트웨이

클러스터 만들기

클러스터의 환경 변수 설정

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

그런 다음 이 gcloud 명령어를 실행하여 클러스터에서 Jupyter를 사용하는 데 필요한 모든 구성요소로 클러스터를 만듭니다.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

클러스터가 생성되는 동안 다음 출력이 표시됩니다.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

클러스터를 만드는 데 약 90초가 소요되며, 클러스터가 준비되면 Dataproc Cloud 콘솔 UI에서 클러스터에 액세스할 수 있습니다.

기다리는 동안 아래 내용을 계속 읽어 gcloud 명령어에 사용되는 플래그에 대해 자세히 알아보세요.

클러스터가 생성되면 다음과 같이 출력됩니다.

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create 명령어에 사용되는 플래그

다음은 gcloud dataproc create 명령어에 사용되는 플래그의 세부정보입니다.

--region=${REGION}

클러스터가 생성될 리전 및 영역을 지정합니다. 여기에서 사용 가능한 지역 목록을 확인할 수 있습니다.

--image-version=1.4

클러스터에서 사용할 이미지 버전입니다. 여기에서 사용 가능한 버전 목록을 확인할 수 있습니다.

--bucket=${BUCKET_NAME}

앞서 만든 Google Cloud Storage 버킷을 지정하여 클러스터에 사용합니다. GCS 버킷을 제공하지 않으면 자동으로 생성됩니다.

또한 GCS 버킷이 삭제되지 않으므로 클러스터를 삭제해도 노트북이 저장됩니다.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc 클러스터에 사용할 머신 유형입니다. 사용 가능한 머신 유형 목록은 여기에서 확인할 수 있습니다.

–num-workers 플래그를 설정하지 않으면 기본적으로 마스터 노드 1개와 워커 노드 2개가 생성됩니다.

--optional-components=ANACONDA,JUPYTER

선택적 구성요소에 이러한 값을 설정하면 Jupyter 및 Anaconda에 필요한 모든 라이브러리 (Jupyter 노트북에 필요)가 클러스터에 설치됩니다.

--enable-component-gateway

Component Gateway를 사용 설정하면 Apache Knox 및 Inverting Proxy를 사용하는 App Engine 링크가 생성되어 Jupyter 및 JupyterLab 웹 인터페이스에 간편하고 안전하고 인증된 액세스 권한을 부여하므로 더 이상 SSH 터널을 만들 필요가 없습니다.

또한 작업 성능과 클러스터 사용 패턴을 확인하는 데 유용한 Yarn Resource Manager 및 Spark History Server 등 클러스터의 다른 도구에 대한 링크도 생성합니다.

6. Apache Spark 노트북 만들기

JupyterLab 웹 인터페이스 액세스

클러스터가 준비되면 Dataproc 클러스터 - Cloud 콘솔로 이동한 후 생성한 클러스터를 클릭하고 '웹 인터페이스' 탭으로 이동하여 JupyterLab 웹 인터페이스로 연결되는 구성요소 게이트웨이 링크를 찾을 수 있습니다.

afc40202d555de47.png

기존 노트북 인터페이스인 Jupyter 또는 Project Jupyter의 차세대 UI로 설명되는 JupyterLab에 액세스할 수 있습니다.

JupyterLab에는 유용한 새 UI 기능이 많이 있습니다. 따라서 노트북을 처음 사용하거나 최신 개선사항을 찾고 있다면 공식 문서에 따라 기존 Jupyter 인터페이스를 대체하게 될 JupyterLab을 사용하는 것이 좋습니다.

Python 3 커널로 노트북 만들기

a463623f2ebf0518.png

런처 탭에서 Python 3 노트북 아이콘을 클릭하여 Python 3 커널 (PySpark 커널이 아님)으로 노트북을 만듭니다. 그러면 노트북에서 SparkSession을 구성하고 BigQuery Storage API를 사용하는 데 필요한 spark-bigquery-connector를 포함할 수 있습니다.

노트북 이름 바꾸기

196a3276ed07e1f3.png

왼쪽 또는 상단 탐색 메뉴의 사이드바에서 노트북 이름을 마우스 오른쪽 버튼으로 클릭하고 노트북 이름을 'BigQuery Storage 및 Spark DataFrames.ipynb'

노트북에서 Spark 코드 실행

fbac38062e5bb9cf.png

이 노트북에서는 BigQuery Storage API를 활용하여 BigQuery와 Spark 간에 데이터를 읽고 쓰는 도구인 spark-bigquery-connector를 사용합니다.

BigQuery Storage API는 RPC 기반 프로토콜을 사용하여 BigQuery의 데이터에 액세스하는 방식을 크게 개선합니다. 병렬 데이터 읽기 및 쓰기는 물론 Apache AvroApache Arrow와 같은 다양한 직렬화 형식을 지원합니다. 개략적으로 설명하면 특히 대규모 데이터 세트에서 성능이 크게 향상됩니다.

첫 번째 셀에서 올바른 버전의 spark-bigquery-connector jar를 포함할 수 있도록 클러스터의 Scala 버전을 확인합니다.

입력 [1]:

!scala -version

출력 [1]:f580e442576b8b1f.png Spark 세션을 만들고 Spark-bigquery-connector 패키지를 포함합니다.

Scala 버전이 2.11인 경우 다음 패키지를 사용합니다.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala 버전이 2.12인 경우 다음 패키지를 사용합니다.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

입력 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval 사용 설정

이렇게 하면 새로 df.show()를 표시할 필요 없이 각 단계에서 DataFrames의 결과가 출력되며 출력 형식도 개선됩니다.

입력 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Spark DataFrame으로 BigQuery 테이블 읽기

공개 BigQuery 데이터 세트에서 데이터를 읽어 Spark DataFrame을 만듭니다. 이렇게 하면 spark-bigquery-connector 및 BigQuery Storage API를 사용하여 데이터를 Spark 클러스터에 로드합니다.

Spark DataFrame을 만들고 Wikipedia 페이지 조회를 위한 BigQuery 공개 데이터 세트에서 데이터를 로드합니다. spark-bigquery-connector를 사용하여 데이터 처리가 진행되는 Spark에 데이터를 로드하므로 데이터에 대한 쿼리를 실행하지 않습니다. 이 코드를 실행하면 Spark에서 지연 평가가 실행되므로 실제로 테이블이 로드되지 않으며 다음 단계에서 실행이 실행됩니다.

입력 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

출력 [4]:

c107a33f6fc30ca.png

필요한 열을 선택하고 filter()의 별칭인 where()을(를) 사용하여 필터를 적용합니다.

이 코드가 실행되면 Spark 작업이 트리거되고 이 시점에서 BigQuery Storage에서 데이터를 읽습니다.

입력 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

출력 [5]:

ad363cbe510d625a.png

제목별로 그룹화하고 페이지 조회수를 기준으로 정렬하여 인기 페이지를 확인하세요.

입력 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

출력 [6]:f718abd05afc0f4.png

7. 노트북에서 Python 그래프 작성 라이브러리 사용

Python에서 제공되는 다양한 표시 라이브러리를 활용하여 Spark 작업의 출력을 표시할 수 있습니다.

Spark DataFrame을 Pandas DataFrame으로 변환

Spark DataFrame을 Pandas DataFrame으로 변환하고 날짜 시간을 색인으로 설정합니다. 이는 Python에서 직접 데이터로 작업하고 사용 가능한 다양한 Python 표시 라이브러리를 사용하여 데이터를 표시하려고 할 때 유용합니다.

입력 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

출력 [7]:

3df2aaa2351f028d.png

Pandas DataFrame 그리기

노트북에 플롯을 표시하는 데 필요한 Matplotlib 라이브러리 가져오기

입력 [8]:

import matplotlib.pyplot as plt

Pandas 플롯 함수를 사용하여 Pandas DataFrame에서 선 차트를 만듭니다.

입력 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

출력 [9]:bade7042c3033594.png

노트북이 GCS에 저장되었는지 확인하기

이제 Dataproc 클러스터에서 첫 번째 Jupyter 노트북이 실행되고 있습니다. 노트북에 이름을 지정하면 클러스터를 만들 때 사용되는 GCS 버킷에 자동으로 저장됩니다.

Cloud Shell에서 이 gsutil 명령어를 사용하여 확인할 수 있습니다.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

다음과 같은 출력이 표시됩니다.

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 최적화 도움말 - 메모리의 데이터 캐시

매번 BigQuery Storage에서 데이터를 읽는 대신 메모리에 데이터를 저장하려는 경우가 있을 수 있습니다.

이 작업은 BigQuery에서 데이터를 읽고 필터를 BigQuery로 푸시합니다. 그런 다음 Apache Spark에서 집계가 계산됩니다.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

테이블의 캐시를 포함하도록 위의 작업을 수정할 수 있으며, 이제 위키 열의 필터가 Apache Spark에 의해 메모리에 적용됩니다.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

그러면 BigQuery 스토리지에서 데이터를 다시 읽는 대신 캐시된 데이터를 사용하여 다른 위키 언어를 필터링할 수 있으므로 훨씬 빠르게 실행됩니다.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

다음을 실행하여 캐시를 삭제할 수 있습니다.

df_wiki_all.unpersist()

9. 추가 사용 사례를 위한 노트북 예시

Cloud Dataproc GitHub 저장소에는 다양한 Google Cloud Platform 제품 및 오픈소스 도구를 사용하여 데이터 로드, 데이터 저장, 데이터 그래프 작성을 위한 일반적인 Apache Spark 패턴이 포함된 Jupyter 노트북이 있습니다.

10. 삭제

이 빠른 시작을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 하려면 다음 안내를 따르세요.

  1. 내가 만든 환경의 Cloud Storage 버킷 삭제
  2. Dataproc 환경을 삭제합니다.

이 Codelab만을 위해 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.

  1. GCP Console에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

라이선스

이 작업물은 Creative Commons Attribution 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.