Chapter 4. Serverless Spark and Ephemeral Dataproc Clusters

Dataproc Serverless for Spark is an auto-scaling, serverless offering for Apache Spark. It simplifies running Spark applications because you don’t have to provision or manage any infrastructure in order to run your Spark jobs.

Ephemeral Dataproc clusters, also known as transient clusters, are temporary clusters that run only until specific jobs complete or are terminated. Throughout this book, we’ll refer to them as Ephemeral Clusters. Both Serverless and Ephemeral Dataproc clusters support running jobs within a virtual private cloud (VPC) or the default network.

In this chapter, you will gain a clear understanding of when to use Dataproc Serverless Spark and Ephemeral Dataproc clusters. You will also learn:

  • How to submit Spark jobs to Dataproc Serverless

  • How to create and run jobs on Ephemeral clusters

  • How to configure a Spark History Server

  • How to leverage the Spark RAPIDS Accelerator

  • How to price and monitor Serverless Spark jobs

Let’s dive in and explore how to scale your Spark workloads efficiently!

4.1 Running on Dataproc: Serverless vs Ephemeral Clusters

Problem

Scenario 1:

You’re tasked with running a Spark job on Dataproc, with the following three criteria:

  1. You want to avoid managing or customizing the cluster, including hardware selection.

  2. There’s no requirement to sequence Spark jobs on the same cluster.

  3. The objective is to execute a Spark job, not Hadoop MapReduce jobs or HiveQL scripts.

Scenario 2:

You have a Spark job, a Hadoop job, or a sequence involving both, and you intend to run them on a tailored Dataproc cluster, deleting the cluster upon job completion.

Solution

For Scenario 1: Use the Dataproc Serverless service to run the Spark job.

For Scenario 2: Use the Ephemeral Dataproc cluster approach. Create a Dataproc cluster, run the jobs on it, and delete the cluster once the jobs complete.

Discussion

As above, let’s consider the serverless and ephemeral approaches separately.

Dataproc Serverless

Dataproc Serverless for Spark is tailored for Spark jobs. These jobs can be executed on either a private VPC or the default network. To understand how to run a Dataproc Serverless job, refer to Recipe 4.3.

Although you won’t be able to change the hardware (such as the machine type), you can select between the standard and premium tiers. The premium tier offers upgraded compute and storage capabilities, which you can configure separately for drivers and executors.

Note

For Spark ML, Spark R, or highly computational Spark jobs, opt for the premium tier Dataproc Serverless service for optimal performance.

Ephemeral Dataproc Clusters

Customers managing a data lake often require sequential jobs, spanning tasks from data engineering to data science. For example, Spark job 1 might convert raw data from the Bronze zone into a standardized format such as Parquet in the Silver zone, followed by Spark job 2, which generates facts, dimensions, or data marts in the Gold zone.

These jobs can be executed sequentially on a Dataproc cluster, orchestrated via Cloud Composer or Dataproc Workflow Templates, with the cluster deleted once the jobs complete, as shown in Recipe 4.2. This cluster is referred to as an Ephemeral Dataproc Cluster because it’s created on demand and deleted upon job completion.

In this context, leveraging an Ephemeral Dataproc cluster offers several benefits:

  • Customized, job-scoped clusters allow benchmarking and optimal selection of machine types and disks to achieve target throughput

  • Ability to install necessary packages using initialization scripts during startup

  • Cost savings compared to static Dataproc clusters
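If you drive this pattern directly with gcloud, the lifecycle looks roughly like the sketch below. The cluster name, bucket paths, and the optional --max-idle value are placeholders (the script paths reuse the gs://lakehouse examples from Recipe 4.2); --max-idle is simply a safety net that deletes the cluster if it sits idle.

# Create a job-scoped cluster; --max-idle deletes it automatically if it idles.
gcloud dataproc clusters create etl-ephemeral \
  --region=us-central1 \
  --num-workers=2 \
  --max-idle=30m

# Run the jobs in sequence against that cluster.
gcloud dataproc jobs submit pyspark gs://lakehouse/process.py \
  --cluster=etl-ephemeral --region=us-central1

gcloud dataproc jobs submit pyspark gs://lakehouse/analytics.py \
  --cluster=etl-ephemeral --region=us-central1

# Delete the cluster once the jobs complete.
gcloud dataproc clusters delete etl-ephemeral --region=us-central1 --quiet

Dataproc Workflow Templates (Recipe 4.2) automate this same create-run-delete sequence for you.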

Let’s consider how the features differ between Dataproc Serverless and ephemeral Dataproc clusters in Table 4-1.

Table 4-1. Comparison of Dataproc Serverless and Ephemeral Dataproc clusters

Feature | Dataproc Serverless | Ephemeral Dataproc
Auto-scaling | Yes, handled by Dataproc Serverless | Configurable
Machine type selection | Cannot be changed | Customizable
Tier selection | Standard or Premium | Not applicable; customizable at a granular level (machine types, disks, etc.)
Startup time | 60 seconds | 90 seconds
Resource management | Spark-based | YARN framework
Use cases | Suitable for Spark workloads | Suitable for Hive, MapReduce, Pig, Spark, and other Hadoop workloads

4.2 Run a Sequence of Jobs on an Ephemeral Cluster

Problem

You’re orchestrating an end-to-end analytics pipeline. The first Spark job converts CSV data to Parquet and lands it in a GCS bucket. The second job takes the output of the first job, performs aggregations, and produces a facts table as Parquet output. You want to run both jobs on an ephemeral cluster.

Solution

Use Dataproc Workflow Templates to orchestrate this process.

  1. Use the gcloud command to create a template:

    gcloud dataproc workflow-templates create {template_name_here} --region=us-central1
  2. Import the jobs and ephemeral cluster details to the workflow template:

    gcloud dataproc workflow-templates import {template_name_here} --region us-central1 --source {config_file_name.yaml}
  3. Instantiate the workflow template using the following gcloud command:

    gcloud dataproc workflow-templates instantiate {template_name_here} --region us-central1

Discussion

To run the jobs sequentially on a Dataproc cluster, add the job details and the ephemeral cluster details to a YAML file as shown below. The prerequisiteStepIds field on the second step ensures that it runs only after the first step completes:

jobs:
- pysparkJob:
    args:
    - gs://lakehouse/input.xml
    mainPythonFileUri: gs://lakehouse/process.py
  stepId: process-large-pyspark

- pysparkJob:
    args:
    - gs://lakehouse/silver/standard.parquet
    mainPythonFileUri: gs://lakehouse/analytics.py
  stepId: analytics-large-pyspark
  # Run this step only after the first step completes
  prerequisiteStepIds:
  - process-large-pyspark

placement:
  managedCluster:
    clusterName: three-node-cluster
    config:
      gceClusterConfig:
        zoneUri: us-central1-b
      masterConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-2
      workerConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-2
        numInstances: 2

Before instantiating, import the template.yaml file into the workflow template as shown below:

gcloud dataproc workflow-templates import my_template  --region us-central1 --source template.yaml
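The managed cluster defined under placement is created when the workflow starts, the steps run in their declared order, and the cluster is deleted automatically when the last step finishes. If you prefer not to block your terminal while the workflow runs, here is a small sketch, assuming the standard --async flag and operations commands in your gcloud version:

# Start the workflow without waiting; gcloud prints an operation ID.
gcloud dataproc workflow-templates instantiate my_template \
  --region us-central1 --async

# Check on workflow operations in the region later.
gcloud dataproc operations list --region us-central1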

4.3 Executing a Spark Batch Job to Convert XML Data to Parquet on Dataproc Serverless

Problem

You want to run a Spark Scala batch job that converts XML to Parquet on Dataproc Serverless.

Solution

Here is the gcloud command to submit a Spark Scala batch job to Dataproc Serverless:

gcloud dataproc batches submit spark --region=us-central1 --class={class_name_here} --jars={jar1_gcs_path_here},{jar2_gcs_path_here} -- {argument1} {argument2} {argument3}

This job takes three arguments and relies on two JARs: the spark-xml library and your application JAR. Replace the placeholders with the appropriate values:

{class_name_here}

Replace with the name of the main class for your Spark job.

{jar1_gcs_path_here}

This is the GCS (Google Cloud Storage) path for the Spark XML JAR.

{jar2_gcs_path_here}

This is the GCS path for the Spark application JAR.

{argument1}

Replace with the GCS path of the input XML file.

{argument2}

Replace with the desired output file path.

{argument3}

Replace with the root tag for the input XML file.

Discussion

Here’s a snippet of the Scala code for the XML to Parquet conversion:

package com.dataprocessing

import org.apache.spark.sql.{SaveMode, SparkSession}

object XmltoParquet {
  def main(args: Array[String]): Unit = {

    // Dataproc Serverless supplies the Spark master, so we don't hard-code one here
    val spark = SparkSession.builder()
      .appName("XmlToParquet")
      .getOrCreate()

    // Read the XML input and infer the schema
    val xmlDataFrame = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", args(2)) // Root tag of the XML records
      .load(args(0)) // GCS path of the input XML file

    // Write the result as Parquet
    xmlDataFrame.write
      .mode(SaveMode.Overwrite)
      .parquet(args(1)) // GCS path for the Parquet output

    spark.stop()
  }
}

Execute a batch Spark job on Dataproc Serverless using the provided gcloud command. Here’s an example:

gcloud dataproc batches submit spark \
  --region=us-central1 \
--jars="gs://dataproc-cookbook/chapter2/spark/scala/jar/spark-xml_2.12-0.16.0.jar","gs://dataproc-cookbook/chapter2/spark/scala/jar/xmltoparquet_2.12-0.1.jar" \
  --class=com.dataprocessing.XmltoParquet \
  -- "gs://dataproc-cookbook/chapter2/spark/scala/inputfiles/menu.xml" "gs://dataproc-cookbook/chapter2/spark/scala/outputfiles/" "food"

To run a batch Spark job on Dataproc Serverless via the Console, navigate to Dataproc and select “Batches” in the Serverless section. Then create a batch job by selecting “Create,” as shown in Figure 4-1.

Creating a batch Spark job
Figure 4-1. Creating a batch Spark job

Choose the appropriate runtime environment for your Spark job from the “Runtime Version” dropdown, and include any arguments or dependency files that are referenced by the Spark code, as shown in Figure 4-2 and Figure 4-3. These dependency files are extracted into the working directory of each Spark executor.
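The same runtime version, dependency JARs, and extra files can also be supplied from the command line. Here is a minimal sketch with placeholder paths, assuming the usual --version, --jars, and --files options of gcloud dataproc batches submit:

# Placeholder paths: pin the runtime version and attach dependencies that the
# Spark code references at run time.
gcloud dataproc batches submit pyspark gs://my-bucket/job.py \
  --region=us-central1 \
  --version=1.1 \
  --jars=gs://my-bucket/deps/library.jar \
  --files=gs://my-bucket/config/app.conf \
  -- gs://my-bucket/input/ gs://my-bucket/output/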

Adding application and dependency JARs
Figure 4-2. Adding application and dependency JARs
Adding Arguments for the Spark Job
Figure 4-3. Adding Arguments for the Spark Job

Next, choose appropriate tiers for driver and executor and then submit the job, as shown in Figure 4-4.

Choosing the Executor and Driver Compute and Storage Tiers
Figure 4-4. Choosing the Executor and Driver Compute and Storage Tiers
Note

Best Practice: Configure --ttl so the batch is terminated automatically if it keeps running longer than expected.
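For example, building on the command template from the Solution, the batch can be capped with --ttl (the 4h value here is an arbitrary example):

# Cancel the batch automatically if it runs longer than the TTL.
gcloud dataproc batches submit spark \
  --region=us-central1 \
  --ttl=4h \
  --class={class_name_here} \
  --jars={jar1_gcs_path_here},{jar2_gcs_path_here} \
  -- {argument1} {argument2} {argument3}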

4.4 Running a Serverless Job Using Premium Tier Configuration

Problem

You’re dealing with a Serverless Spark application that processes large volumes of data and is currently experiencing extended runtimes. Monitoring shows that the job involves compute-intensive and disk-write-intensive operations. How can you optimize this job?

Solution

Leverage the Premium tier.

Use the following gcloud command template to submit a PySpark batch job to Dataproc Serverless with the premium driver and executor compute and disk tier configuration:

gcloud dataproc batches submit --project {project_name_here} --region {region_name_here} pyspark --batch {unique_job_name_here} {pyspark_file_path_here} --version {version_number_here} --properties spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.driver.disk.size=750g,spark.dataproc.executor.disk.size=1500g

Discussion

The choice between Premium and Standard tiers in Dataproc Serverless depends on your task’s demands and cost considerations. Here’s a breakdown of what each tier is and when you should consider using them:

Standard Tier

Offers lower per-core performance, but at a lower cost than the premium tier.

When to choose Standard:

  • Cost-sensitive tasks: When performance requirements are minimal, and cost optimization is a primary concern.

  • Tasks with minimal memory or shuffle requirements: When memory usage and shuffle operations are not significant bottlenecks.

Premium Tier

Offers higher per-core performance, at a higher billing rate.

Allows for a higher limit of driver and executor memory per core compared to the Standard tier. For instance, spark.driver.memory must fall within the range of 1024m to 7424m for the Standard compute tier, whereas it can go up to 24576m for the premium tier.

Premium disk tiers upgrade local and shuffle storage on both the driver and executors for better IOPS and throughput.

When to choose Premium:

  • Memory-intensive tasks and shuffle-heavy tasks

Table 4-2. Feature comparison between Premium and Standard tiers
Feature | Premium | Standard
Compute tier | Higher per-core performance | Lower per-core performance
Cost | Higher billing rate | Comparatively lower
Driver and executor disk tier | Better IOPS and throughput | Standard performance
Driver memory overhead | Higher (larger memory allocations) | Lower
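Because the premium compute tier raises the per-core memory ceiling, it is typically paired with larger spark.driver.memory or spark.executor.memory settings. A minimal sketch (the memory values are arbitrary examples and the script path is a placeholder):

# Premium compute tier with memory settings above what the standard tier's
# per-core ceiling would allow; values are illustrative only.
gcloud dataproc batches submit pyspark gs://my-bucket/job.py \
  --region=us-central1 \
  --properties=spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.driver.memory=16g,spark.executor.memory=16g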

Below is a sample PySpark code snippet for reading a Parquet file, aggregating, and writing as Parquet output:

import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dailyinsights").getOrCreate()

# Get the input path and output path from the job arguments
inputPath = sys.argv[1]
outputPath = sys.argv[2]

# Read the Parquet input
parquetDF = spark.read.parquet(inputPath)

# Aggregate views per day
aggregateDF = parquetDF.groupBy("day").agg(F.sum("views"))

aggregateDF.show(10)

aggregateDF.printSchema()

# Write the aggregated result as Parquet
aggregateDF.write.mode("overwrite").parquet(outputPath)

print("Application Completed!!!")

# Close the Spark session
spark.stop()

Here’s a sample gcloud command for running a PySpark job that reads a Parquet file and runs on premium storage and compute. When this PySpark job starts, autoscaling will adjust the number of active executors.

gcloud dataproc batches submit --project anu-psodata-lab --region us-central1 pyspark --batch test-1d gs://dataproc-cookbook/chapter4/gcstobiqguery.py --version 1.1.37 --subnet default --service-account 1072535324208-compute@developer.gserviceaccount.com --properties spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.driver.disk.size=750g,spark.dataproc.executor.disk.size=1500g,spark.executor.instances=2,spark.driver.cores=4,spark.executor.cores=4,spark.dynamicAllocation.executorAllocationRatio=0.3

4.5 Giving a Unique Custom Name to a Dataproc Serverless Spark Job

Problem

You want to run a batch Spark job on Dataproc Serverless and give a name to the batch job. How do you give a custom name to a Dataproc Serverless batch Spark job?

Solution

By default, the gcloud command in Recipe 4.3 generates a unique name. You can use the --batch flag in the gcloud command to customize the name of the batch Spark job.

The following gcloud command submits a Spark Scala batch job to Dataproc Serverless with the custom name you provide:

gcloud dataproc batches submit spark --batch={job_name_here} --region=us-central1 --class={class_name_here} --jars={jar1_gcs_path_here},{jar2_gcs_path_here} -- {argument1} {argument2} {argument3}

Discussion

Here is a sample gcloud command that creates and runs a batch serverless Spark job named “serverless-1k” to convert the XML input menu.xml to Parquet:

gcloud dataproc batches submit spark  --batch=serverless-1k --region=us-central1     --jars="gs://dataproc-cookbook/chapter2/spark/scala/jar/spark-xml_2.12-0.16.0.jar","gs://dataproc-cookbook/chapter2/spark/scala/jar/xmltoparquet_2.12-0.1.jar" --class=com.dataprocessing.XmltoParquet --version=1.1 -- "gs://dataproc-cookbook/chapter2/spark/scala/inputfiles/menu.xml" "gs://dataproc-cookbook/chapter2/spark/scala/outputfiles/" "food"

The gcloud command creates the serverless Spark job, which you can see in the “Batches” console. Navigate to the job’s output section to see the status and output of the Spark application, as shown in Figure 4-5.

Validating the name of the Serverless Spark job in Batches console
Figure 4-5. Validating the name of the Serverless Spark job in Batches console
Note

The batch ID has to be unique for every run, so add logic that appends a unique string (for example, a timestamp) on each run, and adhere to appropriate naming conventions for your Dataproc Serverless Spark jobs.
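A minimal sketch of one way to do this in the submitting script, appending a timestamp so every run gets a fresh batch ID (the xml-to-parquet prefix is a placeholder):

# Build a unique, lowercase batch ID for each run by appending a timestamp.
BATCH_ID="xml-to-parquet-$(date +%Y%m%d-%H%M%S)"

gcloud dataproc batches submit spark \
  --batch="${BATCH_ID}" \
  --region=us-central1 \
  --class={class_name_here} \
  --jars={jar1_gcs_path_here},{jar2_gcs_path_here} \
  -- {argument1} {argument2} {argument3}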

4.6 Clone a Dataproc Serverless Spark Job

Problem

You have successfully run a Dataproc Serverless Spark job with specific arguments and Spark Properties. How do you clone and submit this job again without manually updating the arguments and properties?

Solution

Navigate to the Jobs list in the Batches section, identify the successful Spark run, and click “Clone” to replicate the job (see Figure 4-6).

Job Details of the job “test-premium-17” in Batches Section
Figure 4-6. Job Details of the job “test-premium-17” in Batches Section

Discussion

Once you clone the job, provide a unique name for the Spark batch ID. Then validate all configurations and click Submit (Figure 4-7).

Cloning the job in Batches Section
Figure 4-7. Cloning the job in Batches Section

Alternatively, you can clone the batch job by copying the gcloud command using the “Equivalent Command Line” feature (see Figure 4-8).

Equivalent Command Line of the Job
Figure 4-8. Equivalent Command Line of the Job

In a production environment, developers often have restricted access and may not be able to create clusters or run jobs from the console. As a best practice, orchestrate and standardize job submission through services like Cloud Composer.

4.7 Run a Serverless Job on Spark RAPIDS Accelerator

Problem

You have a Serverless Spark job that runs large-scale data analytics workloads. How do you accelerate Spark workload performance using the Spark RAPIDS Accelerator?

Solution

You can configure GPU accelerators when submitting batch jobs to Dataproc Serverless to boost performance significantly. Here is the gcloud command to configure GPUs when submitting the batch job:

gcloud dataproc batches submit --project {project_name_here} --region {region_here} pyspark --batch {batch_job_name} {pyspark_gcs_file_path} --version 1.1 --subnet default --service-account {service_account_email} --properties spark.dataproc.executor.resource.accelerator.type=l4,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium --labels org-domain=finance

Discussion

The Spark RAPIDS Accelerator speeds up Spark workloads by bringing the power of GPUs to the Spark DataFrame and Spark SQL APIs. GPUs offer higher throughput, but Spark features such as RDDs are not GPU-enabled and run directly on CPUs.

Here is a sample gcloud command that submits a PySpark batch job to read and process large input data with the Spark RAPIDS Accelerator:

gcloud dataproc batches submit --project anu-psodata-lab --region us-central1 pyspark --batch test-accelerator-1 gs://dataproc-cookbook/chapter4/gcstobiqguery.py --version 1.1 --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar --subnet default --service-account 1072535324208-compute@developer.gserviceaccount.com --properties spark.dataproc.executor.resource.accelerator.type=l4,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.driver.disk.size=375g,spark.dataproc.executor.disk.size=1500g --labels org-domain=finance -- gs://dataproc-cookbook/chapter4/large/* gs://dataproc-cookbook/chapter4/largeoutput

The spark.dataproc.executor.resource.accelerator.type property configures the GPUs. The accelerator type can be l4 or a100 (NVIDIA L4 or NVIDIA A100).

Note

When using the accelerator, you must configure the compute and storage tier to be ‘premium’ for both driver and executor.

When there is a wide-dependency transformation, Spark performs a shuffle. The accelerator supports a Spark shuffle plugin that leverages Unified Communication X (UCX) as the transport and speeds up shuffle transfers.

You can also configure the accelerators via console as shown in Figure 4-9.

Configure accelerators in “Properties” section
Figure 4-9. Configure accelerators in “Properties” section

The driver and executor compute and storage tiers must be premium, and the driver and executor disk sizes must not be set, as shown in Figure 4-10.

Configure premium disk and compute tier
Figure 4-10. Configure premium disk and compute tier

When an accelerator is configured, pricing includes accelerator usage as well as the premium tiers.

4.8 How to Configure a Spark History Server

Problem

You are running Spark jobs on Dataproc Serverless or on an Ephemeral Dataproc Cluster, and you want to set up a Spark History Server to monitor all the completed Spark applications. How do you configure a Spark History Server?

Solution

Here is a sample gcloud command to set up a Spark History Server:

gcloud dataproc clusters create {phs_cluster_name_here} --enable-component-gateway --region {region_name_here} --zone {zone_here} --single-node --master-machine-type n2-highmem-8 --master-boot-disk-size 500 --image-version 2.0-debian10 --properties yarn:yarn.nodemanager.remote-app-log-dir=gs://$GCS_BUCKET/yarn-logs,mapred:mapreduce.jobhistory.done-dir=gs://$GCS_BUCKET/events/mapreduce-job-history/done,mapred:mapreduce.jobhistory.intermediate-done-dir=gs://$GCS_BUCKET/events/mapreduce-job-history/intermediate-done,spark:spark.eventLog.dir=gs://$GCS_BUCKET/events/spark-job-history,spark:spark.history.fs.logDirectory=gs://$GCS_BUCKET/events/spark-job-history,spark:SPARK_DAEMON_MEMORY=16000m,spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false,spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}} \
--project $PROJECT_NAME

Discussion

The Spark History Server is stateless; it parses the Spark application event logs persisted in GCS and reconstructs the Spark UI for each application.

As a best practice, increase the history server memory config SPARK_DAEMON_MEMORY to 16g if there are long-running Spark jobs with more than 50K tasks.

Configure the server on an n2-highmem-8 or larger machine.

If you want to configure an Ephemeral Dataproc Cluster to write logs to the Spark History Server, here is the gcloud command:

gcloud dataproc clusters create {job_cluster_name_here} --enable-component-gateway \
--region {region_name_here} --zone {zone} --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 2.0-debian10 --properties yarn:yarn.nodemanager.remote-app-log-dir=gs://$GCS_BUCKET/yarn-logs,spark:spark.eventLog.dir=gs://$GCS_BUCKET/events/spark-job-history,spark:spark.history.fs.logDirectory=gs://$GCS_BUCKET/events/spark-job-history,spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE,spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=1000ms --project $PROJECT_NAME

If you want to configure Dataproc Serverless to write logs to the Spark History Server, refer to Recipe 4.9.

4.9 Writing Spark Events to the Spark History Server from Dataproc Serverless

Problem

You have a Spark History Server set up, and you are submitting Spark jobs to Dataproc Serverless. You want to view the application in the Spark History Server after the job completes.

Solution

If you haven’t configured a Spark History Server yet, refer to Recipe 4.8 for setup instructions.

Once it is configured, use the --history-server-cluster flag to wire up the Spark UI for Serverless Spark jobs.

The following gcloud command submits a Spark Scala batch job to Dataproc Serverless and writes logs to the specified Spark History Server:

gcloud dataproc batches submit spark --batch={job_name_here} --region={region_here} --class={class_name_here} --jars={jar1_gcs_path_here},{jar2_gcs_path_here} --history-server-cluster=projects/{project_name_here}/regions/{region_here}/clusters/{cluster_name_here} -- {argument1} {argument2} {argument3}

Discussion

The following is a sample gcloud command that submits the Scala Spark job that converts XML files to Parquet to Dataproc Serverless and logs the Spark events to the “dataproc-phs” history server:

gcloud dataproc batches submit --project anu-psodata-lab --region us-central1 spark --batch serverless-1s --class com.dataprocessing.XmltoParquet --version 1.1 --jars gs://dataproc-cookbook/chapter2/spark/scala/jar/spark-xml_2.12-0.16.0.jar,gs://dataproc-cookbook/chapter2/spark/scala/jar/xmltoparquet_2.12-0.1.jar --subnet default --history-server-cluster projects/anu-psodata-lab/regions/us-central1/clusters/dataproc-phs -- gs://dataproc-cookbook/chapter2/spark/scala/inputfiles/menu.xml gs://dataproc-cookbook/chapter2/spark/scala/outputfiles/ food

Another way to configure this is via the Console, as shown in Figure 4-11.

Configure History Server Cluster when Submitting Dataproc Serverless job via Console
Figure 4-11. Configure History Server Cluster when Submitting Dataproc Serverless job via Console

There are two ways to monitor completed batch Serverless Spark jobs in the Spark Web UI:

  1. Navigate to the corresponding batch job. “View Spark History Server” will be active if it’s configured as shown in Figure 4-12. If it’s not active, configure the history server when submitting a job.

    Batch Job has “View Spark History Server” active to Show the Completed Applications
    Figure 4-12. Batch Job has “View Spark History Server” active to Show the Completed Applications
  2. Navigate to the Spark History Server cluster and access the Web Interfaces as shown in Figure 4-13. The component gateway will show the details of the Spark Applications.

    Web Interfaces section in Spark History Server cluster
    Figure 4-13. Web Interfaces section in Spark History Server cluster

Configuring Ephemeral Dataproc Clusters for the Spark History Server

To configure an Ephemeral Dataproc Cluster to write event logs to the Spark History Server, here is the gcloud command:

gcloud dataproc clusters create {job_cluster_name_here} --enable-component-gateway --region {region_here} --zone {zone_here} --master-machine-type {machine_type_here} --master-boot-disk-size 500 --num-workers 2 --worker-machine-type {machine_type_here} --worker-boot-disk-size 500 --image-version 2.0-debian10 --properties yarn:yarn.nodemanager.remote-app-log-dir=gs://{gcs_bucket_here}/yarn-logs,mapred:mapreduce.jobhistory.done-dir=gs://{gcs_bucket_here}/events/mapreduce-job-history/done,mapred:mapreduce.jobhistory.intermediate-done-dir=gs://{gcs_bucket_here}/events/mapreduce-job-history/intermediate-done,spark:spark.eventLog.dir=gs://{gcs_bucket_here}/events/spark-job-history,spark:spark.history.fs.logDirectory=gs://{gcs_bucket_here}/events/spark-job-history,spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE,spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=1000ms --project {project_name_here}

4.10 Monitoring Serverless Spark Jobs

Problem

You are running a Spark job on Dataproc Serverless and you want to monitor the status and performance of the job.

Solution

To monitor your Serverless Spark jobs on Dataproc:

Monitoring from UI:

Navigate to the job in the Dataproc Batches console. Here, you will see details like:

  • The number of running Spark executors

  • Job status (running, failed, succeeded)

Navigate to the corresponding job in the Dataproc Batches console, as shown in Figure 4-14.

Job Monitoring UI in Batches console
Figure 4-14. Job Monitoring UI in Batches console
Monitoring using gcloud command:

Use the gcloud command to submit the job (refer to Recipe 4.3) and observe the streamed details:

  • Job status

  • Driver output
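You can also check on a batch after it has been submitted. For example, using the standard batches commands (the batch name serverless-1j matches the example in the Discussion):

# List recent batches in the region, then inspect one batch's state and config.
gcloud dataproc batches list --region=us-central1

gcloud dataproc batches describe serverless-1j --region=us-central1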

Spark Events and Driver Logs:

For deeper insights, use the Spark Web UI (Spark History Server) as outlined in Recipe 9.4.

Discussion

Here is a sample gcloud command that creates a Serverless Spark job named “serverless-1j,” which converts XML to Parquet:

gcloud dataproc batches submit spark  --batch=serverless-1j --region=us-central1     --jars="gs://dataproc-cookbook/chapter2/spark/scala/jar/spark-xml_2.12-0.16.0.jar","gs://dataproc-cookbook/chapter2/spark/scala/jar/xmltoparquet_2.12-0.1.jar" --class=com.dataprocessing.XmltoParquet -- "gs://dataproc-cookbook/chapter2/spark/scala/inputfiles/menu.xml" "gs://dataproc-cookbook/chapter2/spark/scala/outputfiles/" "food"

The gcloud command streams details such as the job status and driver output to the terminal, as shown in Figure 4-15:

Job status and driver output in the Terminal
Figure 4-15. Job status and driver output in the Terminal

Let’s take an example PySpark job that reads Parquet input containing daily Wikipedia views. The Spark code snippet shown here groups by day, sums the views, and writes the output as Parquet.

import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dailyinsights").getOrCreate()

# Get the input path and output path from the job arguments
inputPath = sys.argv[1]
outputPath = sys.argv[2]

# Read the Parquet input
parquetDF = spark.read.parquet(inputPath)

# Group by day and sum the views
aggregateDF = parquetDF.groupBy("day").agg(F.sum("views"))

# Write the aggregated result as Parquet
aggregateDF.write.mode("overwrite").parquet(outputPath)

print("Application Completed!!!")

# Close the Spark session
spark.stop()

To determine what to monitor and where, let’s first understand the Spark execution model at a broad level.

When the sample PySpark job is submitted, the Spark resource manager creates an Application Master (AM) container. The AM container launches the driver thread and runs the main method. Spark is written in Scala and always runs on the JVM; PySpark starts the JVM application through a Py4J connection. The driver doesn’t perform any data processing itself; it requests executor containers from the resource manager. Both the Spark driver and the executors are JVM applications.

If we look at the application code shared above, there are two actions (the read and the write). Each of these actions triggers one or more Spark jobs. There is one transformation in this code, which is a wide-dependency transformation (group by and aggregate). The driver prepares a logical query plan for each job, then breaks the job into stages at each wide-dependency transformation. Each stage consists of a collection of tasks that run concurrently on executors, and all of the tasks in a stage perform the same type of operation on different partitions of the data.

To monitor your Spark application, there are four important places to look:

  • Monitoring UI

  • Output UI

  • Details UI

  • Spark Web UI (Spark History Server)

The Monitoring UI shows the number of executors running tasks at each moment, as shown in Figure 4-14.

The Spark driver log is streamed to the Output console, and it contains the application output and status, as shown in Figure 4-16.

Application driver output in Output UI
Figure 4-16. Application driver output in Output UI

The Details section shows the executor and driver compute tiers, memory sizes, and core counts, as shown in Figure 4-17.

Details section shows the Spark Properties of the job
Figure 4-17. Details section shows the Spark Properties of the job

The Spark UI helps you monitor at a granular level: jobs, stages, and tasks. You can look for stragglers and data skew; stragglers are tasks within a stage that take longer to execute than the other tasks in that stage. Refer to Recipe 9.4 to understand how to monitor using the Spark UI.

See Also

The Anatomy of a Spark Job section in High Performance Spark by Holden Karau and Rachel Warren

4.11 Calculate the Price of a Serverless Batch

Problem

You’re running Spark jobs on Dataproc Serverless, but you’re unsure how to figure out the cost of a batch job. How do you calculate the price of a single Batch job?

Solution

The most straightforward way is to use the billing console.

Navigate to the billing reports section in the GCP console, choose the correct project, and select Dataproc in the Services dropdown. Then select the label goog-dataproc-batch-id with the corresponding batch job ID as the value, as shown in Figure 4-18.

Dataproc Serverless Cost Breakdown
Figure 4-18. Dataproc Serverless Cost Breakdown

Discussion

When you submit a Spark job to Dataproc Serverless, it generates default labels such as goog-dataproc-batch-id, goog-dataproc-batch-uuid, and goog-dataproc-location. You can filter on these labels to find the cost of the job. To calculate cost grouped by domain or team, add custom labels such as org-domain when running the Spark jobs, as shown in Figure 4-19.
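As a sketch, the same kind of label shown in Figure 4-19 can be attached at submission time with the --labels flag (the label keys and values, script, and bucket paths here are examples):

# Attach cost-attribution labels at submission; they appear in billing reports
# alongside the default goog-dataproc-* labels.
gcloud dataproc batches submit pyspark gs://my-bucket/job.py \
  --region=us-central1 \
  --labels=org-domain=finance,team=analytics \
  -- gs://my-bucket/input/ gs://my-bucket/output/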

Adding Custom Labels to the Dataproc Serverless Batch Job
Figure 4-19. Adding Custom Labels to the Dataproc Serverless Batch Job

Remember, Dataproc Serverless for Spark pricing is based on the amount of compute, storage, and accelerators used.

The total cost is a function of the number of Data Compute Units (DCUs) consumed, the number of accelerators used, and the amount of shuffle storage used.

Approximate DCU, Accelerator, and shuffle storage usage metrics of the Batch Job
Figure 4-20. Approximate DCU, Accelerator and shuffle storage usage metrics of the Batch Job
Tip

To view billing information in a Google Cloud project, the user must have the following privileges:

  • Billing account Viewer role: This role allows the user to view billing information for the entire billing account, including all projects in the account.

  • Project Viewer role: This role allows the user to view billing information for a specific project.

The details section will show the approximate DCU, Accelerator and shuffle storage usage metrics as shown in Figure 4-20. This job ran on premium compute and storage tiers as shown in Figure 4-21. The cost breaks down as follows:

Total Compute Cost: ~0.256 DCU-hours × $0.100232 per DCU-hour = $0.0256

Total Storage Cost: ~0.451 GB-months × $0.11 per GB-month = $0.04961

Total cost: ~$0.0256 + ~$0.04961 = ~$0.07521

Batch Job Configured to run on Premium Tier
Figure 4-21. Batch Job Configured to run on Premium Tier
