Dataproc on Google Compute Engine

1. Introduction - Google Dataproc

Dataproc is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and many other open source tools and frameworks. Use Dataproc for data lake modernization, ETL / ELT, and secure data science, at planet scale. Dataproc is also fully integrated with several Google Cloud services including BigQuery, Cloud Storage, Vertex AI, and Dataplex.

Dataproc is available in three flavors:

  • Dataproc Serverless allows you to run PySpark jobs without needing to configure infrastructure and autoscaling. Dataproc Serverless supports PySpark batch workloads and sessions / notebooks.
  • Dataproc on Google Compute Engine allows you to manage a Hadoop YARN cluster for YARN-based Spark workloads in addition to open source tools such as Flink and Presto. You can tailor your cloud-based clusters with as much vertical or horizontal scaling as you'd like, including autoscaling.
  • Dataproc on Google Kubernetes Engine allows you to configure Dataproc virtual clusters in your GKE infrastructure for submitting Spark, PySpark, SparkR or Spark SQL jobs.

2. Create a Dataproc Cluster on a Google Cloud VPC

In this step, you will create a Dataproc cluster on Google Cloud using the Google Cloud console.

As a first step, enable the Dataproc service API on the console. Once it is enabled, search for "Dataproc" in the search bar and click Create Cluster.

Select Cluster on Compute Engine to use Google Compute Engine (GCE) VMs as the underlying infrastructure to run Dataproc clusters.

a961b2e8895e88da.jpeg

You are now on the Cluster Creation page.

9583c91204a09c12.jpeg

On this page:

  • Provide a unique name for the cluster.
  • Select a region. You may also select a zone, or let Dataproc choose one for you automatically. For this codelab, select "us-central1" and "us-central1-c".
  • Select the "Standard" cluster type. This provisions one master node and N worker nodes.
  • In the Configure nodes tab, confirm that the number of workers created will be two.
  • In the Customize cluster section, check the box next to Enable Component Gateway. This enables access to web interfaces on the cluster, including the Spark UI, the YARN ResourceManager, and Jupyter notebooks.
  • Under Optional components, select Jupyter Notebook. This configures the cluster with a Jupyter notebook server.
  • Leave everything else as is and click Create Cluster.

This will spin up a Dataproc cluster.
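
If you prefer the gcloud CLI over the console, the same cluster can be created with a command along these lines. This is a minimal sketch: the cluster name is a placeholder, and machine types and disk sizes are left at their defaults.

create_cluster_gcloud

# Hypothetical cluster name; region, zone, worker count and components match the console choices above
gcloud dataproc clusters create codelab-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-c \
    --num-workers=2 \
    --optional-components=JUPYTER \
    --enable-component-gateway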

3. Launch cluster and SSH into it

Once the cluster status changes to Running, click on the cluster name from the Dataproc console.

7332f1c2cb25807d.jpeg

Click on the VM Instances tab to view the master node and the two worker nodes of the cluster.

25be1578e00f669f.jpeg

Click on SSH next to the master node to log in to the master node.

2810ffd97f315bdb.jpeg
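
You can also SSH into the master node from the CLI. A sketch, assuming the placeholder cluster name from the earlier example; Dataproc names the master VM by appending -m to the cluster name.

ssh_master_node

# SSH into the master VM of the cluster (cluster name is a placeholder)
gcloud compute ssh codelab-dataproc-cluster-m --zone=us-central1-c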

Run a few HDFS commands to inspect the directory structure.

hadoop_commands_example

# List the HDFS root directory
sudo hadoop fs -ls /
# Print the Hadoop version installed on the cluster
sudo hadoop version
# Create a test directory and confirm it appears in the listing
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /

4. Web Interfaces and Component Gateways

From the Dataproc cluster console click on the name of your cluster, then click on the WEB INTERFACES tab.

6398f71d6293d6ff.jpeg

This shows the web interfaces available on the cluster, including Jupyter. Click Jupyter to open the Jupyter notebook server; notebooks you create there are stored in Google Cloud Storage (GCS). Open a PySpark notebook to use in this codelab.
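
The Component Gateway URLs can also be retrieved from the CLI. A minimal sketch, assuming the placeholder cluster name from earlier; the web interface URLs appear under the endpointConfig section of the output.

describe_cluster_gcloud

# Print cluster details, including Component Gateway web interface URLs
gcloud dataproc clusters describe codelab-dataproc-cluster --region=us-central1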

5. Monitor and Observe Spark Jobs

With the Dataproc cluster up and running, create a PySpark batch job and submit the job to the Dataproc cluster.

Create a Google Cloud Storage (GCS) bucket to store the PySpark script. Make sure to create the bucket in the same region as the Dataproc cluster.

679fd2f76806f4e2.jpeg

Now that the GCS bucket is created, copy the following file into this bucket.

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
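
As a sketch, the bucket creation and copy can also be done from the CLI. The bucket name is a placeholder; pick a globally unique name and keep the region the same as the cluster.

copy_script_to_gcs

# Create a bucket in the same region as the cluster (bucket name is a placeholder)
gsutil mb -l us-central1 gs://codelab-dataproc-bucket
# Download the PySpark script and copy it into the bucket
curl -O https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
gsutil cp test-spark-1.py gs://codelab-dataproc-bucket/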

This script creates a sample Spark DataFrame and writes it as a Hive table.

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

Submit this script as a Spark batch job in Dataproc. Click on Jobs in the left navigation menu and then click Submit Job.

5767fc7c50b706d3.jpeg

Provide a Job ID and region. Select your cluster and provide the GCS location of the Spark script that you copied. This job will run as a Spark batch job on Dataproc.

Under Properties, add the key spark.submit.deployMode with the value client to ensure that the driver runs on the Dataproc master node and not on the worker nodes. Click Submit to submit the batch job to Dataproc.

a7ca90f5132faa31.jpeg
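
For reference, the same job can be submitted from the CLI. A rough equivalent, assuming the placeholder cluster and bucket names used earlier:

submit_pyspark_job

# Submit the PySpark script as a Dataproc batch job with the driver running in client mode
gcloud dataproc jobs submit pyspark gs://codelab-dataproc-bucket/test-spark-1.py \
    --cluster=codelab-dataproc-cluster \
    --region=us-central1 \
    --properties=spark.submit.deployMode=client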

The Spark script will create a DataFrame and write it to a Hive table named test_table_1.

Once the job successfully runs, you can see the console print statements under the Monitoring tab.

bdec2f3ae1055f9.jpeg

Now that the Hive table is created, submit a Hive query job that selects the contents of the table and displays them on the console.

Create another job with the following properties:

c16f02d1b3afaa27.jpeg

Notice the Job Type is set to Hive and the query source type is Query Text, which means we will write the entire HiveQL statement within the Query Text textbox.
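
The equivalent Hive job can also be submitted from the CLI with the query passed inline. A sketch, assuming the placeholder cluster name:

submit_hive_job

# Submit a Hive job that reads back the table written by the Spark job
gcloud dataproc jobs submit hive \
    --cluster=codelab-dataproc-cluster \
    --region=us-central1 \
    --execute="SELECT * FROM test_table_1;"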

Submit the job, keeping the rest of the parameters as default.

e242e50bc2519bf4.jpeg

Notice how the HiveQL query selects all records from the table and displays them on the console.

6. Autoscaling

Autoscaling is the task of estimating the "right" number of cluster worker nodes for a workload.

The Dataproc AutoscalingPolicies API provides a mechanism for automating cluster resource management and enables cluster worker VM autoscaling. An Autoscaling Policy is a reusable configuration that describes how cluster workers using the autoscaling policy should scale. It defines scaling boundaries, frequency, and aggressiveness to provide fine-grained control over cluster resources throughout cluster lifetime.

Dataproc autoscaling policies are written as YAML files; the YAML is either passed to the CLI command that creates the cluster or selected from a GCS bucket when a cluster is created from the Cloud console.

Here is an example of a Dataproc autoscaling policy:

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h
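
To use a policy like this from the CLI, the YAML is first imported as a named autoscaling policy and then attached to a cluster. A sketch, with placeholder policy and cluster names:

autoscaling_policy_gcloud

# Import the YAML file as a named autoscaling policy
gcloud dataproc autoscaling-policies import codelab-autoscaling-policy \
    --source=policy.yaml \
    --region=us-central1
# Attach the policy when creating a cluster
gcloud dataproc clusters create codelab-autoscaling-cluster \
    --region=us-central1 \
    --autoscaling-policy=codelab-autoscaling-policy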

7. Configure Dataproc Optional Components

When you create a Dataproc cluster, standard Apache Hadoop ecosystem components are automatically installed on the cluster (see Dataproc Version List). You can install additional components, called optional components, on the cluster when you create it.

e39cc34245af3f01.jpeg

While creating the Dataproc cluster from the console, we selected Jupyter Notebook as an optional component.
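
The same selection can be made from the CLI when creating a cluster. A sketch, with a placeholder cluster name; Zeppelin is included only as an example of a second optional component:

optional_components_gcloud

# Create a cluster with Jupyter and Zeppelin installed and Component Gateway enabled
gcloud dataproc clusters create codelab-components-cluster \
    --region=us-central1 \
    --optional-components=JUPYTER,ZEPPELIN \
    --enable-component-gateway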

8. Clean up resources

To clean up resources and stop any unwanted billing, the Dataproc cluster needs to be stopped first and then deleted. After the cluster is deleted, delete the GCS bucket where the code was copied.

Before stopping and deleting the cluster, ensure that all data written to HDFS storage is copied over to GCS for durable storage.

To stop the cluster, select it in the Dataproc console and click Stop.

52065de928ab52e7.jpeg

Once the cluster stops, click Delete to delete the cluster.

On the confirm dialog, click Delete to delete the cluster.
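
The cleanup steps can also be done from the CLI. A sketch, assuming the placeholder cluster and bucket names used in this codelab:

cleanup_gcloud

# Stop and then delete the Dataproc cluster
gcloud dataproc clusters stop codelab-dataproc-cluster --region=us-central1
gcloud dataproc clusters delete codelab-dataproc-cluster --region=us-central1
# Remove the GCS bucket and its contents
gsutil rm -r gs://codelab-dataproc-bucket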
