Managed Service for Apache Spark

1. Introduction - Managed Service for Apache Spark

Managed Service for Apache Spark is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and many other open source tools and frameworks. Use Managed Service for Apache Spark for data lake modernization, ETL / ELT, and secure data science, at planet scale. Managed Service for Apache Spark is also fully integrated with several Google Cloud services including BigQuery, Cloud Storage, Gemini Enterprise Agent Engine, and Knowledge Catalog.

Managed Service for Apache Spark is available in two deployment modes:

  • Managed Apache Spark serverless allows you to run PySpark jobs without configuring infrastructure or autoscaling. Managed Apache Spark serverless supports PySpark batch workloads and sessions / notebooks.
  • Managed Apache Spark clusters allows you to manage a Hadoop YARN cluster for YARN-based Spark workloads in addition to open source tools such as Flink and Presto. You can tailor your cloud-based clusters with as much vertical or horizontal scaling as you'd like, including autoscaling.

2. Create a Managed Apache Spark Cluster on a Google Cloud VPC

In this step, you will create a Managed Apache Spark cluster on Google Cloud using the Google Cloud console.

As a first step, enable the Managed Apache Spark service API on the console. Once it is enabled, search for "Managed Apache Spark" in the search bar and click Create Cluster.

Select Cluster on Compute Engine to use Google Compute Engine (GCE) VMs as the underlying infrastructure for running Managed Apache Spark clusters.

a961b2e8895e88da.jpeg

You are now on the Cluster Creation page.

9583c91204a09c12.jpeg

On this page:

  • Provide a unique name for the cluster.
  • Select a region. You can also select a zone, although Managed Apache Spark can choose one for you automatically. For this codelab, select "us-central1" and "us-central1-c".
  • Select the "Standard" cluster type. This ensures that there is one master node.
  • In the Configure nodes tab, confirm that the number of workers created will be two.
  • In the Customize cluster section, check the box next to Enable Component Gateway. This enables access to web interfaces on the cluster, including the Spark UI, YARN Node Manager, and Jupyter notebooks.
  • In the Optional Components, select Jupyter Notebook. This configures the cluster with a Jupyter notebook server.
  • Leave everything else as is and click Create Cluster.

This will spin up a Managed Apache Spark cluster.

3. Launch cluster and SSH into it

Once the cluster status changes to Running, click on the cluster name from the Managed Apache Spark console.

7332f1c2cb25807d.jpeg

Click on the VM Instance tab to view the master node and the two worker nodes of the cluster.

25be1578e00f669f.jpeg

Click on SSH next to the master node to log in to the master node.

2810ffd97f315bdb.jpeg

Run the following HDFS commands to list the root directory, check the Hadoop version, and create a test directory.

hadoop_commands_example

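# List the contents of the HDFS root directory.
sudo hadoop fs -ls /
# Print the installed Hadoop version.
sudo hadoop version
# Create a test directory in HDFS.
sudo hadoop fs -mkdir /test51
# List the root directory again to confirm that /test51 now exists.
sudo hadoop fs -ls /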
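
The same steps can also be scripted. The following is a minimal Python sketch, assuming it runs on the master node where the hadoop CLI is on the PATH. The helper names hdfs_command and run_hdfs are illustrative and not part of any library. It uses -mkdir -p so that re-running the step does not fail when /test51 already exists.

```python
import shutil
import subprocess


def hdfs_command(*args, use_sudo=True):
    """Build the argument list for a `hadoop fs` invocation."""
    prefix = ["sudo"] if use_sudo else []
    return prefix + ["hadoop", "fs", *args]


def run_hdfs(*args):
    """Run a `hadoop fs` command and return its standard output."""
    result = subprocess.run(
        hdfs_command(*args), check=True, capture_output=True, text=True
    )
    return result.stdout


if __name__ == "__main__":
    # Only attempt the calls where the Hadoop CLI is actually installed.
    if shutil.which("hadoop") is None:
        print("hadoop CLI not found; run this on the cluster's master node")
    else:
        # -p succeeds even if the directory already exists.
        run_hdfs("-mkdir", "-p", "/test51")
        print(run_hdfs("-ls", "/"))
```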

4. Web Interfaces and Component Gateways

From the Managed Apache Spark cluster console, click the name of your cluster, then click the WEB INTERFACES tab.

6398f71d6293d6ff.jpeg

This shows the available web interfaces, including Jupyter. Click Jupyter to open the Jupyter interface. You can use it to create PySpark notebooks that are stored on Google Cloud Storage (GCS). Choose GCS to store your notebook, and open a PySpark notebook to use in this codelab.

5. Monitor and Observe Spark Jobs

With the Managed Apache Spark cluster up and running, create a PySpark batch job and submit the job to the Managed Apache Spark cluster.

Create a Google Cloud Storage (GCS) bucket to store the PySpark script. Make sure to create the bucket in the same region as the Managed Apache Spark cluster.

679fd2f76806f4e2.jpeg

Now that the GCS bucket is created, copy the following file into this bucket.

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

This script creates a sample Spark DataFrame and writes it as a Hive table.

hive_job.py

from datetime import date, datetime

from pyspark.sql import SparkSession

# enableHiveSupport() lets saveAsTable() register the table in the Hive metastore.
# master("local") is kept from the original script; it runs Spark in local mode.
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

# Build a small sample DataFrame with an explicit schema.
df = spark.createDataFrame(
    [
        (1, 2.0, "string1", date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3.0, "string2", date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4.0, "string3", date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
    ],
    schema="a long, b double, c string, d date, e timestamp",
)

# Save the DataFrame as a Hive table, replacing any existing table of that name.
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
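
Copying the script into the bucket can also be done programmatically. The following is a minimal sketch, assuming the google-cloud-storage client library is installed and application default credentials are configured. The function names and the bucket name in the usage note are hypothetical.

```python
import urllib.request

SCRIPT_URL = (
    "https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py"
)


def object_uri(bucket_name, object_name):
    """Return the gs:// URI for an object in a bucket."""
    return f"gs://{bucket_name}/{object_name}"


def copy_script_to_bucket(bucket_name, object_name="test-spark-1.py"):
    """Download the sample script and upload it to the given bucket."""
    # Imported here so the module still loads where the library is missing.
    from google.cloud import storage

    # urlretrieve saves the file to a temporary path and returns that path.
    local_path, _ = urllib.request.urlretrieve(SCRIPT_URL)
    blob = storage.Client().bucket(bucket_name).blob(object_name)
    blob.upload_from_filename(local_path)
    return object_uri(bucket_name, object_name)
```

Calling copy_script_to_bucket("my-codelab-bucket"), with your own bucket name in place of the placeholder, returns the gs:// location to enter when submitting the job.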

Submit this script as a Spark batch job in Managed Apache Spark. Click Jobs in the left navigation menu, then click Submit Job.

5767fc7c50b706d3.jpeg

Provide a Job ID and region. Select your cluster and provide the GCS location of the Spark script that you copied. This job will run as a Spark batch job on Managed Apache Spark.

Under Properties, add the key spark.submit.deployMode with the value client to ensure that the driver runs on the Managed Apache Spark master node and not on the worker nodes. Click Submit to submit the batch job to Managed Apache Spark.

a7ca90f5132faa31.jpeg

The Spark script creates a DataFrame and writes it to the Hive table test_table_1.

Once the job runs successfully, you can see the console print statements under the Monitoring tab.

bdec2f3ae1055f9.jpeg

Now that the Hive table is created, submit a Hive query job to select the contents of the table and display them on the console.

Create another job with the following properties:

c16f02d1b3afaa27.jpeg

Notice the Job Type is set to Hive and the query source type is Query Text, which means we will write the entire HiveQL statement within the Query Text textbox.

Submit the job, keeping the rest of the parameters as default.

e242e50bc2519bf4.jpeg

Notice how the HiveQL query selects all records and displays them on the console.
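
The same check can be made from a PySpark notebook on the cluster. The following is a minimal sketch, assuming the table name test_table_1 written by the script above and a pyspark installation with Hive support. The helper names are illustrative, and the query text is an assumption rather than the exact statement used in the Hive job.

```python
import re


def select_all_query(table_name):
    """Return a query selecting every row, rejecting unexpected table names."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    return f"SELECT * FROM {table_name}"


def show_table(table_name="test_table_1"):
    """Print the rows of a Hive table using Spark SQL."""
    # Imported lazily so select_all_query works without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    spark.sql(select_all_query(table_name)).show()
```

In a notebook cell, show_table() prints the three sample rows if the batch job completed successfully.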

6. Autoscaling

Autoscaling is the task of estimating the "right" number of cluster worker nodes for a workload.

The Managed Apache Spark AutoscalingPolicies API provides a mechanism for automating cluster resource management and enables cluster worker VM autoscaling. An Autoscaling Policy is a reusable configuration that describes how cluster workers using the autoscaling policy should scale. It defines scaling boundaries, frequency, and aggressiveness to provide fine-grained control over cluster resources throughout cluster lifetime.

Managed Apache Spark autoscaling policies are written as YAML files. The YAML file is either passed to the CLI command that creates the cluster or selected from a GCS bucket when the cluster is created from the Cloud Console.

Here is an example of a Managed Apache Spark autoscaling policy:

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h
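
To make the scaling boundaries in this policy concrete, the following Python sketch mirrors policy.yaml as a plain dictionary and computes the total worker range. It also includes a deliberately simplified scale-up estimate, assuming that scaleUpFactor is the fraction of pending YARN memory to provision for and that an omitted minInstances means zero. The function names and the memory figures are illustrative, and this is not the service's actual algorithm.

```python
import math

# Mirrors policy.yaml as a plain dict, so no YAML parser is needed.
POLICY = {
    "workerConfig": {"minInstances": 10, "maxInstances": 10},
    "secondaryWorkerConfig": {"maxInstances": 50},
    "basicAlgorithm": {
        "cooldownPeriod": "4m",
        "yarnConfig": {
            "scaleUpFactor": 0.05,
            "scaleDownFactor": 1.0,
            "gracefulDecommissionTimeout": "1h",
        },
    },
}


def total_worker_bounds(policy):
    """Return (min, max) total workers across primary and secondary groups."""
    primary = policy["workerConfig"]
    secondary = policy.get("secondaryWorkerConfig", {})
    # Assumption: an omitted minInstances is treated as zero.
    low = primary["minInstances"] + secondary.get("minInstances", 0)
    high = primary["maxInstances"] + secondary.get("maxInstances", 0)
    return low, high


def secondary_workers_to_add(policy, pending_gb, gb_per_worker, current_secondary):
    """Simplified estimate of secondary workers added in one scaling step."""
    factor = policy["basicAlgorithm"]["yarnConfig"]["scaleUpFactor"]
    wanted = math.ceil(factor * pending_gb / gb_per_worker)
    headroom = policy["secondaryWorkerConfig"]["maxInstances"] - current_secondary
    # Never exceed the secondary maximum and never return a negative count.
    return max(0, min(wanted, headroom))


if __name__ == "__main__":
    print(total_worker_bounds(POLICY))
    print(secondary_workers_to_add(POLICY, 1000, 10, 0))
```

Because minInstances and maxInstances are both 10 for primary workers, only the secondary worker group scales under this policy.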

7. Configure Managed Apache Spark Optional Components

When you create a Managed Apache Spark cluster, standard Apache Hadoop ecosystem components are automatically installed on the cluster (see Managed Apache Spark Version List). You can install additional components, called Optional Components, when you create the cluster.

e39cc34245af3f01.jpeg

When you created the Managed Apache Spark cluster from the console earlier in this codelab, you enabled optional components and selected Jupyter Notebook.

8. Clean up resources

To clean up resources and stop any unwanted billing, stop the Managed Apache Spark cluster and then delete it. After the cluster is deleted, also delete the GCS buckets where the code was copied.

Before stopping and deleting the cluster, ensure that all data written to HDFS storage is copied over to GCS for durable storage.
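
One way to copy HDFS data to GCS is with hadoop distcp, run from the master node. The following Python sketch only builds and prints the command. It assumes the cluster can write to gs:// paths through a Cloud Storage connector; the bucket name, the hdfs-backup prefix, and the helper name are hypothetical.

```python
def hdfs_to_gcs_command(hdfs_path, bucket_name, prefix="hdfs-backup"):
    """Build a `hadoop distcp` command that copies an HDFS path to a bucket."""
    if not hdfs_path.startswith("/"):
        raise ValueError("hdfs_path must be absolute, for example /test51")
    source = "hdfs://" + hdfs_path
    target = f"gs://{bucket_name}/{prefix}{hdfs_path}"
    return ["hadoop", "distcp", source, target]


if __name__ == "__main__":
    # Print the command for review instead of running it.
    print(" ".join(hdfs_to_gcs_command("/test51", "my-codelab-bucket")))
```

Review the printed command and run it on the master node with your own bucket name before stopping the cluster.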

To stop the cluster, click Stop.

52065de928ab52e7.jpeg

Once the cluster stops, click Delete. In the confirmation dialog, click Delete again to confirm.

52065de928ab52e7.jpeg