This codelab will go over how to create a data preprocessing pipeline using Apache Spark with Cloud Dataproc on Google Cloud Platform. It is a common use case in Data Science and Data Engineering to grab data from one storage location, perform transformations on it and load it into another storage location. Common transformations include changing the content of the data, stripping out unnecessary information, and changing file types.

In this codelab, we'll introduce Apache Spark and walk through a sample pipeline using Cloud Dataproc, BigQuery, Google Cloud Storage and Reddit posts data. We will specifically be using PySpark, which is the Python API for Apache Spark.

According to the website, "Apache Spark is a unified analytics engine for large-scale data processing." It lets you analyze and process data in parallel and in-memory, enabling massive parallel computation across many machines and nodes. It was originally released in 2014 as an upgrade to traditional MapReduce and is still one of the most popular frameworks for performing large-scale computations. Apache Spark is written in Scala and subsequently has APIs in Scala as well as Java, Python and R. It contains a plethora of libraries such as Spark SQL for performing SQL queries on the data, Spark Streaming for streaming data, MLlib for machine learning and GraphX for graph processing, all of which run on the Apache Spark engine.

Spark can run by itself or it can leverage the resource management services of engines such as YARN, Mesos or, most recently, Kubernetes. We'll be using Cloud Dataproc for this codelab, which specifically utilizes YARN.

Data in Spark was originally loaded into memory into what is called an RDD, or resilient distributed dataset. Development on Spark has since added two new, columnar-style data types: the Dataset and the DataFrame, which are typed and untyped respectively. Loosely speaking, RDDs are great for any type of data, whereas Datasets and DataFrames are optimized for tabular data. As Datasets are only available with the Java and Scala APIs, we'll proceed with the PySpark DataFrame API for this codelab.
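
To make the distinction concrete, here is a minimal PySpark sketch contrasting an RDD with a DataFrame built from the same rows. The column names and sample values are made up purely for illustration:

# A SparkSession is the entry point for the DataFrame API
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-vs-dataframe").getOrCreate()

rows = [("food", 10), ("aww", 25)]

# RDD: a row-oriented, untyped collection. Transformations are plain Python functions.
rdd = spark.sparkContext.parallelize(rows)
print(rdd.map(lambda pair: (pair[0], pair[1] * 2)).collect())

# DataFrame: a columnar, schema-aware table. Transformations are column expressions
# that Spark can optimize before executing them.
df = spark.createDataFrame(rows, ["subreddit", "count"])
df.select("subreddit", (df["count"] * 2).alias("doubled")).show()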

Data Engineers often need data to be easily accessible to data scientists. However, data is often dirty initially (difficult to use for analytics in its current state) and needs further processing before it can be of much use. An example of this is data that has been scraped from the web.

In this lab, we will load a set of data from BigQuery in the form of Reddit posts into a Spark cluster hosted on Cloud Dataproc, extract the useful information we want and store the processed data as zipped CSV files in Google Cloud Storage.

The chief data scientist at our organization is interested in having their teams work on different NLP problems. Specifically, they are interested in analyzing the data in the subreddit "r/food". We're going to create a pipeline for a data dump, starting with a backfill from January 2016 until now. Reddit data is available on approximately a three-month lag, so we will continue to search for data until a table is no longer available.

Note: The astute participant of this codelab may notice that our transformations are perfectly doable using regular SQL against the data itself in BigQuery. You are quite correct, and we are glad you noticed this! However, the point of this codelab is to introduce the basic concepts of Apache Spark and the APIs associated with it.

Pulling data from BigQuery using the tabledata.list API method can prove to be time-consuming and inefficient as the amount of data scales. This method returns a list of JSON objects and requires sequentially reading one page at a time to read an entire dataset.

The BigQuery Storage API brings significant improvements to accessing data in BigQuery by using an RPC-based protocol. It supports data reads in parallel as well as different serialization formats such as Apache Avro and Apache Arrow. At a high level, this translates to significantly improved performance, especially on larger data sets.

Connectors that take advantage of the BigQuery Storage API are being developed for other projects in the data science landscape. In particular, the spark-bigquery-connector is an excellent tool for grabbing data from BigQuery for Spark jobs. We'll be utilizing this connector in this codelab to load our BigQuery data into Cloud Dataproc. In order to do so, we'll have to enable the BigQuery Storage API, which we'll explain how to do in the next step.
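
As a quick preview of the pattern we'll use throughout this codelab, reading a BigQuery table through the connector is a single read call. This is just a sketch and assumes the connector jar has already been made available to your cluster, which we do later via the --jars flag:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigquery-preview").getOrCreate()

# Read a table from the public Reddit dataset in parallel via the BigQuery Storage API
table = "fh-bigquery.reddit_posts.2016_01"
df = spark.read.format('bigquery').option('table', table).load()

# The read is lazy; these calls trigger a small job to inspect the schema and a few rows
df.printSchema()
df.show(5)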

If you don't already have a Google Account (Gmail or Google Apps), you must create one. Sign in to the Google Cloud Platform console (console.cloud.google.com) and create a new project.


Next, you'll need to enable billing in the Cloud Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running. Both this codelab and the Spark-NLP codelab include a Clean Up section at the end.

New users of Google Cloud Platform are eligible for a $300 free trial.

First, we need to enable Cloud Dataproc and the Compute Engine APIs.

Click on the menu icon in the top left of the screen.

Select API Manager from the drop down.

Click on Enable APIs and Services.

Search for "Compute Engine" in the search box. Click on "Google Compute Engine API" in the results list that appears.

On the Google Compute Engine page, click Enable.

Once it has been enabled, click the arrow pointing left to go back.

Now search for "Google Cloud Dataproc API" and enable it as well.

Next, we'll need to enable the BigQuery Storage API, which we just discussed in the previous section. Similarly to before, enter "BigQuery Storage API" into the search box and enable it.

Next, open up Cloud Shell by clicking the button in the top right-hand corner of the cloud console:

We're going to set some environment variables that we can reference as we proceed with the codelab. First, pick a name for a Cloud Dataproc cluster that we're going to create, such as "my-cluster", and set it in your environment. Feel free to use whatever name you like.

CLUSTER_NAME=<cluster_name>

Next, choose a zone from one of the ones available here. An example might be us-east1-b.

ZONE=<zone>

With our environment variables configured, let's run the following command to create our Cloud Dataproc cluster:

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
    --zone=${ZONE} \
    --worker-machine-type n1-standard-8 \
    --num-workers 4 \
    --image-version 1.4-debian9 \
    --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
    --metadata 'PIP_PACKAGES=google-cloud-storage' \
    --optional-components=ANACONDA \
    --enable-component-gateway

Let's step through each of these flags:

gcloud beta dataproc clusters create ${CLUSTER_NAME}: This initiates the creation of a Cloud Dataproc cluster with the name you provided earlier. We include beta here to enable beta features of Cloud Dataproc such as Component Gateway, which we discuss below.

--zone=${ZONE}: This sets the location of the cluster.

--worker-machine-type n1-standard-8: This is the type of machine to use for our workers.

--num-workers 4: We will have four workers on our cluster.

--image-version 1.4-debian9: This denotes the image-version of Cloud Dataproc we'll use.

--initialization-actions ...: Initialization Actions are custom scripts that are executed when creating clusters and workers. They can either be user-created and stored in a GCS bucket or referenced from the public bucket dataproc-initialization-actions. Here, we are providing the pip-install.sh script, which lets us pip install any Python packages we choose; we specify those packages with the next flag.

--metadata 'PIP_PACKAGES=google-cloud-storage': This is a space-separated list of packages to install into Cloud Dataproc. In this case, we will install the google-cloud-storage Python client library.

--optional-components=ANACONDA: Optional Components are common packages used with Cloud Dataproc that are automatically installed on Cloud Dataproc clusters during creation. Advantages of using Optional Components over Initialization Actions include faster startup times and being tested for specific Cloud Dataproc versions. Overall, they are more reliable.

--enable-component-gateway: This flag allows us to take advantage of Cloud Dataproc's Component Gateway for viewing common UIs such as Zeppelin, Jupyter or the Spark History Server. Note: some of these require the associated Optional Component.

For a more in-depth introduction to Cloud Dataproc, please check out this codelab.

Next, run the following commands in your Cloud Shell to clone the repo with the sample code and cd into the correct directory:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
cd cloud-dataproc/codelabs/spark-bigquery

Next, we'll create a Google Cloud Storage bucket for our output files. Go to the Google Cloud Storage page from the side menu and click "Create Bucket". From here, use your project ID as the bucket name. If that name isn't available, feel free to use any name you like, but make sure you remember the name that you used. Click "Create" at the bottom to create your bucket.
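
If you prefer to create the bucket from code rather than the console, a rough equivalent using the google-cloud-storage Python client is sketched below. The bucket name here is a placeholder; substitute your own globally unique name:

from google.cloud import storage

# Replace with your own globally unique bucket name, e.g. your project ID
bucket_name = "your-project-id"

# The client authenticates against your current project
client = storage.Client()

# Create the bucket; this will fail if the name is already taken
bucket = client.create_bucket(bucket_name)
print(f"Created bucket {bucket.name}")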

After making a bucket, go back to your Cloud Shell and set an environment variable with the name of your bucket:

BUCKET_NAME=<bucket_name>

Awesome! Let's now dive into learning more about the data and code we have at our disposal.

Before we perform our preprocessing, let's learn a bit more about the data we're working with. To do this, we'll show two methods of data exploration. We'll first view some of the data as is using the BigQuery Web UI, and next we'll calculate the number of posts per subreddit using PySpark and Cloud Dataproc.

Let's start with using the BigQuery Web UI to view our data. From the menu icon, scroll down and press "BigQuery" to open the BigQuery Web UI.

Next, run the following command in the BigQuery Web UI Query Editor. This will return 10 full rows of the data from January of 2016:

select * from `fh-bigquery.reddit_posts.2016_01` limit 10;

We can scroll across the page to see all of the columns available to us as well as some examples. In particular, we see two columns that represent the textual content of each post: "title" and "selftext", the latter being the body of the post. We also notice other columns such as "created_utc", which is the UTC time that a post was made, and "subreddit", which is the subreddit the post exists in.

Next, let's use PySpark to determine a count of how many posts exist for each subreddit. We will grab all of the Reddit data for the set of years and months defined in the script. You can find the code in counts_by_subreddit.py. Spend a couple of minutes reading through the commented code to make sure you understand what's going on. Your repo will have the most up-to-date version of the code, but an earlier copy can also be seen here:

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# This will help catch some PySpark errors
from py4j.protocol import Py4JJavaError

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit").getOrCreate()

# Create a two column schema consisting of a string and a long integer
fields = [StructField("subreddit", StringType(), True),
          StructField("count", LongType(), True)]
schema = StructType(fields)

# Create an empty DataFrame. We will continuously union our output with this
subreddit_counts = spark.createDataFrame([], schema)

# Establish a set of years and months to iterate over
years = ['2016', '2017', '2018', '2019']
months = ['01', '02', '03', '04', '05', '06',
          '07', '08', '09', '10', '11', '12']

# Keep track of all tables accessed via the job
tables_read = []
for year in years:
    for month in months:
        
        # In the form of <project-id>.<dataset>.<table>
        table = f"fh-bigquery.reddit_posts.{year}_{month}"
        
        # If the table doesn't exist we will simply continue and not
        # log it into our "tables_read" list
        try:
            table_df = spark.read.format('bigquery').option('table', table).load()
            tables_read.append(table)
        except Py4JJavaError:
            continue
        
        # We perform a group-by on subreddit, aggregating by the count and then
        # unioning the output to our base dataframe
        subreddit_counts = (
            table_df
            .groupBy("subreddit")
            .count()
            .union(subreddit_counts)
        )
        
print("The following list of tables will be accounted for in our analysis:")
for table in tables_read:
    print(table)

# From our base table, we perform a group-by, summing over the counts.
# We then rename the column and sort in descending order both for readability.
# show() will collect the table into memory and output it to stdout.
(
    subreddit_counts
    .groupBy("subreddit")
    .sum("count")
    .withColumnRenamed("sum(count)", "count")
    .sort("count", ascending=False)
    .show()
)

Next, execute the following command from your cloud shell:

gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

This command allows us to leverage the Cloud Dataproc Jobs API. By including the command pyspark we are indicating to the cluster that this is a PySpark job. We supply the cluster name, optional parameters from those available here and the name of the file containing the job. In our case, we are providing the parameter --jars, which allows us to include the jar for the spark-bigquery-connector. This lets us load data from BigQuery directly into PySpark in parallel. Lastly, we are providing --driver-log-levels root=FATAL, which will suppress most of the log output from PySpark except for fatal errors. In general, Spark logs tend to be fairly noisy.

This should take a few minutes to run, so while you're waiting for this to finish up, let's explore some of the UIs we have available to us.

When running Spark jobs on Cloud Dataproc, we have access to two excellent UIs for checking the status of our jobs and clusters. The first is the Cloud Dataproc UI, which we can find by selecting Cloud Dataproc from the navigation menu. Here, we can see the current memory available to us, as well as pending memory and the number of workers.

We can also click on the jobs tab to see completed jobs. We can see logs and the output of those jobs by clicking on the Job ID for a particular job.

Great! Next, let's view the second UI that's available to us, which is the Spark UI. This is not Cloud Dataproc-specific and is available to all Spark clusters regardless of the underlying infrastructure. Click on "Web Interfaces". You should see several options under Component Gateway. Many of these are enabled via Optional Components when setting up your cluster.

Let's click on "SparkHistoryServer" to open the Spark History Server UI.

All completed jobs will show up here, and you can click on any application ID to learn more information about the job. Similarly, you can click on "Show Incomplete Applications" at the very bottom of the landing page to view all jobs currently running.

Go back and take a look at your Cloud Console to view the output of our subreddit counts script. Please note: the values in the "count" column may be higher than what is shown, as more data is added in subsequent months.

Excellent! This should give you an idea of the scale of the data we're working with. Spark is great for running ad-hoc queries and reports like this, giving us quick insight into our data.

We'll now go over the second part of this codelab, which involves loading our data into memory, extracting the necessary information and dumping it into a Google Cloud Storage bucket. As our chief data scientist asked, we'll extract the title, body (raw text) and timestamp created for each Reddit post. We'll take all of this data, convert it into a CSV, gzip it and load it into a GCS bucket with a URI of gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Spend a few minutes reading through the script. This Python script (backfill.py) is executed by a wrapper shell script, backfill.sh, which submits it once for each year / month combination. A copy of both scripts can be found below. First, the Python script:

# Python imports
import re
import time
import sys

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# PySpark function for replacing characters using a regex. We'll use this to remove newline characters.
from pyspark.sql.functions import regexp_replace, col

# Library for interacting with Google Cloud Storage
from google.cloud import storage

# This will help catch some PySpark errors
from py4j.protocol import Py4JJavaError

# Create a SparkSession under the name "reddit". Viewable via the Spark UI
spark = SparkSession.builder.appName("reddit").getOrCreate()

# Establish a set of years and months to iterate over
year = sys.argv[1]
month = sys.argv[2]
bucket_name = sys.argv[3]
          
# Establish a subreddit to process
subreddit = 'food'

# Set Google Cloud Storage temp location          
path = "tmp" + str(time.time())

# Keep track of all tables accessed via the job
tables_read = []
        
# In the form of <project-id>.<dataset>.<table>
table = f"fh-bigquery.reddit_posts.{year}_{month}"
        
# If the table doesn't exist we will simply continue and not
# log it into our "tables_read" list
try:
    df = spark.read.format('bigquery').option('table', table).load()
except Py4JJavaError:
    print(f"{table} does not exist.")
    sys.exit(0)

print(f"Processing {table}.")

# Select the "title", "selftext" and "created_utc" columns of the designated subreddit and
# replace newline characters with a single space
subreddit_timestamps = (
    df
    .select(
        regexp_replace(col("title"), "\n", " "),
        regexp_replace(col("selftext"), "\n", " "),
        "created_utc"
    )
    .where(df.subreddit == subreddit)
)
        
tmp_output_path = "gs://" + bucket_name + "/" + path + "/" + year + "/" + month
# Write output to our temp GCS bucket. Spark jobs can be written out to multiple files 
# and partitions. By using coalesce, we ensure the output is consolidated to a single file.
# We then use .options to tell Spark to write out in a gzip format, and .csv to do the write.
(
    subreddit_timestamps
    # Data can get written out to multiple files / partition. 
    # This ensures it will only write to 1.
    .coalesce(1) 
    .write
    # Gzip the output file
    .options(codec="org.apache.hadoop.io.compress.GzipCodec")
    # Write out to csv
    .csv(tmp_output_path)
)
# Lastly, we'll move the output file to its final location and remove it from the temp path.
regex = r"part-[0-9a-zA-Z\-]*\.csv\.gz"
new_path = "/".join(["reddit_posts", year, month, subreddit + ".csv.gz"])
        
# Create the storage client
storage_client = storage.Client()
        
# Create an object representing the original bucket
source_bucket = storage_client.get_bucket(bucket_name)
        
# Grab all files in the source bucket under the temp path. Spark typically also writes
# a _SUCCESS file alongside the output, so we make sure to find our single csv file.
blobs = list(source_bucket.list_blobs(prefix=path))
for blob in blobs:

    # Locate the file that represents our partition. Copy it to its final
    # location and delete it from the temp directory.
    if re.search(regex, blob.name):
        source_bucket.copy_blob(blob, source_bucket, new_path)
        blob.delete()

Next up, our wrapper shell script. Please note the usage of the Cloud Dataproc Jobs API towards the bottom of the script to kick off the PySpark jobs:

# Starting year and all months
base_year=2016
months=(01 02 03 04 05 06 07 08 09 10 11 12)

# Grab list of existing BigQuery tables
tables=$(bq ls fh-bigquery:reddit_posts) 

year=${base_year}
warm_up=true 

# Set the name of the output bucket
BUCKET_NAME=${1}
CLUSTER_NAME=${2}

# Iterate for every year / month pair starting from January 2016 up through the current year.
while [[ ${year} -le $(date +%Y) ]]
do
  for month in "${months[@]}"
  do
      # If the YYYY_MM table doesn't exist, we skip over it.
      exists="$(echo "${tables}" | grep " ${year}_${month} ")"
      if [ -z "${exists}" ]; then
        continue
      fi
      echo "${year}_${month}"

      # Submit a PySpark job via the Cloud Dataproc Jobs API
      gcloud dataproc jobs submit pyspark \
        --cluster ${CLUSTER_NAME} \
        --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar \
        --driver-log-levels root=FATAL \
        backfill.py \
        -- ${year} ${month} ${BUCKET_NAME} &
      sleep 5

      if ${warm_up}; then 
          sleep 10
          warm_up=false 
      fi
  done
  ((year++))
done

Great! Now let's kick off our shell script:

bash backfill.sh ${BUCKET_NAME} ${CLUSTER_NAME}

Excellent! You should shortly see a bunch of job completion messages. This output will be a bit messy, so feel free to verify that jobs are completing by going back to the "Jobs" tab of the Cloud Dataproc cluster UI. We can also double-check our storage bucket with gsutil to verify that the data was successfully loaded. Once all of the jobs are done, you can list the output files with the following command:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Like before, your output may contain more rows than shown here as more data is added to BigQuery.
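
If you'd rather verify the output programmatically than with gsutil, a small sketch using the google-cloud-storage client (assuming the BUCKET_NAME environment variable from earlier) might look like this:

import os

from google.cloud import storage

bucket_name = os.environ["BUCKET_NAME"]
client = storage.Client()

# List everything written under reddit_posts/ and print only the gzipped CSV files
bucket = client.get_bucket(bucket_name)
for blob in bucket.list_blobs(prefix="reddit_posts/"):
    if blob.name.endswith("food.csv.gz"):
        print(f"gs://{bucket_name}/{blob.name}")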

Congratulations, you have successfully completed a backfill for your Reddit posts data! If you're interested in how you can build models on top of this data, please continue on to the Spark-NLP codelab.

To avoid incurring unnecessary charges to your GCP account after completing this codelab:

  1. Delete the Cloud Storage bucket that you created for this codelab.
  2. Delete the Cloud Dataproc cluster.

If you created a project just for this codelab, you can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.

License

This work is licensed under a Creative Commons Attribution 3.0 Generic License and an Apache 2.0 License.