1. 개요
이 실습에서는 Cloud Dataproc에서 Apache Spark 및 Jupyter 노트북을 설정하고 사용하는 방법을 다룹니다.
Jupyter Notebook은 코드를 대화형으로 실행하고 즉시 결과를 확인할 수 있기 때문에 탐색적 데이터 분석 및 머신러닝 모델 빌드에 널리 사용됩니다.
그러나 Apache Spark 및 Jupyter 노트북을 설정하고 사용하는 것은 복잡할 수 있습니다.
Cloud Dataproc을 사용하면 Apache Spark, Jupyter 구성요소, 구성요소 게이트웨이가 있는 Dataproc 클러스터를 약 90초 내에 만들 수 있어 작업을 빠르고 쉽게 수행할 수 있습니다.
학습할 내용
이 Codelab에서는 다음 내용을 학습합니다.
- 클러스터에 사용할 Google Cloud Storage 버킷 만들기
- Jupyter 및 구성요소 게이트웨이로 Dataproc 클러스터 만들기
- Dataproc에서 JupyterLab 웹 UI 액세스
- Spark BigQuery 스토리지 커넥터를 사용하는 노트북 만들기
- Spark 작업 실행 및 결과 표시
Google Cloud에서 이 실습을 진행하는 데 드는 총 비용은 약 $1입니다. Cloud Dataproc 가격 책정에 대한 자세한 내용은 여기를 참조하세요.
2. 프로젝트 만들기
console.cloud.google.com에서 Google Cloud Platform 콘솔에 로그인하고 새 프로젝트를 만듭니다.
다음으로 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.
이 Codelab을 실행하는 데에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하기로 결정하거나 계속 실행하면 비용이 더 많이 들 수 있습니다. 이 Codelab의 마지막 섹션에서는 프로젝트를 삭제하는 방법을 안내합니다.
Google Cloud Platform의 신규 사용자에게는 $300의 무료 체험판이 제공됩니다.
3. 환경 설정
먼저 Cloud 콘솔의 오른쪽 상단에 있는 버튼을 클릭하여 Cloud Shell을 엽니다.
Cloud Shell이 로드된 후 다음 명령어를 실행하여 이전 단계의 프로젝트 ID를 설정합니다**:**
gcloud config set project <project_id>
Cloud 콘솔의 왼쪽 상단에 있는 프로젝트를 클릭하여 프로젝트 ID를 확인할 수도 있습니다.
다음으로 Dataproc, Compute Engine, BigQuery Storage API를 사용 설정합니다.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
또는 Cloud 콘솔에서 이 작업을 수행할 수도 있습니다. 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
드롭다운에서 API 관리자를 선택합니다.
API 및 서비스 사용 설정을 클릭합니다.
다음 API를 검색하고 사용 설정합니다.
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS 버킷 만들기
데이터와 가장 가까운 리전에 Google Cloud Storage 버킷을 만들고 고유한 이름을 지정합니다.
이는 Dataproc 클러스터에 사용됩니다.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
다음과 같은 출력이 표시됩니다.
Creating gs://<your-bucket-name>/...
5. Jupyter를 사용하여 Dataproc 클러스터 만들기 및 구성요소 게이트웨이
클러스터 만들기
클러스터의 환경 변수 설정
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
그런 다음 이 gcloud 명령어를 실행하여 클러스터에서 Jupyter를 사용하는 데 필요한 모든 구성요소로 클러스터를 만듭니다.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
클러스터가 생성되는 동안 다음 출력이 표시됩니다.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
클러스터를 만드는 데 약 90초가 소요되며, 클러스터가 준비되면 Dataproc Cloud 콘솔 UI에서 클러스터에 액세스할 수 있습니다.
기다리는 동안 아래 내용을 계속 읽어 gcloud 명령어에 사용되는 플래그에 대해 자세히 알아보세요.
클러스터가 생성되면 다음과 같이 출력됩니다.
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create 명령어에 사용되는 플래그
다음은 gcloud dataproc create 명령어에 사용되는 플래그의 세부정보입니다.
--region=${REGION}
클러스터가 생성될 리전 및 영역을 지정합니다. 여기에서 사용 가능한 지역 목록을 확인할 수 있습니다.
--image-version=1.4
클러스터에서 사용할 이미지 버전입니다. 여기에서 사용 가능한 버전 목록을 확인할 수 있습니다.
--bucket=${BUCKET_NAME}
앞서 만든 Google Cloud Storage 버킷을 지정하여 클러스터에 사용합니다. GCS 버킷을 제공하지 않으면 자동으로 생성됩니다.
또한 GCS 버킷이 삭제되지 않으므로 클러스터를 삭제해도 노트북이 저장됩니다.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc 클러스터에 사용할 머신 유형입니다. 사용 가능한 머신 유형 목록은 여기에서 확인할 수 있습니다.
–num-workers 플래그를 설정하지 않으면 기본적으로 마스터 노드 1개와 워커 노드 2개가 생성됩니다.
--optional-components=ANACONDA,JUPYTER
선택적 구성요소에 이러한 값을 설정하면 Jupyter 및 Anaconda에 필요한 모든 라이브러리 (Jupyter 노트북에 필요)가 클러스터에 설치됩니다.
--enable-component-gateway
Component Gateway를 사용 설정하면 Apache Knox 및 Inverting Proxy를 사용하는 App Engine 링크가 생성되어 Jupyter 및 JupyterLab 웹 인터페이스에 간편하고 안전하고 인증된 액세스 권한을 부여하므로 더 이상 SSH 터널을 만들 필요가 없습니다.
또한 작업 성능과 클러스터 사용 패턴을 확인하는 데 유용한 Yarn Resource Manager 및 Spark History Server 등 클러스터의 다른 도구에 대한 링크도 생성합니다.
6. Apache Spark 노트북 만들기
JupyterLab 웹 인터페이스 액세스
클러스터가 준비되면 Dataproc 클러스터 - Cloud 콘솔로 이동한 후 생성한 클러스터를 클릭하고 '웹 인터페이스' 탭으로 이동하여 JupyterLab 웹 인터페이스로 연결되는 구성요소 게이트웨이 링크를 찾을 수 있습니다.
기존 노트북 인터페이스인 Jupyter 또는 Project Jupyter의 차세대 UI로 설명되는 JupyterLab에 액세스할 수 있습니다.
JupyterLab에는 유용한 새 UI 기능이 많이 있습니다. 따라서 노트북을 처음 사용하거나 최신 개선사항을 찾고 있다면 공식 문서에 따라 기존 Jupyter 인터페이스를 대체하게 될 JupyterLab을 사용하는 것이 좋습니다.
Python 3 커널로 노트북 만들기
런처 탭에서 Python 3 노트북 아이콘을 클릭하여 Python 3 커널 (PySpark 커널이 아님)으로 노트북을 만듭니다. 그러면 노트북에서 SparkSession을 구성하고 BigQuery Storage API를 사용하는 데 필요한 spark-bigquery-connector를 포함할 수 있습니다.
노트북 이름 바꾸기
왼쪽 또는 상단 탐색 메뉴의 사이드바에서 노트북 이름을 마우스 오른쪽 버튼으로 클릭하고 노트북 이름을 'BigQuery Storage 및 Spark DataFrames.ipynb'
노트북에서 Spark 코드 실행
이 노트북에서는 BigQuery Storage API를 활용하여 BigQuery와 Spark 간에 데이터를 읽고 쓰는 도구인 spark-bigquery-connector를 사용합니다.
BigQuery Storage API는 RPC 기반 프로토콜을 사용하여 BigQuery의 데이터에 액세스하는 방식을 크게 개선합니다. 병렬 데이터 읽기 및 쓰기는 물론 Apache Avro 및 Apache Arrow와 같은 다양한 직렬화 형식을 지원합니다. 개략적으로 설명하면 특히 대규모 데이터 세트에서 성능이 크게 향상됩니다.
첫 번째 셀에서 올바른 버전의 spark-bigquery-connector jar를 포함할 수 있도록 클러스터의 Scala 버전을 확인합니다.
입력 [1]:
!scala -version
출력 [1]: Spark 세션을 만들고 Spark-bigquery-connector 패키지를 포함합니다.
Scala 버전이 2.11인 경우 다음 패키지를 사용합니다.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Scala 버전이 2.12인 경우 다음 패키지를 사용합니다.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
입력 [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval 사용 설정
이렇게 하면 새로 df.show()를 표시할 필요 없이 각 단계에서 DataFrames의 결과가 출력되며 출력 형식도 개선됩니다.
입력 [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Spark DataFrame으로 BigQuery 테이블 읽기
공개 BigQuery 데이터 세트에서 데이터를 읽어 Spark DataFrame을 만듭니다. 이렇게 하면 spark-bigquery-connector 및 BigQuery Storage API를 사용하여 데이터를 Spark 클러스터에 로드합니다.
Spark DataFrame을 만들고 Wikipedia 페이지 조회를 위한 BigQuery 공개 데이터 세트에서 데이터를 로드합니다. spark-bigquery-connector를 사용하여 데이터 처리가 진행되는 Spark에 데이터를 로드하므로 데이터에 대한 쿼리를 실행하지 않습니다. 이 코드를 실행하면 Spark에서 지연 평가가 실행되므로 실제로 테이블이 로드되지 않으며 다음 단계에서 실행이 실행됩니다.
입력 [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
출력 [4]:
필요한 열을 선택하고 filter()
의 별칭인 where()
을(를) 사용하여 필터를 적용합니다.
이 코드가 실행되면 Spark 작업이 트리거되고 이 시점에서 BigQuery Storage에서 데이터를 읽습니다.
입력 [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
출력 [5]:
제목별로 그룹화하고 페이지 조회수를 기준으로 정렬하여 인기 페이지를 확인하세요.
입력 [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
출력 [6]:
7. 노트북에서 Python 그래프 작성 라이브러리 사용
Python에서 제공되는 다양한 표시 라이브러리를 활용하여 Spark 작업의 출력을 표시할 수 있습니다.
Spark DataFrame을 Pandas DataFrame으로 변환
Spark DataFrame을 Pandas DataFrame으로 변환하고 날짜 시간을 색인으로 설정합니다. 이는 Python에서 직접 데이터로 작업하고 사용 가능한 다양한 Python 표시 라이브러리를 사용하여 데이터를 표시하려고 할 때 유용합니다.
입력 [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
출력 [7]:
Pandas DataFrame 그리기
노트북에 플롯을 표시하는 데 필요한 Matplotlib 라이브러리 가져오기
입력 [8]:
import matplotlib.pyplot as plt
Pandas 플롯 함수를 사용하여 Pandas DataFrame에서 선 차트를 만듭니다.
입력 [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
출력 [9]:
노트북이 GCS에 저장되었는지 확인하기
이제 Dataproc 클러스터에서 첫 번째 Jupyter 노트북이 실행되고 있습니다. 노트북에 이름을 지정하면 클러스터를 만들 때 사용되는 GCS 버킷에 자동으로 저장됩니다.
Cloud Shell에서 이 gsutil 명령어를 사용하여 확인할 수 있습니다.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
다음과 같은 출력이 표시됩니다.
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. 최적화 도움말 - 메모리의 데이터 캐시
매번 BigQuery Storage에서 데이터를 읽는 대신 메모리에 데이터를 저장하려는 경우가 있을 수 있습니다.
이 작업은 BigQuery에서 데이터를 읽고 필터를 BigQuery로 푸시합니다. 그런 다음 Apache Spark에서 집계가 계산됩니다.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
테이블의 캐시를 포함하도록 위의 작업을 수정할 수 있으며, 이제 위키 열의 필터가 Apache Spark에 의해 메모리에 적용됩니다.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
그러면 BigQuery 스토리지에서 데이터를 다시 읽는 대신 캐시된 데이터를 사용하여 다른 위키 언어를 필터링할 수 있으므로 훨씬 빠르게 실행됩니다.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
다음을 실행하여 캐시를 삭제할 수 있습니다.
df_wiki_all.unpersist()
9. 추가 사용 사례를 위한 노트북 예시
Cloud Dataproc GitHub 저장소에는 다양한 Google Cloud Platform 제품 및 오픈소스 도구를 사용하여 데이터 로드, 데이터 저장, 데이터 그래프 작성을 위한 일반적인 Apache Spark 패턴이 포함된 Jupyter 노트북이 있습니다.
10. 삭제
이 빠른 시작을 완료한 후 GCP 계정에 불필요한 비용이 청구되지 않도록 하려면 다음 안내를 따르세요.
- 내가 만든 환경의 Cloud Storage 버킷 삭제
- Dataproc 환경을 삭제합니다.
이 Codelab만을 위해 프로젝트를 만든 경우 선택적으로 프로젝트를 삭제할 수도 있습니다.
- GCP Console에서 프로젝트 페이지로 이동합니다.
- 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
- 상자에 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.
라이선스
이 작업물은 Creative Commons Attribution 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.