1. 개요
이 실습에서는 Cloud Dataproc에서 Apache Spark와 Jupyter 노트북을 설정하고 사용하는 방법을 알아봅니다.
Jupyter 노트북은 코드를 대화형으로 실행하고 결과를 즉시 확인할 수 있으므로 실험적 데이터 분석 및 머신러닝 모델 빌드에 널리 사용됩니다.
하지만 Apache Spark 및 Jupyter Notebook을 설정하고 사용하는 것은 복잡할 수 있습니다.

Cloud Dataproc을 사용하면 Apache Spark, Jupyter 구성요소, 구성요소 게이트웨이를 사용하여 약 90초 만에 Dataproc 클러스터를 만들어 빠르고 쉽게 시작할 수 있습니다.
학습할 내용
이 Codelab에서는 다음 내용을 학습합니다.
- 클러스터용 Google Cloud Storage 버킷 만들기
- Jupyter 및 구성요소 게이트웨이를 사용하여 Dataproc 클러스터를 만듭니다.
- Dataproc에서 JupyterLab 웹 UI 액세스
- Spark BigQuery Storage 커넥터를 사용하는 노트북 만들기
- Spark 작업을 실행하고 결과를 플롯합니다.
Google Cloud에서 이 실습을 진행하는 데 드는 총 비용은 약 $1입니다. Cloud Dataproc 가격 책정에 대한 자세한 내용은 여기에서 확인할 수 있습니다.
2. 프로젝트 만들기
console.cloud.google.com에서 Google Cloud Platform 콘솔에 로그인하고 새 프로젝트를 만듭니다.



그런 후 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.
이 Codelab을 실행하는 과정에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하려고 하거나 실행 중일 경우 비용이 더 들 수 있습니다. 이 Codelab의 마지막 섹션에서는 프로젝트를 정리하는 방법을 안내합니다.
Google Cloud Platform 신규 사용자는 $300 상당의 무료 체험판을 사용할 수 있습니다.
3. 환경 설정
먼저 클라우드 콘솔의 오른쪽 상단에 있는 버튼을 클릭하여 Cloud Shell을 엽니다.

Cloud Shell이 로드되면 다음 명령어를 실행하여 이전 단계에서 프로젝트 ID를 설정합니다.
gcloud config set project <project_id>
프로젝트 ID는 클라우드 콘솔의 왼쪽 상단에 있는 프로젝트를 클릭하여 확인할 수도 있습니다.


다음으로 Dataproc, Compute Engine, BigQuery Storage API를 사용 설정합니다.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
또는 Cloud 콘솔에서 이 작업을 수행할 수 있습니다. 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.

드롭다운에서 API 관리자를 선택합니다.

API 및 서비스 사용 설정을 클릭합니다.

다음 API를 검색하여 사용 설정합니다.
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS 버킷 만들기
데이터와 가장 가까운 리전에 Google Cloud Storage 버킷을 만들고 고유한 이름을 지정합니다.
이는 Dataproc 클러스터에 사용됩니다.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
다음 출력이 표시될 것입니다.
Creating gs://<your-bucket-name>/...
5. Jupyter 및 구성요소 게이트웨이를 사용하여 Dataproc 클러스터 만들기
클러스터 만들기
클러스터의 환경 변수 설정
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
그런 다음 다음 gcloud 명령어를 실행하여 클러스터에서 Jupyter를 사용하는 데 필요한 모든 구성요소가 포함된 클러스터를 만듭니다.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
클러스터가 생성되는 동안 다음 출력이 표시됩니다.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
클러스터를 만드는 데 약 90초가 걸리며, 클러스터가 준비되면 Dataproc Cloud 콘솔 UI에서 클러스터에 액세스할 수 있습니다.
기다리는 동안 아래를 계속 읽고 gcloud 명령어에 사용되는 플래그에 대해 자세히 알아보세요.
클러스터가 생성되면 다음 출력이 표시됩니다.
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create 명령어에 사용되는 플래그
gcloud dataproc create 명령어에 사용되는 플래그의 분류는 다음과 같습니다.
--region=${REGION}
클러스터가 생성될 리전과 영역을 지정합니다. 사용 가능한 지역 목록은 여기에서 확인할 수 있습니다.
--image-version=1.4
클러스터에서 사용할 이미지 버전입니다. 사용 가능한 버전 목록은 여기에서 확인할 수 있습니다.
--bucket=${BUCKET_NAME}
클러스터에 사용할 이전에 만든 Google Cloud Storage 버킷을 지정합니다. GCS 버킷을 제공하지 않으면 버킷이 생성됩니다.
GCS 버킷은 삭제되지 않으므로 클러스터를 삭제해도 노트북이 저장되는 위치이기도 합니다.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc 클러스터에 사용할 머신 유형입니다. 사용 가능한 머신 유형 목록은 여기에서 확인할 수 있습니다.
--num-workers 플래그를 설정하지 않으면 기본적으로 마스터 노드 1개와 워커 노드 2개가 생성됩니다.
--optional-components=ANACONDA,JUPYTER
선택적 구성요소에 이러한 값을 설정하면 클러스터에 Jupyter 및 Anaconda (Jupyter 노트북에 필요)에 필요한 모든 라이브러리가 설치됩니다.
--enable-component-gateway
구성요소 게이트웨이를 사용 설정하면 Apache Knox 및 역방향 프록시를 사용하여 App Engine 링크가 생성되므로 Jupyter 및 JupyterLab 웹 인터페이스에 쉽고 안전하며 인증된 액세스가 제공되어 더 이상 SSH 터널을 만들 필요가 없습니다.
또한 작업의 성능과 클러스터 사용 패턴을 확인하는 데 유용한 Yarn 리소스 관리자 및 Spark 기록 서버를 비롯한 클러스터의 다른 도구 링크도 생성됩니다.
6. Apache Spark 노트북 만들기
JupyterLab 웹 인터페이스 액세스
클러스터가 준비되면 Dataproc 클러스터 - Cloud 콘솔로 이동하고, 생성한 클러스터를 클릭하고, 웹 인터페이스 탭으로 이동하여 JupyterLab 웹 인터페이스의 구성요소 게이트웨이 링크를 찾을 수 있습니다.

클래식 노트북 인터페이스인 Jupyter 또는 Project Jupyter의 차세대 UI로 설명되는 JupyterLab에 액세스할 수 있습니다.
JupyterLab에는 다양한 새로운 UI 기능이 있으므로 노트북을 처음 사용하거나 최신 개선사항을 찾고 있다면 JupyterLab을 사용하는 것이 좋습니다. 공식 문서에 따르면 JupyterLab이 결국 기존 Jupyter 인터페이스를 대체할 예정이기 때문입니다.
Python 3 커널로 노트북 만들기

런처 탭에서 Python 3 노트북 아이콘을 클릭하여 Python 3 커널 (PySpark 커널 아님)이 있는 노트북을 만듭니다. 이렇게 하면 노트북에서 SparkSession을 구성하고 BigQuery Storage API를 사용하는 데 필요한 spark-bigquery-connector를 포함할 수 있습니다.
노트북 이름 변경

왼쪽 사이드바 또는 상단 탐색에서 노트북 이름을 마우스 오른쪽 버튼으로 클릭하고 노트북 이름을 'BigQuery Storage & Spark DataFrames.ipynb'로 바꿉니다.
노트북에서 Spark 코드 실행

이 노트북에서는 BigQuery Storage API를 사용하여 BigQuery와 Spark 간에 데이터를 읽고 쓰는 도구인 spark-bigquery-connector를 사용합니다.
BigQuery Storage API는 RPC 기반 프로토콜을 사용하여 BigQuery의 데이터 액세스를 크게 개선합니다. Apache Avro 및 Apache Arrow와 같은 다양한 직렬화 형식은 물론 병렬 데이터 읽기 및 쓰기를 지원합니다. 개괄적으로 말하자면 이 기능은 특히 대규모 데이터 세트에서 성능을 크게 향상시킵니다.
첫 번째 셀에서 클러스터의 Scala 버전을 확인하여 올바른 버전의 spark-bigquery-connector jar를 포함할 수 있습니다.
입력 [1]:
!scala -version
출력 [1]:
Spark 세션을 만들고 spark-bigquery-connector 패키지를 포함합니다.
Scala 버전이 2.11인 경우 다음 패키지를 사용하세요.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Scala 버전이 2.12인 경우 다음 패키지를 사용하세요.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
입력 [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval 사용 설정
이렇게 하면 각 단계에서 DataFrame의 결과가 출력되며 df.show()를 새로 표시할 필요가 없고 출력의 형식이 개선됩니다.
입력 [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery 테이블을 Spark DataFrame으로 읽기
공개 BigQuery 데이터 세트에서 데이터를 읽어 Spark DataFrame을 만듭니다. 이렇게 하면 spark-bigquery-connector 및 BigQuery Storage API를 사용하여 데이터를 Spark 클러스터에 로드할 수 있습니다.
Spark DataFrame을 만들고 Wikipedia 페이지 조회수를 위한 BigQuery 공개 데이터 세트에서 데이터를 로드합니다. spark-bigquery-connector를 사용하여 데이터를 Spark에 로드하면 데이터 처리가 발생하는 Spark에서 데이터에 대한 쿼리가 실행되지 않습니다. 이 코드를 실행하면 실제로 표가 로드되지 않습니다. Spark의 지연 평가이기 때문이며 실행은 다음 단계에서 발생합니다.
입력 [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
출력 [4]:

필요한 열을 선택하고 filter()의 별칭인 where()를 사용하여 필터를 적용합니다.
이 코드를 실행하면 Spark 작업이 트리거되고 이 시점에서 BigQuery Storage에서 데이터가 읽힙니다.
입력 [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
출력 [5]:

제목별로 그룹화하고 페이지 조회수별로 정렬하여 인기 페이지 확인
입력 [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
출력 [6]:
7. 노트북에서 Python 플로팅 라이브러리 사용
Python에서 사용할 수 있는 다양한 플로팅 라이브러리를 사용하여 Spark 작업의 출력을 플롯할 수 있습니다.
Spark DataFrame을 Pandas DataFrame으로 변환
Spark DataFrame을 Pandas DataFrame으로 변환하고 datehour를 색인으로 설정합니다. Python에서 직접 데이터를 처리하고 사용 가능한 다양한 Python 플로팅 라이브러리를 사용하여 데이터를 플롯하려는 경우에 유용합니다.
입력 [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
출력 [7]:

Pandas DataFrame 플로팅
노트북에 플롯을 표시하는 데 필요한 matplotlib 라이브러리를 가져옵니다.
입력 [8]:
import matplotlib.pyplot as plt
Pandas 플롯 함수를 사용하여 Pandas DataFrame에서 선 차트를 만듭니다.
입력 [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
출력 [9]:
노트북이 GCS에 저장되었는지 확인
이제 Dataproc 클러스터에서 첫 번째 Jupyter 노트북이 실행됩니다. 노트북에 이름을 지정하면 클러스터를 만들 때 사용된 GCS 버킷에 자동으로 저장됩니다.
Cloud Shell에서 다음 gsutil 명령어를 사용하여 이를 확인할 수 있습니다.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
다음 출력이 표시될 것입니다.
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. 최적화 팁 - 메모리에 데이터 캐시
매번 BigQuery 스토리지에서 데이터를 읽는 대신 메모리에 데이터를 저장해야 하는 시나리오가 있을 수 있습니다.
이 작업은 BigQuery에서 데이터를 읽고 필터를 BigQuery로 푸시합니다. 그러면 집계가 Apache Spark에서 계산됩니다.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
위의 작업을 수정하여 테이블의 캐시를 포함할 수 있으며 이제 Apache Spark에서 메모리에 wiki 열의 필터가 적용됩니다.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
그런 다음 BigQuery 스토리지에서 데이터를 다시 읽는 대신 캐시된 데이터를 사용하여 다른 위키 언어를 필터링할 수 있으므로 훨씬 빠르게 실행됩니다.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
다음 명령어를 실행하여 캐시를 삭제할 수 있습니다.
df_wiki_all.unpersist()
9. 다양한 사용 사례의 예시 노트북
Cloud Dataproc GitHub 저장소에는 다양한 Google Cloud Platform 제품 및 오픈소스 도구를 사용하여 데이터를 로드하고, 데이터를 저장하고, 데이터를 플롯하는 일반적인 Apache Spark 패턴이 포함된 Jupyter 노트북이 있습니다.
10. 삭제
이 빠른 시작을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 다음 단계를 따르세요.
- 환경 및 사용자가 만든 환경의 Cloud Storage 버킷을 삭제합니다.
- Dataproc 환경을 삭제합니다.
이 Codelab만을 위한 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.
- GCP 콘솔에서 프로젝트 페이지로 이동합니다.
- 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
- 상자에 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.
라이선스
이 작업물은 크리에이티브 커먼즈 저작자 표시 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.