Preprocessing BigQuery Data with PySpark on Dataproc

1. Overview

This codelab will go over how to create a data processing pipeline using Apache Spark with Dataproc on Google Cloud Platform. It is a common use case in data science and data engineering to read data from one storage location, perform transformations on it and write it into another storage location. Common transformations include changing the content of the data, stripping out unnecessary information, and changing file types.

In this codelab, you'll learn about Apache Spark and run a sample pipeline built with Dataproc, PySpark (Apache Spark's Python API), BigQuery, Google Cloud Storage, and data from Reddit.

2. Intro to Apache Spark (Optional)

According to the website, "Apache Spark is a unified analytics engine for large-scale data processing." It processes data in parallel and in memory, distributing computation across many machines and nodes. It was originally released in 2014 as an upgrade to traditional MapReduce and is still one of the most popular frameworks for performing large-scale computations. Apache Spark is written in Scala and has APIs in Scala, Java, Python and R. It includes libraries such as Spark SQL for running SQL queries on the data, Spark Streaming for streaming data, MLlib for machine learning and GraphX for graph processing, all of which run on the Apache Spark engine.

32add0b6a47bafbc.png

Spark can run by itself or it can leverage a resource management service such as YARN, Mesos or Kubernetes for scaling. You'll be using Dataproc for this codelab, which utilizes YARN.

Data in Spark was originally loaded into memory into what is called an RDD, or resilient distributed dataset. Development on Spark has since added two new, columnar-style data types: the Dataset, which is typed, and the DataFrame, which is untyped. Loosely speaking, RDDs are great for any type of data, whereas Datasets and DataFrames are optimized for tabular data. As Datasets are only available with the Java and Scala APIs, this codelab uses the PySpark DataFrame API. For more information, please refer to the Apache Spark documentation.
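
To make the distinction concrete, here is a minimal, self-contained sketch of the DataFrame API and Spark SQL. The rows and values are invented purely for illustration; it runs anywhere PySpark is installed.

# Minimal illustration of the DataFrame API and Spark SQL; the rows below are
# invented sample data, not part of the codelab's dataset.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dataframe-demo").getOrCreate()

# Build a small DataFrame from in-memory rows; Spark infers the column types.
posts = spark.createDataFrame(
    [("food", "Best ramen in town", 42),
     ("food", "Sourdough starter tips", 17),
     ("travel", "Packing light", 8)],
    ["subreddit", "title", "score"])

# Columnar, SQL-like operations instead of low-level RDD transformations.
posts.filter(posts.score > 10).groupBy("subreddit").count().show()

# The same data can also be queried with Spark SQL via a temporary view.
posts.createOrReplaceTempView("posts")
spark.sql("SELECT subreddit, COUNT(*) AS n FROM posts GROUP BY subreddit").show()

# If you ever need lower-level access, the underlying RDD is still there.
print(posts.rdd.take(1))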

3. Use Case

Data engineers often need data to be easily accessible to data scientists. However, data is often initially dirty (difficult to use for analytics in its current state) and needs to be cleaned before it can be of much use. An example of this is data that has been scraped from the web which may contain weird encodings or extraneous HTML tags.

In this lab, you will load a set of data from BigQuery in the form of Reddit posts into a Spark cluster hosted on Dataproc, extract useful information and store the processed data as zipped CSV files in Google Cloud Storage.

be2a4551ece63bfc.png

The chief data scientist at your company is interested in having their teams work on different natural language processing problems. Specifically, they are interested in analyzing the data in the subreddit "r/food". You'll create a pipeline for a data dump starting with a backfill from January 2017 to August 2019.

4. Accessing BigQuery through the BigQuery Storage API

Pulling data from BigQuery with the tabledata.list API method can prove to be time-consuming and inefficient as the amount of data scales. This method returns a list of JSON objects and requires reading one page at a time, sequentially, to read an entire dataset.

The BigQuery Storage API brings significant improvements to accessing data in BigQuery by using an RPC-based protocol. It supports data reads and writes in parallel as well as different serialization formats such as Apache Avro and Apache Arrow. At a high level, this translates to significantly improved performance, especially on larger data sets.

In this codelab you will use the spark-bigquery-connector for reading and writing data between BigQuery and Spark.
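
As a taste of what the read side looks like from PySpark, here is a hedged sketch of loading one month of the Reddit posts dataset into a DataFrame with the connector. It assumes the connector jar is supplied when the job is submitted (for example with the --jars flag used later in this codelab).

# Sketch of reading a BigQuery table with the spark-bigquery-connector; assumes
# the connector jar is available to the job (e.g. passed via --jars at submit time).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigquery-read-demo").getOrCreate()

posts = (spark.read
         .format("bigquery")
         .option("table", "fh-bigquery.reddit_posts.2017_01")
         .load())

posts.select("subreddit", "title", "created_utc").show(10)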

5. Creating a Project

Sign in to Google Cloud Platform console at console.cloud.google.com and create a new project:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running. The last section of this codelab will walk you through cleaning up your project.

New users of Google Cloud Platform are eligible for a $300 free trial.

6. Setting Up Your Environment

You'll now go through setting up your environment by:

  • Enabling the Compute Engine, Dataproc and BigQuery Storage APIs
  • Configuring project settings
  • Creating a Dataproc cluster
  • Creating a Google Cloud Storage bucket

Enabling APIs and Configuring Your Environment

Open the Cloud Shell by pressing the button in the top right corner of your Cloud Console.

a10c47ee6ca41c54.png

After the Cloud Shell loads, run the following commands to enable the Compute Engine, Dataproc and BigQuery Storage APIs:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Set the project ID of your project. You can find it by going to the project selection page and searching for your project; it might not be the same as your project name.

e682e8227aa3c781.png

76d45fb295728542.png

Run the following command to set your project ID:

gcloud config set project <project_id>

Set the Dataproc region for your project by choosing one of the available regions, for example us-central1:

gcloud config set dataproc/region <region>

Pick a name for your Dataproc cluster and create an environment variable for it.

CLUSTER_NAME=<cluster_name>

Creating a Dataproc Cluster

Create a Dataproc cluster by executing the following command:

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
    --worker-machine-type n1-standard-8 \
    --num-workers 8 \
    --image-version 1.5-debian \
    --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
    --metadata 'PIP_PACKAGES=google-cloud-storage' \
    --optional-components=ANACONDA \
    --enable-component-gateway

This command will take a couple of minutes to finish. To break down the command:

This will initiate the creation of a Dataproc cluster with the name you provided earlier. Using the beta API will enable beta features of Dataproc such as Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

This will set the type of machine to use for your workers.

--worker-machine-type n1-standard-8

This will set the number of workers your cluster will have.

--num-workers 8

This will set the image version of Dataproc.

--image-version 1.5-debian

This will configure the initialization actions to be used on the cluster. Here, you are including the pip initialization action.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

This is the metadata to include on the cluster. Here, you are providing metadata for the pip initialization action.

--metadata 'PIP_PACKAGES=google-cloud-storage'

This will set the Optional Components to be installed on the cluster.

--optional-components=ANACONDA

This will enable Dataproc's Component Gateway, which lets you view common UIs such as Zeppelin, Jupyter or the Spark History Server.

--enable-component-gateway

For a more in-depth introduction to Dataproc, please check out this codelab.

Creating a Google Cloud Storage Bucket

You'll need a Google Cloud Storage bucket for your job output. Determine a unique name for your bucket and run the following command to create a new bucket. Bucket names must be globally unique across all Google Cloud projects and users, so you may need to attempt this a few times with different names. A bucket is successfully created if you do not receive a ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
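
As an optional sanity check, the google-cloud-storage client library (the same package the cluster installs through the PIP_PACKAGES metadata above) can confirm the bucket is reachable. This is a hedged sketch, assuming your credentials are configured and BUCKET_NAME is exported in your shell:

# Look up the bucket created above; lookup_bucket returns None if it does not exist.
import os
from google.cloud import storage

client = storage.Client()
bucket = client.lookup_bucket(os.environ["BUCKET_NAME"])

if bucket is None:
    print("Bucket not found - check the name and try again.")
else:
    print("Bucket {} is ready for job output.".format(bucket.name))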

7. Exploratory Data Analysis

Before performing your preprocessing, you should learn more about the nature of the data you're dealing with. You'll explore the data in two ways: first, you'll view some raw data using the BigQuery Web UI, and then you'll calculate the number of posts per subreddit using PySpark and Dataproc.

Using the BigQuery Web UI

Start by using the BigQuery Web UI to view your data. From the menu icon in the Cloud Console, scroll down and press "BigQuery" to open the BigQuery Web UI.

242a597d7045b4da.png

Next, run the following query in the BigQuery Web UI Query Editor. It returns 10 full rows of the data from January of 2017:

select * from `fh-bigquery.reddit_posts.2017_01` limit 10;

b333c72d60ae6eb8.png

You can scroll across the page to see all of the columns available as well as some examples. In particular, you'll see two columns that represent the textual content of each post: "title" and "selftext", the latter being the body of the post. Also notice other columns such as "created_utc", the UTC time at which a post was made, and "subreddit", the subreddit the post belongs to.

Executing a PySpark Job

Run the following commands in your Cloud Shell to clone the repo with the sample code and cd into the correct directory:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

You can use PySpark to determine a count of how many posts exist for each subreddit. You can open Cloud Editor and read the script cloud-dataproc/codelabs/spark-bigquery/counts_by_subreddit.py before executing it in the next step:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Click on the "Open Terminal" button in Cloud Editor to switch back to your Cloud Shell and run the following command to execute your first PySpark job:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

This command allows you to submit jobs to Dataproc via the Jobs API. Here you are indicating the job type as pyspark. You can supply the cluster name, optional parameters and the name of the file containing the job. Here, you are providing the parameter --jars which allows you to include the spark-bigquery-connector with your job. You can also set the log output levels using --driver-log-levels root=FATAL which will suppress all log output except for errors. Spark logs tend to be rather noisy.
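
For orientation, the aggregation itself boils down to a group-by-and-count over the subreddit column. The sketch below is not counts_by_subreddit.py verbatim, just its general shape, using the table and column names you saw in the BigQuery Web UI:

# Rough shape of a per-subreddit count over one month of Reddit posts; the real
# logic lives in counts_by_subreddit.py in the cloned repository.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("counts-by-subreddit").getOrCreate()

posts = (spark.read
         .format("bigquery")
         .option("table", "fh-bigquery.reddit_posts.2017_01")
         .load())

(posts.groupBy("subreddit")
      .agg(F.count("*").alias("post_count"))
      .orderBy(F.desc("post_count"))
      .show(10))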

This should take a few minutes to run and your final output should look something like this:

6c185228db47bb18.png

8. Exploring the Dataproc and Spark UIs

When running Spark jobs on Dataproc, you have access to two UIs for checking the status of your jobs and clusters. The first one is the Dataproc UI, which you can find by clicking on the menu icon and scrolling down to Dataproc. Here, you can see the cluster's current available memory, pending memory and number of workers.

6f2987346d15c8e2.png

You can also click on the Jobs tab to see completed jobs. You can see job details such as the logs and output of those jobs by clicking on the Job ID for a particular job.

114d90129b0e4c88.png

1b2160f0f484594a.png

You can also view the Spark UI. From the job page, click the back arrow and then click on Web Interfaces. You should see several options under Component Gateway. Many of these can be enabled via Optional Components when setting up your cluster. For this lab, click on the "Spark History Server".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

This should open the following window:

8f6786760f994fe8.png

All completed jobs will show up here, and you can click on any application_id to learn more information about the job. Similarly, you can click on "Show Incomplete Applications" at the very bottom of the landing page to view all jobs currently running.

9. Running your Backfill Job

You'll now run a job that loads data into memory, extracts the necessary information and dumps the output into a Google Cloud Storage bucket. You'll extract the "title", "selftext" (raw text body) and "created_utc" timestamp for each Reddit post. You'll then convert this data to CSV, compress it with gzip and load it into a bucket with a URI of gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

You can refer to the Cloud Editor again to read through the code for cloud-dataproc/codelabs/spark-bigquery/backfill.sh which is a wrapper script to execute the code in cloud-dataproc/codelabs/spark-bigquery/backfill.py.
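
Conceptually, each monthly job filters the posts down to r/food, keeps the text columns, and writes gzipped CSV to your bucket. The sketch below illustrates that shape for a single month under stated assumptions (BUCKET_NAME available in the environment, connector jar on the job classpath); the per-month looping and the exact food.csv.gz file naming are handled by backfill.sh and backfill.py in the cloned repository.

# Illustrative single-month version of the backfill: filter to r/food, keep the
# text fields, and write gzipped CSV to Cloud Storage. Not the repository script.
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("backfill-sketch").getOrCreate()
bucket = os.environ["BUCKET_NAME"]

posts = (spark.read
         .format("bigquery")
         .option("table", "fh-bigquery.reddit_posts.2017_01")
         .load())

food = (posts.filter(posts.subreddit == "food")
             .select("title", "selftext", "created_utc"))

# coalesce(1) yields a single compressed part file for the month.
(food.coalesce(1)
     .write
     .mode("overwrite")
     .option("compression", "gzip")
     .csv("gs://{}/reddit_posts/2017/01".format(bucket)))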

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

You should shortly see a series of job completion messages. The job may take up to 15 minutes to complete. You can also double-check your storage bucket to verify successful data output by using gsutil. Once all of the jobs are done, run the following command:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

You should see the following output:

a7c3c7b2e82f9fca.png

Congratulations, you have successfully completed a backfill for your Reddit posts data! If you're interested in how you can build models on top of this data, please continue on to the Spark-NLP codelab.

10. Cleanup

To avoid incurring unnecessary charges to your GCP account after completion of this quickstart:

  1. Delete the Cloud Storage bucket that you created for this codelab.
  2. Delete the Dataproc cluster.

If you created a project just for this codelab, you can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License and an Apache 2.0 License.