Dataproc Serverless

1. Overview - Google Dataproc

Dataproc is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and many other open source tools and frameworks. Use Dataproc for data lake modernization, ETL / ELT, and secure data science, at planet scale. Dataproc is also fully integrated with several Google Cloud services including BigQuery, Cloud Storage, Vertex AI, and Dataplex.

Dataproc is available in three flavors:

  • Dataproc Serverless allows you to run PySpark jobs without needing to configure infrastructure and autoscaling. Dataproc Serverless supports PySpark batch workloads and sessions / notebooks.
  • Dataproc on Google Compute Engine allows you to manage a Hadoop YARN cluster for YARN-based Spark workloads in addition to open source tools such as Flink and Presto. You can tailor your cloud-based clusters with as much vertical or horizontal scaling as you'd like, including autoscaling.
  • Dataproc on Google Kubernetes Engine allows you to configure Dataproc virtual clusters in your GKE infrastructure for submitting Spark, PySpark, SparkR or Spark SQL jobs.

In this codelab, you will learn several different ways that you can consume Dataproc Serverless.

Apache Spark was originally built to run on Hadoop clusters and used YARN as its resource manager. Maintaining Hadoop clusters requires specialized expertise and careful configuration of many cluster settings. This is in addition to a separate set of settings that Spark itself requires the user to configure. As a result, developers often spend more time configuring their infrastructure than working on the Spark code itself.

Dataproc Serverless removes the need to manually configure either Hadoop clusters or Spark. Dataproc Serverless does not run on Hadoop and uses its own Dynamic Resource Allocation to determine its resource requirements, including autoscaling. A small subset of Spark properties is still customizable with Dataproc Serverless, but in most instances you will not need to tweak these.

2. Set up

You will begin by configuring your environment and resources used in this codelab.

Create a Google Cloud project. You may use an existing one.

Open Cloud Shell by clicking it in the Cloud Console toolbar.

Cloud Shell provides a ready-to-use Shell environment you can use for this codelab.

Cloud Shell will set your project name by default. Double check by running echo $GOOGLE_CLOUD_PROJECT. If you do not see your project ID in the output, set it.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Set a Compute Engine region for your resources, such as us-central1 or europe-west2.

export REGION=<your-region>

Enable APIs

The codelab utilizes the following APIs:

  • BigQuery
  • Dataproc

Enable the necessary APIs. This will take about a minute, and a success message will appear when completed.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Configure network access

Dataproc Serverless requires Private Google Access to be enabled on the subnet in the region where you will run your Spark jobs, because the Spark drivers and executors only have private IPs. Run the following to enable it on the default subnet.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

You can verify that Private Google Access is enabled with the following command, which outputs True or False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Create a storage bucket

Create a storage bucket that will be used to store assets created in this codelab.

Choose a name for your bucket. Bucket names must be globally unique across all users.

export BUCKET=<your-bucket-name>

Create the bucket in the region you intend to run your Spark jobs.

gsutil mb -l ${REGION} gs://${BUCKET}

You can see that your bucket is available in the Cloud Storage console. You can also run gsutil ls to see your bucket.

Create a Persistent History Server

The Spark UI provides a rich set of debugging tools and insights into Spark jobs. To view the Spark UI for completed Dataproc Serverless jobs, you must create a single-node Dataproc cluster to use as a persistent history server.

Set a name for your persistent history server.

PHS_CLUSTER_NAME=my-phs

Run the following.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

The Spark UI and persistent history server will be explored in more detail later in the codelab.

3. Run Serverless Spark jobs with Dataproc Batches

In this sample, you will work with a set of data from the New York City (NYC) Citi Bike Trips public dataset. NYC Citi Bikes is a paid bike sharing system within NYC. You will perform some simple transformations and print the top ten most popular Citi Bike station ids. This sample also notably uses the open source spark-bigquery-connector to seamlessly read and write data between Spark and BigQuery.

Clone the following GitHub repo and cd into the directory containing the file citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Exit early when the bucket argument is missing; otherwise sys.argv[1] raises IndexError.
if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")
    sys.exit(1)

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

# Keep rows with a start station, count trips per station, and keep the ten largest counts.
top_ten = (
    df.filter(col("start_station_id").isNotNull())
    .groupBy("start_station_id")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
    .cache()
)

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
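
To make the transformation chain easier to follow, here is a minimal pure-Python sketch of the same logic: drop rows without a station ID, count trips per station, sort by count in descending order, and keep the top entries. The sample rows and the function name top_stations are hypothetical and exist only for illustration; they are not part of citibike.py or the public dataset.

```python
from collections import Counter

# Hypothetical sample rows standing in for the BigQuery table (not real data).
sample_trips = [
    {"start_station_id": 519},
    {"start_station_id": 497},
    {"start_station_id": None},  # dropped, like isNotNull() in the Spark job
    {"start_station_id": 519},
    {"start_station_id": 435},
    {"start_station_id": 519},
    {"start_station_id": 497},
]


def top_stations(trips, n=10):
    """Return up to n (station_id, count) pairs, most frequent first."""
    counts = Counter(
        trip["start_station_id"]
        for trip in trips
        if trip["start_station_id"] is not None
    )
    return counts.most_common(n)


for station_id, count in top_stations(sample_trips):
    print(station_id, count)
```

Spark performs the equivalent steps in a distributed fashion across executors, which is why the real job can process the full trips table.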

Submit the job to Serverless Spark using the Cloud SDK, available in Cloud Shell by default. Run the following command in your shell which utilizes the Cloud SDK and the Dataproc Batches API to submit Serverless Spark jobs.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
  --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

To break this down:

  • gcloud dataproc batches submit references the Dataproc Batches API.
  • pyspark denotes that you are submitting a PySpark job.
  • --batch is the name of the job. If not provided, a randomly generated UUID will be used.
  • --region=${REGION} is the geographical region the job will be processed in.
  • --deps-bucket=gs://${BUCKET} is where your local Python file is uploaded to before running in the Serverless environment.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar includes the jar for the spark-bigquery-connector in the Spark runtime environment.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} is the fully qualified name of the persistent history server. This is where Spark event data (separate from console output) is stored and viewable from the Spark UI.
  • The trailing -- denotes that anything beyond this will be runtime args for the program. In this case, you are submitting the name of your bucket, as required by the job.
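
The breakdown above can also be sketched in Python as a function that assembles the same argument list, which may be handy if you submit batches from a script. This is only an illustration: the function name build_batch_submit_command and the placeholder values (my-project, my-bucket, us-central1) are assumptions, and the flags are limited to those shown in this section.

```python
import shlex


def build_batch_submit_command(script, batch_id, region, bucket, jar_uri,
                               project, phs_cluster, job_args):
    """Assemble the argv list for `gcloud dataproc batches submit pyspark`."""
    history_server = (
        f"projects/{project}/regions/{region}/clusters/{phs_cluster}"
    )
    return [
        "gcloud", "dataproc", "batches", "submit", "pyspark", script,
        f"--batch={batch_id}",
        f"--region={region}",
        f"--deps-bucket=gs://{bucket}",
        f"--jars={jar_uri}",
        f"--history-server-cluster={history_server}",
        "--",  # everything after this is passed to the PySpark program
        *job_args,
    ]


# Placeholder values for illustration only.
command = build_batch_submit_command(
    script="citibike.py",
    batch_id="citibike-job",
    region="us-central1",
    bucket="my-bucket",
    jar_uri="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar",
    project="my-project",
    phs_cluster="my-phs",
    job_args=["my-bucket"],
)
print(shlex.join(command))
```

On a machine where the Cloud SDK is installed and authenticated, the list could be passed to subprocess.run(command, check=True) instead of being printed.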

You will see the following output when the batch is submitted.

Batch [citibike-job] submitted.

After a couple of minutes you will see the following output along with metadata from the job.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

In the next section, you will learn how to locate the logs for this job.

Additional features

With Dataproc Serverless, you have additional options for running your jobs.

  • You can create a custom Docker image that your job runs on. This is a great way to include additional dependencies, including Python and R libraries.
  • You can connect a Dataproc Metastore instance to your job to access Hive metadata.
  • For extra control, Dataproc Serverless supports configuration of a small set of Spark properties.

4. Dataproc Metrics and Observability

The Dataproc Batches Console lists all of your Dataproc Serverless jobs. In the console, you'll see each job's Batch ID, Location, Status, Creation time, Elapsed time and Type. Click on your job's Batch ID to view more information about it.

On this page, you'll see information such as Monitoring which shows how many Batch Spark Executors your job used over time (indicating how much it autoscaled).

On the Details tab you'll see more metadata about the job including any arguments and parameters that were submitted with the job.

You can also access all logs from this page. When Dataproc Serverless jobs are run, three different sets of logs are generated:

  • Service-level
  • Console output
  • Spark event logging

Service-level logs are those that the Dataproc Serverless service generated, such as Dataproc Serverless requesting extra CPUs for autoscaling. You can view these by clicking View logs, which opens Cloud Logging.

Console output can be viewed under Output. This is the output generated by the job, including metadata that Spark prints when beginning a job or any print statements incorporated into the job.

Spark event logging is accessible from the Spark UI. Because you provided your Spark job with a persistent history server, you can access the Spark UI by clicking View Spark History Server, which contains information for your previously run Spark jobs. You can learn more about the Spark UI from the official Spark documentation.

5. Dataproc Templates: BQ -> GCS

Dataproc Templates are open source tools that help further simplify in-Cloud data processing tasks. These serve as a wrapper for Dataproc Serverless and include templates for many data import and export tasks, including:

  • BigQuerytoGCS and GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC and JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS and GCStoMongo

The full list is available in the README.

In this section, you will use Dataproc Templates to export data from BigQuery to GCS.

Clone the repo

Clone the repo and change into the python folder.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Configure the environment

You'll now set environment variables. Dataproc Templates use the environment variable GCP_PROJECT for your project id, so set this equal to GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Your region should be set in the environment from earlier. If not, set it here.

export REGION=<region>

Dataproc Templates use the spark-bigquery-connector for processing BigQuery jobs and require its URI to be included in the environment variable JARS. Set the JARS variable.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Configure template parameters

Set the name of a staging bucket for the service to use.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Next, you'll set some job-specific variables. For the input table, you'll again be referencing the BigQuery NYC Citibike dataset.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
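
Note that this table ID uses a dot after the project, while citibike.py earlier wrote the same table with a colon. As a small illustration, the hypothetical helper below splits either spelling into its project, dataset, and table parts. It assumes the project ID itself contains no dots or colons, which holds for the public dataset used here.

```python
def split_table_id(table_id):
    """Split 'project:dataset.table' or 'project.dataset.table' into parts."""
    # Treat the colon form as equivalent to the dot form, then split on dots.
    project, dataset, table = table_id.replace(":", ".").split(".")
    return project, dataset, table


dotted = "bigquery-public-data.new_york_citibike.citibike_trips"
with_colon = "bigquery-public-data:new_york_citibike.citibike_trips"

print(split_table_id(dotted))
print(split_table_id(with_colon) == split_table_id(dotted))
```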

You can choose either csv, parquet, avro or json. For this codelab, choose csv; the next section shows how to use Dataproc Templates to convert file types.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Set the output mode to overwrite. You can choose between overwrite, append, ignore or errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Set the GCS output location to be a path in your bucket.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Run the template

Run the BIGQUERYTOGCS template by specifying it below and providing the input parameters you set.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
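
In this codelab, each shell variable name mirrors the template property it feeds: lowercase the name and replace underscores with dots. The sketch below makes that mapping explicit. The helper env_name_to_flag is hypothetical, the convention is inferred from the commands in this section rather than enforced by Dataproc Templates, and my-bucket is a placeholder bucket name.

```python
def env_name_to_flag(name, value):
    """Turn e.g. BIGQUERY_GCS_OUTPUT_MODE into --bigquery.gcs.output.mode=<value>."""
    return f"--{name.lower().replace('_', '.')}={value}"


# Values as set earlier in this section; my-bucket is a placeholder.
params = {
    "BIGQUERY_GCS_INPUT_TABLE": "bigquery-public-data.new_york_citibike.citibike_trips",
    "BIGQUERY_GCS_OUTPUT_FORMAT": "csv",
    "BIGQUERY_GCS_OUTPUT_MODE": "overwrite",
    "BIGQUERY_GCS_OUTPUT_LOCATION": "gs://my-bucket/BQtoGCS",
}

template_args = ["--template=BIGQUERYTOGCS"] + [
    env_name_to_flag(name, value) for name, value in params.items()
]
for arg in template_args:
    print(arg)
```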

The output will be fairly noisy but after about a minute you will see the following.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

You can verify that the files were generated by running the following.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

By default, Spark writes its output to multiple files, with the number depending on the amount of data. In this case, you will see approximately 30 generated files. Spark output file names are formatted with part- followed by a five-digit number (indicating the part number) and a hash string. An example file name is part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
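
If you want to process these file names in a script, a regular expression can pull out the part number. The following is a sketch whose pattern is inferred only from the example file name above; it is an assumption that other Spark versions or output options follow the same shape. The trailing c000 segment is matched but not interpreted, and the function name parse_part_file is hypothetical.

```python
import re

# part-<5 digits>-<UUID-like hash>-c<digits>.<extension>
PART_FILE = re.compile(
    r"^part-(?P<part>\d{5})"
    r"-(?P<uuid>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})"
    r"-c\d+\.(?P<ext>\w+)$"
)


def parse_part_file(name):
    """Return part number, hash string, and extension, or None if no match."""
    match = PART_FILE.match(name)
    if match is None:
        return None
    return {"part": int(match["part"]), "uuid": match["uuid"], "ext": match["ext"]}


example = "part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv"
print(parse_part_file(example))
```

Names that do not follow this shape, such as a marker file in the same directory, return None and can be skipped.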

6. Dataproc Templates: CSV to parquet

You will now use Dataproc Templates to convert data in GCS from one file type to another using the GCSTOGCS template. This template uses SparkSQL and also lets you submit a SparkSQL query to be run during the transformation for additional processing.

Confirm environment variables

Confirm that GCP_PROJECT, REGION, and GCS_STAGING_LOCATION are set from the previous section.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
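
The same check can be scripted so that an unset or empty variable is reported by name instead of showing up as a blank line. This is a minimal sketch; the function name missing_variables is hypothetical, and the variable list covers only the three names echoed above.

```python
import os

REQUIRED = ("GCP_PROJECT", "REGION", "GCS_STAGING_LOCATION")


def missing_variables(env, required=REQUIRED):
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


missing = missing_variables(os.environ)
if missing:
    print("Not set:", ", ".join(missing))
else:
    print("All required variables are set.")
```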

Set template parameters

You'll now set configuration parameters for GCStoGCS. Start with the location of the input files. Note that this is a directory and not a specific file as all files in the directory will be processed. Set this to BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Set the format of the input file.

GCS_TO_GCS_INPUT_FORMAT=csv

Set the desired output format. You can choose parquet, json, avro or csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Set the output mode to overwrite. You can choose between overwrite, append, ignore or errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Set the output location.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Run the template

Run the GCStoGCS template.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

The output will be fairly noisy but after about a minute you should see a success message like below.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

You can verify that the files were generated by running the following.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

With this template, you also have the option to supply a SparkSQL query by passing gcs.to.gcs.temp.view.name and gcs.to.gcs.sql.query to the template, which runs the query on the data before writing to GCS.
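
Because a SQL query contains spaces and characters such as *, it must be quoted when passed on the command line. The sketch below builds the two extra arguments and lets shlex handle the quoting. The view name trips and the query are hypothetical examples, and the query assumes the input files expose a start_station_id column.

```python
import shlex

# Hypothetical view name and query, chosen only for illustration.
view_name = "trips"
query = f"SELECT * FROM {view_name} WHERE start_station_id IS NOT NULL"

extra_args = [
    f"--gcs.to.gcs.temp.view.name={view_name}",
    f"--gcs.to.gcs.sql.query={query}",
]

# shlex.join quotes the query argument so its spaces and * survive the shell.
print(shlex.join(extra_args))
```

The printed text can be appended to the ./bin/start.sh invocation shown earlier, after the other template arguments.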

7. Clean up resources

To avoid incurring unnecessary charges to your GCP account after completion of this codelab:

  1. Delete the Cloud Storage bucket for the environment that you created.
gsutil rm -r gs://${BUCKET}
  2. Delete the Dataproc cluster used for your persistent history server.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  3. Delete the Dataproc Serverless jobs. Go to the Batches Console, click the box next to each job you'd like to delete, and click DELETE.

If you created a project just for this codelab, you can also optionally delete the project:

  1. In the GCP Console, go to the Projects page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the box, type the project ID, and then click Shut down to delete the project.

8. What's next

The following resources provide additional ways you can take advantage of Serverless Spark: