Exercise: Developing Custom SparkListener to monitor DAGScheduler in Scala
The example shows how to develop a custom Spark Listener. You should read Spark Listeners — Intercepting Events from Spark Scheduler first to understand the motivation for the example.
Requirements
- IntelliJ IDEA (or eventually sbt alone if you're adventurous).
- Access to the Internet to download Apache Spark's dependencies.
Setting up Scala project using IntelliJ IDEA
Create a new project custom-spark-listener.

Add the following line to build.sbt (the main configuration file for the sbt project) to declare the dependency on Apache Spark:
```
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.1"
```
build.sbt should look as follows:
```
name := "custom-spark-listener"

organization := "pl.jaceklaskowski.spark"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.1"
```
Custom Listener – pl.jaceklaskowski.spark.CustomSparkListener
Create a Scala class CustomSparkListener for your custom SparkListener. It should be under the src/main/scala directory (create the directory if it does not exist).

The aim of the class is to intercept scheduler events about jobs being started and stages being completed.
```scala
package pl.jaceklaskowski.spark

import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListener, SparkListenerJobStart}

class CustomSparkListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"[CustomSparkListener] Job started with ${jobStart.stageInfos.size} stages: $jobStart")
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"[CustomSparkListener] Stage ${stageCompleted.stageInfo.stageId} completed with ${stageCompleted.stageInfo.numTasks} tasks.")
  }
}
```
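SparkListener defines many more callbacks than the two used above. As a sketch of one possible extension, with the class name TaskMetricsListener made up for illustration, onTaskEnd can report per-task timings:

```scala
package pl.jaceklaskowski.spark

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical companion listener (not part of the exercise) that logs
// how long each task took, using the TaskInfo carried by the event.
class TaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"[TaskMetricsListener] Task ${taskEnd.taskInfo.taskId} " +
      s"in stage ${taskEnd.stageId} took ${taskEnd.taskInfo.duration} ms")
  }
}
```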
Creating a deployable package

Package the custom Spark listener by executing sbt package in the custom-spark-listener project's main directory.
```
$ sbt package
[info] Loading global plugins from /Users/jacek/.sbt/0.13/plugins
[info] Loading project definition from /Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/project
[info] Updating {file:/Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/project/}custom-spark-listener-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to custom-spark-listener (in build file:/Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/)
[info] Updating {file:/Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/}custom-spark-listener...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/target/scala-2.11/classes...
[info] Packaging /Users/jacek/dev/workshops/spark-workshop/solutions/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed Oct 27, 2016 11:23:50 AM
```
You should find the resulting jar file with the custom scheduler listener under the target/scala-2.11 directory, e.g. target/scala-2.11/custom-spark-listener_2.11-1.0.jar.
Activating Custom Listener in Spark shell
Start spark-shell with the spark.extraListeners configuration set to the listener's fully-qualified class name and with the jar that contains the class on the driver's class path.
```
$ spark-shell \
    --conf spark.logConf=true \
    --conf spark.extraListeners=pl.jaceklaskowski.spark.CustomSparkListener \
    --driver-class-path target/scala-2.11/custom-spark-listener_2.11-1.0.jar
```
Create a Dataset and execute an action like count to start a job as follows:
```
scala> spark.read.text("README.md").count
[CustomSparkListener] Job started with 2 stages: SparkListenerJobStart(1,1473946006715,WrappedArray(org.apache.spark.scheduler.StageInfo@71515592, org.apache.spark.scheduler.StageInfo@6852819d),{spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"14","name":"collect"}, spark.sql.execution.id=2})
[CustomSparkListener] Stage 1 completed with 1 tasks.
[CustomSparkListener] Stage 2 completed with 1 tasks.
res0: Long = 7
```
The lines with [CustomSparkListener] came from your custom Spark listener. Congratulations! The exercise's over.
BONUS: Activating the Custom Listener in a Spark Application
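One way to approach the bonus, a minimal sketch assuming a standalone application built with the same spark-core dependency (the object name SparkApp is made up), is to set spark.extraListeners on the SparkConf before creating the SparkContext:

```scala
package pl.jaceklaskowski.spark

import org.apache.spark.{SparkConf, SparkContext}

object SparkApp extends App {
  // spark.extraListeners works the same way as the --conf option used
  // with spark-shell above; Spark instantiates the class by name.
  val conf = new SparkConf()
    .setAppName("custom-spark-listener-app")
    .setMaster("local[*]")
    .set("spark.extraListeners", "pl.jaceklaskowski.spark.CustomSparkListener")
  val sc = new SparkContext(conf)

  // Alternatively, register an instance directly on the SparkContext
  // (addSparkListener is a @DeveloperApi method):
  // sc.addSparkListener(new CustomSparkListener)

  sc.textFile("README.md").count()
  sc.stop()
}
```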
Questions

- What are the pros and cons of registering the listener on the command line vs. inside a Spark application?