1. Overview
This lab covers how to set up and use Apache Spark and Jupyter notebooks on Cloud Dataproc.
Jupyter notebooks are widely used for exploratory data analysis and building machine learning models as they allow you to interactively run your code and immediately see your results.
However, setting up and using Apache Spark and Jupyter notebooks can be complicated.
Cloud Dataproc makes this fast and easy by allowing you to create a Dataproc cluster with Apache Spark, the Jupyter component, and Component Gateway in around 90 seconds.
What you'll learn
In this codelab, you'll learn how to:
- Create a Google Cloud Storage bucket for your cluster
- Create a Dataproc cluster with Jupyter and Component Gateway
- Access the JupyterLab web UI on Dataproc
- Create a notebook that uses the Spark BigQuery Storage connector
- Run a Spark job and plot the results
The total cost to run this lab on Google Cloud is about $1. Full details on Cloud Dataproc pricing can be found here.
2. Creating a project
Sign in to the Google Cloud Platform console at console.cloud.google.com and create a new project:
Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.
Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running. The last section of this codelab will walk you through cleaning up your project.
New users of Google Cloud Platform are eligible for a $300 free trial.
3. Setting up your environment
First, open up Cloud Shell by clicking the button in the top right-hand corner of the cloud console:
After Cloud Shell loads, run the following command to set the project ID from the previous step:
gcloud config set project <project_id>
The project ID can also be found by clicking on your project in the top left of the cloud console:
Next, enable the Dataproc, Compute Engine, Cloud Storage, BigQuery, and BigQuery Storage APIs:
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Alternatively this can be done in the Cloud Console. Click on the menu icon in the top left of the screen.
Select API Manager from the drop down.
Click on Enable APIs and Services.
Search for and enable the following APIs:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. Create a GCS bucket
Create a Google Cloud Storage bucket in the region closest to your data and give it a unique name.
This will be used for the Dataproc cluster.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
You should see the following output:
Creating gs://<your-bucket-name>/...
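Bucket names are global, so a name can be rejected because it is already taken or because it is malformed. As a minimal sketch, the helper below checks a candidate name against a subset of the Cloud Storage naming rules before you run gsutil mb: 3 to 63 characters; lowercase letters, digits, dashes, and underscores only; a letter or digit at each end; no "goog" prefix and no "google" substring. The name is_plausible_bucket_name is hypothetical, names containing dots are deliberately not handled, and passing the check does not mean the name is available.

```python
import re

# Subset of the Cloud Storage bucket naming rules; dotted names are not handled.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def is_plausible_bucket_name(name: str) -> bool:
    """Return True if `name` passes the simplified checks described above."""
    if _BUCKET_RE.fullmatch(name) is None:
        return False
    if name.startswith("goog") or "google" in name:
        return False
    return True


if __name__ == "__main__":
    # Hypothetical candidate names, for illustration only.
    for candidate in ["my-dataproc-bucket-123", "My_Bucket", "ab"]:
        print(candidate, is_plausible_bucket_name(candidate))
```

A check like this only catches formatting mistakes early; gsutil mb remains the authority on whether the bucket can actually be created.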
5. Create your Dataproc Cluster with Jupyter & Component Gateway
Creating your cluster
Set the environment variables for your cluster:
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Then run this gcloud command to create your cluster with all the necessary components to work with Jupyter on your cluster.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
You should see the following output while your cluster is being created:
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
It should take about 90 seconds to create your cluster and once it is ready you will be able to access your cluster from the Dataproc Cloud console UI.
While you are waiting, you can carry on reading below to learn more about the flags used in the gcloud command.
You should see the following output once the cluster is created:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Flags used in the gcloud dataproc clusters create command
Here is a breakdown of the flags used in the gcloud dataproc clusters create command:
--region=${REGION}
Specifies the region where the cluster will be created. You can see the list of available regions here.
--image-version=1.4
The image version to use in your cluster. You can see the list of available versions here.
--bucket=${BUCKET_NAME}
Specifies the Google Cloud Storage bucket you created earlier to use for the cluster. If you do not supply a GCS bucket, one will be created for you.
This is also where your notebooks are saved. They remain available even if you delete your cluster, because the GCS bucket is not deleted.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
The machine types to use for your Dataproc cluster. You can see a list of available machine types here.
By default, 1 master node and 2 worker nodes are created if you do not set the --num-workers flag.
--optional-components=ANACONDA,JUPYTER
Setting these values for optional components will install all the necessary libraries for Jupyter and Anaconda (which is required for Jupyter notebooks) on your cluster.
--enable-component-gateway
Enabling Component Gateway creates an App Engine link using Apache Knox and Inverting Proxy. This gives easy, secure, and authenticated access to the Jupyter and JupyterLab web interfaces, so you no longer need to create SSH tunnels.
It also creates links for other tools on the cluster, including the YARN ResourceManager and Spark History Server, which are useful for seeing the performance of your jobs and cluster usage patterns.
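To make the mapping between settings and flags explicit, here is a small sketch that assembles the same command from Python values. build_create_command is a hypothetical helper, "my-bucket" is a placeholder, and only the flags from the command above are used. The function builds and prints the command; it does not call gcloud.

```python
import shlex


def build_create_command(cluster_name, region, bucket, image_version="1.4",
                         machine_type="n1-standard-4"):
    """Build (but do not run) the cluster-creation command as an argument list."""
    flags = {
        "--region": region,
        "--image-version": image_version,
        "--master-machine-type": machine_type,
        "--worker-machine-type": machine_type,
        "--bucket": bucket,
        "--optional-components": "ANACONDA,JUPYTER",
    }
    argv = ["gcloud", "beta", "dataproc", "clusters", "create", cluster_name]
    argv += [f"{flag}={value}" for flag, value in flags.items()]
    argv.append("--enable-component-gateway")  # boolean flag, takes no value
    return argv


if __name__ == "__main__":
    # "my-bucket" is a placeholder for the bucket you created earlier.
    argv = build_create_command("spark-jupyter", "us-central1", "my-bucket")
    print(shlex.join(argv))
```

Keeping the flags in one dictionary makes it easy to see which values differ between clusters and which stay fixed.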
6. Create an Apache Spark notebook
Accessing the JupyterLab web interface
Once the cluster is ready you can find the Component Gateway link to the JupyterLab web interface by going to Dataproc Clusters - Cloud console, clicking on the cluster you created and going to the Web Interfaces tab.
You will notice that you have access to Jupyter which is the classic notebook interface or JupyterLab which is described as the next-generation UI for Project Jupyter.
JupyterLab has many new UI features, so if you are new to notebooks or want the latest improvements, JupyterLab is the recommended choice. According to the official docs, it will eventually replace the classic Jupyter interface.
Create a notebook with a Python 3 kernel
From the launcher tab click on the Python 3 notebook icon to create a notebook with a Python 3 kernel (not the PySpark kernel) which allows you to configure the SparkSession in the notebook and include the spark-bigquery-connector required to use the BigQuery Storage API.
Rename the notebook
Right click on the notebook name in the sidebar on the left or the top navigation and rename the notebook to "BigQuery Storage & Spark DataFrames.ipynb"
Run your Spark code in the notebook
In this notebook, you will use the spark-bigquery-connector which is a tool for reading and writing data between BigQuery and Spark making use of the BigQuery Storage API.
The BigQuery Storage API brings significant improvements to accessing data in BigQuery by using an RPC-based protocol. It supports data reads and writes in parallel as well as different serialization formats such as Apache Avro and Apache Arrow. At a high level, this translates to significantly improved performance, especially on larger data sets.
In the first cell check the Scala version of your cluster so you can include the correct version of the spark-bigquery-connector jar.
Input [1]:
!scala -version
Output [1]:
Create a Spark session and include the spark-bigquery-connector package.
If your Scala version is 2.11 use the following package.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
If your Scala version is 2.12 use the following package.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
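The choice between the two packages can also be scripted. The sketch below pulls the Scala major.minor version out of a piece of version text and returns the matching package from the two listed above. connector_package is a hypothetical helper, and the sample version line is an assumption about what the version command prints, so check it against your own output.

```python
import re

# The two connector packages listed above, keyed by Scala major.minor version.
CONNECTOR_PACKAGES = {
    "2.11": "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta",
    "2.12": "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta",
}


def connector_package(version_text: str) -> str:
    """Pick the connector package matching the first major.minor version found."""
    match = re.search(r"(\d+\.\d+)", version_text)
    if match is None:
        raise ValueError(f"no Scala version found in {version_text!r}")
    binary_version = match.group(1)
    if binary_version not in CONNECTOR_PACKAGES:
        raise ValueError(f"no package listed for Scala {binary_version}")
    return CONNECTOR_PACKAGES[binary_version]


if __name__ == "__main__":
    # Assumed shape of the version line; your cluster's output may differ.
    sample = "Scala code runner version 2.11.12"
    print(connector_package(sample))
```

The returned string is the value to pass to the spark.jars.packages setting when the Spark session is created.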
Input [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Enable repl.eagerEval
This outputs the results of DataFrames at each step without the need to call df.show(), and also improves the formatting of the output.
Input [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Read BigQuery table into Spark DataFrame
Create a Spark DataFrame by loading data from the BigQuery public dataset for Wikipedia pageviews. This uses the spark-bigquery-connector and the BigQuery Storage API to load the data into the Spark cluster.
Note that you are not running a query on the data in BigQuery: the connector loads the data into Spark, where the processing occurs. Running this code does not load the table yet, because Spark evaluates lazily; the read is executed in the next step.
Input [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Output [4]:
Select the required columns and apply a filter using where(), which is an alias for filter().
When this code is run it triggers a Spark action and the data is read from BigQuery Storage at this point.
Input [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')")
# Displaying the DataFrame triggers the read from BigQuery Storage
df_wiki_en
Output [5]:
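The difference between building a query and running it can be illustrated without Spark. The toy LazyFrame class below is entirely hypothetical and is not how Spark is implemented. It only records select() and where() calls and touches the data when collect() is called, which mirrors the split between transformations and actions described above. The rows are made-up sample values.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not executed."""

    def __init__(self, source, steps=()):
        self._source = source        # zero-argument callable that returns rows (dicts)
        self._steps = tuple(steps)   # recorded transformations, applied on collect()

    def where(self, predicate):
        keep = lambda rows: [r for r in rows if predicate(r)]
        return LazyFrame(self._source, self._steps + (keep,))

    def select(self, *columns):
        project = lambda rows: [{c: r[c] for c in columns} for r in rows]
        return LazyFrame(self._source, self._steps + (project,))

    def collect(self):
        """The 'action': read the source and apply every recorded step."""
        rows = self._source()
        for step in self._steps:
            rows = step(rows)
        return rows


reads = []  # one entry is appended each time the source is actually read


def read_table():
    reads.append("read")
    # Made-up sample rows, not real pageview data.
    return [
        {"wiki": "en", "title": "A", "views": 1500},
        {"wiki": "de", "title": "B", "views": 2000},
        {"wiki": "en", "title": "C", "views": 10},
    ]


df = LazyFrame(read_table).select("wiki", "views").where(lambda r: r["views"] > 1000)
print(len(reads))        # the source has not been read at this point
result = df.collect()    # the read happens here
print(len(reads), result)
```

In the notebook, displaying a DataFrame with eager evaluation enabled plays the role of collect() in this toy.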
Group by datehour and order by total views to see the hours with the most page views.
Input [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Output [6]:
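To see what the groupBy, agg, and orderBy chain computes, the sketch below reproduces the same aggregation in plain Python on a few made-up rows. total_views_by_hour is a hypothetical helper and the values are not real pageview data; on the cluster, Spark performs this work in parallel across the workers.

```python
from collections import defaultdict


def total_views_by_hour(rows):
    """Sum views per datehour and sort by the total, largest first."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["datehour"]] += row["views"]
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


# Made-up sample rows for illustration only.
sample_rows = [
    {"datehour": "2020-03-01 00:00", "wiki": "en", "views": 1200},
    {"datehour": "2020-03-01 01:00", "wiki": "en.m", "views": 3000},
    {"datehour": "2020-03-01 00:00", "wiki": "en.m", "views": 1100},
]

for datehour, total in total_views_by_hour(sample_rows):
    print(datehour, total)
```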
7. Use Python plotting libraries in notebook
You can make use of the various plotting libraries that are available in Python to plot the output of your Spark jobs.
Convert Spark DataFrame to Pandas DataFrame
Convert the Spark DataFrame to Pandas DataFrame and set the datehour as the index. This is useful if you want to work with the data directly in Python and plot the data using the many available Python plotting libraries.
Input [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Output [7]:
Plotting Pandas Dataframe
Import the matplotlib library, which is required to display the plots in the notebook.
Input [8]:
import matplotlib.pyplot as plt
Use the Pandas plot function to create a line chart from the Pandas DataFrame.
Input [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Output [9]:
Check the notebook was saved in GCS
You should now have your first Jupyter notebook up and running on your Dataproc cluster. Give your notebook a name and it will be auto-saved to the GCS bucket used when creating the cluster.
You can check this by running the following gsutil command in Cloud Shell:
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
You should see the following output:
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Optimization tip - Cache data in memory
There might be scenarios where you want the data in memory instead of reading from BigQuery Storage every time.
This job will read the data from BigQuery and push the filter to BigQuery. The aggregation will then be computed in Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
You can modify the job above to cache the table. The filter on the wiki column is then applied in memory by Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
You can then filter for another wiki language using the cached data instead of reading from BigQuery Storage again, so the job runs much faster.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
You can remove the cache by running:
df_wiki_all.unpersist()
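The benefit of caching comes from how often the source has to be read. The toy below counts reads of a simulated table with and without a cache. CountingSource and totals_for are hypothetical and only mimic the behavior described above; this is not Spark's cache, and the rows are made-up sample values.

```python
class CountingSource:
    """Simulated remote table that counts how many times it is read."""

    def __init__(self, rows):
        self._rows = rows
        self.read_count = 0

    def read(self):
        self.read_count += 1
        return list(self._rows)


def totals_for(rows, wikis):
    """Sum views per title for the given wiki codes."""
    totals = {}
    for row in rows:
        if row["wiki"] in wikis:
            totals[row["title"]] = totals.get(row["title"], 0) + row["views"]
    return totals


# Made-up sample rows for illustration only.
source = CountingSource([
    {"wiki": "en", "title": "A", "views": 50},
    {"wiki": "en.m", "title": "A", "views": 30},
    {"wiki": "de", "title": "B", "views": 20},
])

# Without a cache, each query reads the source again.
en_uncached = totals_for(source.read(), {"en", "en.m"})
de_uncached = totals_for(source.read(), {"de", "de.m"})
uncached_reads = source.read_count

# With a cache, the source is read once and both queries reuse the rows.
source.read_count = 0
cached_rows = source.read()
en_cached = totals_for(cached_rows, {"en", "en.m"})
de_cached = totals_for(cached_rows, {"de", "de.m"})
cached_reads = source.read_count

print(uncached_reads, cached_reads)
```

The results are identical in both cases; only the number of reads changes, which is where the time is saved when the source is remote.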
9. Example notebooks for more use cases
The Cloud Dataproc GitHub repo features Jupyter notebooks with common Apache Spark patterns for loading data, saving data, and plotting your data with various Google Cloud Platform products and open-source tools:
10. Clean up
To avoid incurring unnecessary charges to your GCP account after completion of this quickstart:
- Delete the Cloud Storage bucket that you created for the environment.
- Delete the Dataproc environment.
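Both deletions can be done from Cloud Shell. As a sketch, the helper below builds the corresponding commands as argument lists using the cluster name and region from earlier. cleanup_commands is a hypothetical helper and "my-bucket" is a placeholder. The function only prints the commands; it does not run them. Note that the recursive remove deletes the bucket together with everything in it, including saved notebooks, so double-check the name first.

```python
import shlex


def cleanup_commands(cluster_name, region, bucket):
    """Return the two clean-up commands as argument lists (nothing is executed)."""
    return [
        ["gcloud", "dataproc", "clusters", "delete", cluster_name, f"--region={region}"],
        # -r removes the bucket together with every object stored in it.
        ["gsutil", "rm", "-r", f"gs://{bucket}"],
    ]


if __name__ == "__main__":
    # "my-bucket" is a placeholder; use the bucket name you chose earlier.
    for argv in cleanup_commands("spark-jupyter", "us-central1", "my-bucket"):
        print(shlex.join(argv))
```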
If you created a project just for this codelab, you can also optionally delete the project:
- In the GCP Console, go to the Projects page.
- In the project list, select the project you want to delete and click Delete.
- In the box, type the project ID, and then click Shut down to delete the project.
License
This work is licensed under a Creative Commons Attribution 3.0 Generic License, and Apache 2.0 license.