Apache Spark and Jupyter Notebooks on Cloud Dataproc

1. Overview

This lab covers how to set up and use Apache Spark and Jupyter notebooks on Cloud Dataproc.

Jupyter notebooks are widely used for exploratory data analysis and building machine learning models as they allow you to interactively run your code and immediately see your results.

However, setting up and using Apache Spark and Jupyter notebooks can be complicated.


Cloud Dataproc makes this fast and easy by allowing you to create a Dataproc cluster with Apache Spark, the Jupyter component, and Component Gateway in around 90 seconds.

What you'll learn

In this codelab, you'll learn how to:

  • Create a Google Cloud Storage bucket for your cluster
  • Create a Dataproc cluster with Jupyter and Component Gateway
  • Access the JupyterLab web UI on Dataproc
  • Create a notebook making use of the Spark BigQuery Storage connector
  • Run a Spark job and plot the results

The total cost to run this lab on Google Cloud is about $1. Full details on Cloud Dataproc pricing can be found here.

2. Creating a project

Sign in to the Google Cloud Platform Console at console.cloud.google.com and create a new project.


Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running. The last section of this codelab will walk you through cleaning up your project.

New users of Google Cloud Platform are eligible for a $300 free trial.

3. Setting up your environment

First, open Cloud Shell by clicking the button in the top right-hand corner of the Cloud Console.


After Cloud Shell loads, run the following command to set the project ID from the previous step:

gcloud config set project <project_id>

The project ID can also be found by clicking on your project in the top left of the Cloud Console.

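If you prefer to check from Cloud Shell, the standard gcloud commands below list your projects and confirm which one is currently active:

# List all projects you have access to, with their project IDs
gcloud projects list

# Show the project currently set in your gcloud configuration
gcloud config get-value project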

Next, enable the Dataproc, Compute Engine, BigQuery, and BigQuery Storage APIs.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Alternatively, this can be done in the Cloud Console. Click on the menu icon in the top left of the screen.


Select API Manager from the drop down.


Click on Enable APIs and Services.


Search for and enable the following APIs:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. Create a GCS bucket

Create a Google Cloud Storage bucket in the region closest to your data and give it a unique name.

This will be used for the Dataproc cluster.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

You should see the following output:

Creating gs://<your-bucket-name>/...
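If you want to confirm the bucket's location and storage class, gsutil can print the bucket's metadata:

# Show the bucket's metadata, including its location and storage class
gsutil ls -L -b gs://${BUCKET_NAME}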

5. Create your Dataproc Cluster with Jupyter & Component Gateway

Creating your cluster

Set the env variables for your cluster

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Then run this gcloud command to create your cluster with all the components needed to work with Jupyter.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

You should see the following output while your cluster is being created:

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

It should take about 90 seconds to create your cluster. Once it is ready, you will be able to access it from the Dataproc Cloud Console UI.

While you are waiting, you can carry on reading below to learn more about the flags used in the gcloud command.

You should see the following output once the cluster is created:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
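You can also check the cluster's status and configuration from Cloud Shell at any time with the standard describe command:

# Show the cluster's status and full configuration
gcloud dataproc clusters describe ${CLUSTER_NAME} --region=${REGION}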

Flags used in the gcloud dataproc clusters create command

Here is a breakdown of the flags used in the gcloud dataproc clusters create command.

--region=${REGION}

Specifies the region where the cluster will be created. You can see the list of available regions here.

--image-version=1.4

The image version to use in your cluster. You can see the list of available versions here.

--bucket=${BUCKET_NAME}

Specifies the Google Cloud Storage bucket you created earlier to use for the cluster. If you do not supply a GCS bucket, one will be created for you.

This is also where your notebooks are saved; they will persist even if you delete your cluster, as the GCS bucket itself is not deleted.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

The machine types to use for your Dataproc cluster. You can see a list of available machine types here.

By default, 1 master node and 2 worker nodes are created if you do not set the --num-workers flag.

--optional-components=ANACONDA,JUPYTER

Setting these values for optional components will install all the necessary libraries for Jupyter and Anaconda (which is required for Jupyter notebooks) on your cluster.

--enable-component-gateway

Enabling Component Gateway creates an App Engine link using Apache Knox and Inverting Proxy, which gives easy, secure and authenticated access to the Jupyter and JupyterLab web interfaces, meaning you no longer need to create SSH tunnels.

It will also create links for other tools on the cluster, including the YARN Resource Manager and Spark History Server, which are useful for seeing the performance of your jobs and cluster usage patterns.

6. Create an Apache Spark notebook

Accessing the JupyterLab web interface

Once the cluster is ready, you can find the Component Gateway link to the JupyterLab web interface by going to the Dataproc Clusters page in the Cloud Console, clicking on the cluster you created and going to the Web Interfaces tab.


You will notice that you have access to Jupyter, which is the classic notebook interface, or JupyterLab, which is described as the next-generation UI for Project Jupyter.

There are a lot of great new UI features in JupyterLab, so if you are new to using notebooks or looking for the latest improvements, it is recommended to use JupyterLab, as it will eventually replace the classic Jupyter interface according to the official docs.

Create a notebook with a Python 3 kernel


From the Launcher tab, click on the Python 3 notebook icon to create a notebook with a Python 3 kernel (not the PySpark kernel). This lets you configure the SparkSession in the notebook and include the spark-bigquery-connector required to use the BigQuery Storage API.

Rename the notebook


Right-click on the notebook name in the sidebar on the left or in the top navigation and rename the notebook to "BigQuery Storage & Spark DataFrames.ipynb".

Run your Spark code in the notebook


In this notebook, you will use the spark-bigquery-connector, a tool for reading and writing data between BigQuery and Spark that makes use of the BigQuery Storage API.

The BigQuery Storage API brings significant improvements to accessing data in BigQuery by using an RPC-based protocol. It supports data reads and writes in parallel as well as different serialization formats such as Apache Avro and Apache Arrow. At a high level, this translates to significantly improved performance, especially on larger data sets.

In the first cell check the Scala version of your cluster so you can include the correct version of the spark-bigquery-connector jar.

Input [1]:

!scala -version

Output [1]:

Create a Spark session and include the spark-bigquery-connector package.

If your Scala version is 2.11, use the following package.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

If your Scala version is 2.12, use the following package.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Input [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Enable repl.eagerEval

This will output the results of DataFrames in each step without the need to call df.show(), and it also improves the formatting of the output.

Input [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Read BigQuery table into Spark DataFrame

Create a Spark DataFrame by reading in data from a public BigQuery dataset. This makes use of the spark-bigquery-connector and BigQuery Storage API to load the data into the Spark cluster.

Create a Spark DataFrame and load data from the BigQuery public dataset for Wikipedia pageviews. Notice that you are not running a query on the data; you are using the spark-bigquery-connector to load the data into Spark, where the processing will occur. When this code is run, it does not actually load the table, because evaluation is lazy in Spark and execution will occur in the next step.

Input [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Output [4]:

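Because of the lazy evaluation described above, no data has been read from BigQuery at this point. If you are curious, you can print the query plan Spark has built so far using the standard explain() method:

# Print the physical plan Spark will execute; nothing is read yet
df_wiki_pageviews.explain()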

Select the required columns and apply a filter using where(), which is an alias for filter().

When this code is run it triggers a Spark action and the data is read from BigQuery Storage at this point.

Input [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')")

df_wiki_en

Output [5]:


Group by datehour and order by total views to see the hours of the day with the most page views.

Input [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Output [6]:

7. Use Python plotting libraries in notebook

You can make use of the various plotting libraries that are available in Python to plot the output of your Spark jobs.

Convert Spark DataFrame to Pandas DataFrame

Convert the Spark DataFrame to a Pandas DataFrame and set the datehour as the index. This is useful if you want to work with the data directly in Python and plot it using the many available Python plotting libraries.

Input [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Output [7]:


Plotting the Pandas DataFrame

Import the matplotlib library, which is required to display the plots in the notebook.

Input [8]:

import matplotlib.pyplot as plt

Use the Pandas plot function to create a line chart from the Pandas DataFrame.

Input [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Output [9]:
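If you also want to keep the chart as an image file (for example, to download it from the JupyterLab file browser), you can capture the axes object returned by plot() and save the figure; the filename below is just an example:

# Save the chart to the notebook's working directory as a PNG file
ax = pandas_datehour_totals.plot(kind='line', figsize=(12,6))
ax.figure.savefig('wikipedia_pageviews_per_hour.png')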

Check the notebook was saved in GCS

You should now have your first Jupyter notebook up and running on your Dataproc cluster. The notebook is auto-saved to the GCS bucket used when creating the cluster.

You can check this using the following gsutil command in Cloud Shell:

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

You should see the following output:

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
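If you ever want a local copy of the notebook, for example to commit it to version control, you can copy it out of the bucket (note the quotes, since the filename contains spaces):

# Copy the notebook from GCS to the current directory
gsutil cp "gs://${BUCKET_NAME}/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb" .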

8. Optimization tip - Cache data in memory

There might be scenarios where you want the data in memory instead of reading from BigQuery Storage every time.

This job will read the data from BigQuery and push the filter to BigQuery. The aggregation will then be computed in Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
 .groupBy("title") \
 .agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

You can modify the job above to cache the table; the filter on the wiki column will then be applied in memory by Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
 .groupBy("title") \
 .agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

You can then filter for another wiki language using the cached data instead of reading data from BigQuery Storage again, so it will run much faster.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
 .groupBy("title") \
 .agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

You can remove the cache by running:

df_wiki_all.unpersist()
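If you want to check whether a DataFrame is still cached, or clear everything cached in the current session at once, these standard PySpark calls are available:

# is_cached reports whether the DataFrame is currently marked for caching
print(df_wiki_all.is_cached)

# Remove all cached tables and DataFrames from the in-memory cache
spark.catalog.clearCache()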

9. Example notebooks for more use cases

The Cloud Dataproc GitHub repo features Jupyter notebooks with common Apache Spark patterns for loading data, saving data, and plotting your data with various Google Cloud Platform products and open-source tools.

10. Clean up

To avoid incurring unnecessary charges to your GCP account after completion of this quickstart:

  1. Delete the Cloud Storage bucket that you created for the cluster.
  2. Delete the Dataproc cluster.
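If you prefer to do this from Cloud Shell, the following commands delete the cluster and the bucket created in this lab; substitute the cluster name, region, and bucket name you used earlier if the environment variables are no longer set.

# Delete the Dataproc cluster (you will be asked to confirm)
gcloud dataproc clusters delete ${CLUSTER_NAME} --region=${REGION}

# Delete the GCS bucket and everything in it
gsutil rm -r gs://${BUCKET_NAME}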

If you created a project just for this codelab, you can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License, and Apache 2.0 license.