SQLListener Spark Listener
SQLListener is a custom SparkListener that collects information about SQL query executions for the web UI (to display in the SQL tab). It relies on the spark.sql.execution.id key to distinguish between queries.

Internally, it uses the SQLExecutionUIData data structure exclusively to record all the necessary data for a single SQL query execution. SQLExecutionUIData is tracked in the internal registries, i.e. activeExecutions, failedExecutions, and completedExecutions, as well as in the lookup tables, i.e. _executionIdToData, _jobIdToExecutionId, and _stageIdToStageMetrics.
SQLListener starts recording a query execution by intercepting a SparkListenerSQLExecutionStart event (using the onOtherEvent callback). It stops recording information about a SQL query execution when a SparkListenerSQLExecutionEnd event arrives.

It also defines the following callbacks (from the SparkListener interface):

- onExecutorMetricsUpdate
- onStageSubmitted
- onTaskEnd
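To illustrate how the SQL execution events flow through onOtherEvent, here is a minimal sketch of a custom listener (not the actual SQLListener implementation; the class name and printed messages are made up for the demo):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Hypothetical listener that only watches the SQL execution lifecycle events
class SQLExecutionWatcher extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} ended at ${e.time}")
    case _ => // not a SQL execution event
  }
}

// Register the listener and run a query to see both events
spark.sparkContext.addSparkListener(new SQLExecutionWatcher)
spark.range(5).count
```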
Registering Job and Stages under Active Execution — onJobStart Callback

```scala
onJobStart(jobStart: SparkListenerJobStart): Unit
```
onJobStart reads the spark.sql.execution.id key and the identifiers of the job and its stages, and then updates the SQLExecutionUIData for the execution id in the activeExecutions internal registry.
Note
|
When onJobStart is executed, it is assumed that SQLExecutionUIData has already been created and available in the internal activeExecutions registry.
|
The job in SQLExecutionUIData is marked as running, with the stages added (to stages). For each stage, a SQLStageMetrics is created in the internal _stageIdToStageMetrics registry. In the end, the execution id is recorded for the job id in the internal _jobIdToExecutionId lookup table.
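As a hedged illustration of the spark.sql.execution.id lookup (not SQLListener's actual code), a custom listener can read the same job property in its own onJobStart:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Hypothetical listener: maps a job (and its stages) back to its SQL execution id
class ExecutionIdWatcher extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val executionId = Option(jobStart.properties)
      .flatMap(props => Option(props.getProperty("spark.sql.execution.id")))
    println(s"Job ${jobStart.jobId} with stages ${jobStart.stageIds.mkString(",")} " +
      s"belongs to SQL execution $executionId")
  }
}
```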
onOtherEvent Callback

In onOtherEvent, SQLListener listens to the following SparkListenerEvent events:
Registering Active Execution — SparkListenerSQLExecutionStart Event

```scala
case class SparkListenerSQLExecutionStart(
  executionId: Long,
  description: String,
  details: String,
  physicalPlanDescription: String,
  sparkPlanInfo: SparkPlanInfo,
  time: Long)
  extends SparkListenerEvent
```
A SparkListenerSQLExecutionStart event starts recording information about the executionId SQL query execution.

When a SparkListenerSQLExecutionStart event arrives, a new SQLExecutionUIData for the executionId query execution is created and stored in the activeExecutions internal registry. It is also stored in the _executionIdToData lookup table.
SparkListenerSQLExecutionEnd Event

```scala
case class SparkListenerSQLExecutionEnd(
  executionId: Long,
  time: Long)
  extends SparkListenerEvent
```
A SparkListenerSQLExecutionEnd event stops recording information about the executionId SQL query execution (tracked as SQLExecutionUIData). SQLListener saves the input time as completionTime.

If there are no other running jobs (registered in SQLExecutionUIData), the query execution is removed from the activeExecutions internal registry and moved to either the completedExecutions or the failedExecutions registry.

This is when SQLListener checks the number of SQLExecutionUIData entries in either registry — failedExecutions or completedExecutions — and removes the excess of old entries beyond the spark.sql.ui.retainedExecutions Spark property.
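For example (a sketch; the application name is made up, and the property defaults to 1000 in recent Spark 2.x releases), the limit can be lowered when building the SparkSession:

```scala
import org.apache.spark.sql.SparkSession

// The property is read when the listener is created, so set it before the session starts
val spark = SparkSession.builder
  .appName("retained-executions-demo")
  .config("spark.sql.ui.retainedExecutions", 100)  // keep at most 100 completed/failed executions
  .getOrCreate()
```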
SparkListenerDriverAccumUpdates Event

```scala
case class SparkListenerDriverAccumUpdates(
  executionId: Long,
  accumUpdates: Seq[(Long, Long)])
  extends SparkListenerEvent
```
When a SparkListenerDriverAccumUpdates event comes, the SQLExecutionUIData for the input executionId is looked up (in _executionIdToData) and SQLExecutionUIData.driverAccumUpdates is updated with the input accumUpdates.
onJobEnd Callback

```scala
onJobEnd(jobEnd: SparkListenerJobEnd): Unit
```
When called, onJobEnd retrieves the SQLExecutionUIData for the job and records it as either successful or failed depending on the job result.

If the query execution has already been marked as completed (using completionTime) and there are no other running jobs (registered in SQLExecutionUIData), the query execution is removed from the activeExecutions internal registry and moved to either the completedExecutions or the failedExecutions registry.

This is when SQLListener checks the number of SQLExecutionUIData entries in either registry — failedExecutions or completedExecutions — and removes the excess of old entries beyond the spark.sql.ui.retainedExecutions Spark property.
Getting SQL Execution Data — getExecution Method

```scala
getExecution(executionId: Long): Option[SQLExecutionUIData]
```
Getting Execution Metrics — getExecutionMetrics Method

```scala
getExecutionMetrics(executionId: Long): Map[Long, String]
```

getExecutionMetrics gets the metrics (aka accumulator updates) for executionId (by which it collects all the tasks that were used for an execution).

It is used exclusively to render the ExecutionPage page in the web UI.
mergeAccumulatorUpdates Method

mergeAccumulatorUpdates is a private helper method for…TK

It is used exclusively in the getExecutionMetrics method.
SQL Tab — Monitoring Structured Queries in web UI
The SQL tab in web UI shows SQLMetrics per physical operator in a structured query physical plan.

You can access the SQL tab under the /SQL URL, e.g. http://localhost:4040/SQL/.

By default, it displays all SQL query executions. However, after a query has been selected, the SQL tab displays the details of that structured query execution.
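Any action over a structured query adds an entry to the SQL tab, e.g. the following (arbitrary) query run from spark-shell:

```scala
// Run an action so the SQL tab at http://localhost:4040/SQL/ has something to show
spark.range(1000).selectExpr("id % 10 as gid").groupBy("gid").count.collect
```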
AllExecutionsPage

AllExecutionsPage displays all SQL query executions in a Spark application per state, sorted by their submission time in descending order.

Internally, the page requests SQLListener for query executions in running, completed, and failed states (the states correspond to the respective tables on the page).
ExecutionPage — Details for Query

ExecutionPage shows the details of a structured query execution by id.
Note
|
The id request parameter is mandatory.
|
ExecutionPage displays a summary with Submitted Time, Duration, and the clickable identifiers of the Running Jobs, Succeeded Jobs, and Failed Jobs.

It also displays a visualization (using accumulator updates and the SparkPlanGraph for the query) with the expandable Details section (that corresponds to SQLExecutionUIData.physicalPlanDescription).

If there is no information to display for a given query id, you should see the following page.

Internally, ExecutionPage uses SQLListener exclusively to get the SQL query execution metrics. It requests SQLListener for the SQL execution data to display for the id request parameter.
Creating SQLTab Instance

SQLTab is created when SharedState is created or, when Spark History Server is used, at the first SparkListenerSQLExecutionStart event.
Note
|
SharedState represents the shared state across SparkSessions .
|
ShuffledRowRDD
ShuffledRowRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]).
Note
|
ShuffledRowRDD looks like ShuffledRDD, and the difference is in the type of the values to process, i.e. InternalRow and (K, C) key-value pairs, respectively.
|
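A quick way to see a ShuffledRowRDD is to inspect the RDD lineage of a query that requires an exchange, e.g. repartition (a sketch; exact RDD ids and partition counts differ per run):

```scala
// repartition forces a shuffle, so the query executes through a ShuffledRowRDD
val q = spark.range(10).repartition(5)
val rdd = q.queryExecution.toRdd
println(rdd.toDebugString)
// ShuffledRowRDD should show up at (or near) the top of the printed lineage,
// with the pre-shuffle MapPartitionsRDDs below it.
```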
ShuffledRowRDD takes a ShuffleDependency (of integer keys and InternalRow values).
Note
|
The dependency property is mutable and is of type ShuffleDependency[Int, InternalRow, InternalRow] .
|
ShuffledRowRDD takes an optional specifiedPartitionStartIndices collection of integers that are the start indices of the post-shuffle partitions (which also determines their number). When not specified, the number of post-shuffle partitions is managed by the Partitioner of the input ShuffleDependency.
Note
|
Post-shuffle partition is…FIXME |
Name | Description |
---|---|
|
A single-element collection with |
|
CoalescedPartitioner (with the Partitioner of the |
Computing Partition (in TaskContext) — compute Method

```scala
compute(split: Partition, context: TaskContext): Iterator[InternalRow]
```
Note
|
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext ).
|
Internally, compute makes sure that the input split is a ShuffledRowRDDPartition. It then requests the ShuffleManager for a ShuffleReader to read the InternalRows for the split.
Note
|
compute uses SparkEnv to access the current ShuffleManager .
|
Note
|
compute uses ShuffleHandle (of ShuffleDependency dependency) and the pre-shuffle start and end partition offsets.
|
Getting Placement Preferences of Partition — getPreferredLocations Method

```scala
getPreferredLocations(partition: Partition): Seq[String]
```
Note
|
getPreferredLocations is part of RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
|
Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations of the input partition (for the single ShuffleDependency).
Note
|
getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).
|
FileScanRDD — Input RDD of FileSourceScanExec Physical Operator
FileScanRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is the one and only input RDD of the FileSourceScanExec physical operator.
FileScanRDD is created exclusively when FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (which is when FileSourceScanExec is requested for the input RDD that WholeStageCodegenExec physical operator uses when executed).
```scala
val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
val inputRDD = scan.inputRDDs.head

val rdd = q.queryExecution.toRdd
scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at toRdd at <console>:26 []
 |  FileScanRDD[0] at toRdd at <console>:26 []

val fileScanRDD = q.queryExecution.toRdd.dependencies.head.rdd

// What FileSourceScanExec uses for the input RDD is exactly the first RDD in the lineage
assert(inputRDD == fileScanRDD)
```
Name | Description |
---|---|
ignoreCorruptFiles | spark.sql.files.ignoreCorruptFiles Used exclusively when |
ignoreMissingFiles | spark.sql.files.ignoreMissingFiles Used exclusively when |
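Both properties are regular SQL configuration properties and can be inspected or toggled at runtime (a sketch, assuming a Spark version that defines both):

```scala
// Toggle the flags for the current session and check their values
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
spark.conf.set("spark.sql.files.ignoreMissingFiles", true)
println(spark.conf.get("spark.sql.files.ignoreCorruptFiles"))  // true
println(spark.conf.get("spark.sql.files.ignoreMissingFiles"))  // true
```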
getPreferredLocations Method

```scala
getPreferredLocations(split: RDDPartition): Seq[String]
```
Note
|
getPreferredLocations is part of the RDD Contract to…FIXME.
|
getPreferredLocations …FIXME
getPartitions Method

```scala
getPartitions: Array[RDDPartition]
```
Note
|
getPartitions is part of the RDD Contract to…FIXME.
|
getPartitions …FIXME
Creating FileScanRDD Instance

FileScanRDD takes the following when created:

- Read function that takes a PartitionedFile and gives internal rows back (i.e. (PartitionedFile) ⇒ Iterator[InternalRow])
Computing Partition (in TaskContext) — compute Method

```scala
compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow]
```
Note
|
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext ).
|
compute creates a Scala Iterator (of Java Objects) that…FIXME

compute then requests the input TaskContext to register a completion listener (i.e. addTaskCompletionListener) to be executed when the task completes, which simply closes the iterator.

In the end, compute returns the iterator.
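The completion-listener mechanism itself is a public TaskContext API, so a hedged standalone example (unrelated to FileScanRDD's internals) looks like this:

```scala
import org.apache.spark.TaskContext

// Register a completion callback in each task, the same mechanism compute uses
// to close its file iterator when the task finishes
spark.range(2).rdd.foreachPartition { _ =>
  val ctx = TaskContext.get
  ctx.addTaskCompletionListener { _: TaskContext =>
    println(s"task ${ctx.taskAttemptId} completed; resources could be closed here")
  }
}
```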
LocalDateTimeEncoder — Custom ExpressionEncoder for java.time.LocalDateTime
Spark SQL does not support java.time.LocalDateTime values in a Dataset.
import java.time.LocalDateTime val data = Seq((0, LocalDateTime.now)) scala> val times = data.toDF("time") java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDateTime - field (class: "java.time.LocalDateTime", name: "_2") - root class: "scala.Tuple2" at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56) at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824) at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:637) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56) at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824) at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445) at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71) at org.apache.spark.sql.Encoders$.product(Encoders.scala:275) at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34) ... 50 elided |
As the exception clearly says, the root cause is that no Encoder is found for java.time.LocalDateTime (as there is none available in Spark SQL).

You could define one using ExpressionEncoder, but that does not seem to work either.
```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> ExpressionEncoder[java.time.LocalDateTime]
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDateTime
- root class: "java.time.LocalDateTime"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  ... 50 elided
```
The simplest solution is to transform the Dataset with java.time.LocalDateTime to a supported type that Spark SQL offers an encoder for.
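For example (a sketch run in spark-shell, mapping to java.sql.Timestamp for which Spark SQL has a built-in encoder):

```scala
import java.time.LocalDateTime
import java.sql.Timestamp
import spark.implicits._

// Convert LocalDateTime to java.sql.Timestamp before creating the Dataset
val data = Seq((0, LocalDateTime.now)).map { case (id, ldt) => (id, Timestamp.valueOf(ldt)) }
val times = data.toDF("id", "time")
times.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- time: timestamp (nullable = true)
```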
A much better solution would be to provide a custom Encoder that would expand the types supported in Spark SQL.
LocalDateTimeEncoder is an attempt to develop a custom ExpressionEncoder for Java’s java.time.LocalDateTime so you don’t have to map values to another supported type.
public final class LocalDateTime

A date-time without a time-zone in the ISO-8601 calendar system, such as 2007-12-03T10:15:30.

LocalDateTime is an immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second.
// A very fresh attempt at creating an Encoder for java.time.LocalDateTime // See ExpressionEncoder.apply import org.apache.spark.sql.catalyst.expressions.BoundReference import org.apache.spark.sql.catalyst.ScalaReflection import java.time.LocalDateTime val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[LocalDateTime], nullable = true) // ScalaReflection.serializerFor[LocalDateTime](inputObject) import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Literal} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.DateType // Simply invokes DateTimeUtils.fromJavaDate // fromJavaDate(date: Date): Int // serializing a Date to an Int // Util object to do conversion (serialization) object LocalDateTimeUtils { import java.time.LocalDateTime def fromLocalDateTime(date: LocalDateTime): Long = date.toEpochSecond(java.time.ZoneOffset.UTC) } val other = StaticInvoke( LocalDateTimeUtils.getClass, DateType, "fromLocalDateTime", inputObject :: Nil, returnNullable = false) val serializer = CreateNamedStruct(Literal("value") :: other :: Nil) val schema = serializer.dataType // ScalaReflection.deserializerFor[T] // FIXME Create it as for ScalaReflection.serializerFor above val deserializer = serializer // FIXME import scala.reflect.ClassTag import scala.reflect.runtime.universe.{typeTag, TypeTag} val mirror = ScalaReflection.mirror val tpe = typeTag[java.time.LocalDateTime].in(mirror).tpe val cls = mirror.runtimeClass(tpe) import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder val localDateTimeEncoder = new ExpressionEncoder[java.time.LocalDateTime]( schema, flat = true, serializer.flatten, deserializer, ClassTag[java.time.LocalDateTime](cls)) import org.apache.spark.sql.Encoder implicit val encLocalDateTime: Encoder[java.time.LocalDateTime] = localDateTimeEncoder // DEMO val data = Seq(LocalDateTime.now) val times = spark.createDataset(data) // (encLocalDateTime) |
// $ SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" ./bin/spark-shell --conf spark.rpc.askTimeout=5m import java.time.LocalDateTime import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types._ val schema = StructType( $"year".int :: $"month".int :: $"day".int :: Nil) import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.types.ObjectType import org.apache.spark.sql.catalyst.expressions.BoundReference val clazz = classOf[java.time.LocalDateTime] val inputObject = BoundReference(0, ObjectType(clazz), nullable = true) val nullSafeInput = inputObject import org.apache.spark.sql.types.TimestampType val staticInvoke = StaticInvoke( classOf[java.time.LocalDateTime], TimestampType, "parse", inputObject :: Nil)) // Based on UDTRegistration val clazz = classOf[java.time.LocalDateTime] import org.apache.spark.sql.catalyst.expressions.objects.NewInstance val obj = NewInstance( cls = clazz, arguments = Nil, dataType = ObjectType(clazz)) import org.apache.spark.sql.catalyst.expressions.objects.Invoke // the following would be nice to have // FIXME How to bind them all up into one BoundReference? import org.apache.spark.sql.types.IntegerType val yearRef = BoundReference(0, IntegerType, nullable = true) val monthRef = BoundReference(1, IntegerType, nullable = true) val dayOfMonthRef = BoundReference(2, IntegerType, nullable = true) val hourRef = BoundReference(3, IntegerType, nullable = true) val minuteRef = BoundReference(4, IntegerType, nullable = true) import org.apache.spark.sql.types.ArrayType val inputObject = BoundReference(0, ArrayType(IntegerType), nullable = true) def invoke(inputObject: Expression, fieldName: String) = Invoke( targetObject = inputObject, functionName = fieldName, dataType = IntegerType) import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.GetArrayItem val year = GetArrayItem(inputObject, Literal(0)) val month = GetArrayItem(inputObject, Literal(1)) val day = GetArrayItem(inputObject, Literal(2)) val hour = GetArrayItem(inputObject, Literal(3)) val minute = GetArrayItem(inputObject, Literal(4)) // turn LocalDateTime into InternalRow // by saving LocalDateTime in parts val serializer = CreateNamedStruct( Literal("year") :: year :: Literal("month") :: month :: Literal("day") :: day :: Literal("hour") :: hour :: Literal("minute") :: minute :: Nil) import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.DateTimeUtils val getPath: Expression = Literal("value") val deserializer: Expression = StaticInvoke( DateTimeUtils.getClass, ObjectType(classOf[java.time.LocalDateTime]), "toJavaTimestamp", getPath :: Nil) // we ask serializer about the schema val schema: StructType = serializer.dataType import scala.reflect._ implicit def scalaLocalDateTime: Encoder[java.time.LocalDateTime] = new ExpressionEncoder[java.time.LocalDateTime]( schema, flat = false, // serializer.size == 1 serializer.flatten, deserializer, classTag[java.time.LocalDateTime]) // the above leads to the following exception // Add log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator=DEBUG to see the code scala> scalaLocalDateTime.asInstanceOf[ExpressionEncoder[LocalDateTime]].toRow(LocalDateTime.now) 
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData input[0, array<int>, true][0] AS year#0 input[0, array<int>, true][1] AS month#1 input[0, array<int>, true][2] AS day#2 input[0, array<int>, true][3] AS hour#3 input[0, array<int>, true][4] AS minute#4 at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291) ... 52 elided Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:48) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:194) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288) ... 52 more // and so the following won't work either val times = Seq(LocalDateTime.now).toDF("time") |
Open Questions

- ScalaReflection.serializerFor passes ObjectType objects through
- ScalaReflection.serializerFor uses StaticInvoke for java.sql.Timestamp and java.sql.Date.

  ```scala
  case t if t <:< localTypeOf[java.sql.Timestamp] =>
    StaticInvoke(
      DateTimeUtils.getClass,
      TimestampType,
      "fromJavaTimestamp",
      inputObject :: Nil)

  case t if t <:< localTypeOf[java.sql.Date] =>
    StaticInvoke(
      DateTimeUtils.getClass,
      DateType,
      "fromJavaDate",
      inputObject :: Nil)
  ```

- How could SQLUserDefinedType and UDTRegistration help here?
RowEncoder — Encoder for DataFrames
RowEncoder is part of the Encoder framework and acts as the encoder for DataFrames, i.e. Dataset[Row] — Datasets of Rows.
Note
|
DataFrame type is a mere type alias for Dataset[Row] that expects an Encoder[Row] available in scope which is indeed RowEncoder itself.
|
RowEncoder is an object in Scala with apply and other factory methods.

RowEncoder can create ExpressionEncoder[Row] from a schema (using the apply method).
```scala
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) :: Nil)

import org.apache.spark.sql.catalyst.encoders.RowEncoder
scala> val encoder = RowEncoder(schema)
encoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] = class[id[0]: bigint, name[0]: string]

// RowEncoder is never flat
scala> encoder.flat
res0: Boolean = false
```
RowEncoder object belongs to the org.apache.spark.sql.catalyst.encoders package.
Creating ExpressionEncoder For Row Type — apply method

```scala
apply(schema: StructType): ExpressionEncoder[Row]
```
apply builds an ExpressionEncoder of Row, i.e. ExpressionEncoder[Row], from the input StructType (as schema).

Internally, apply creates a BoundReference for the Row type and returns an ExpressionEncoder[Row] for the input schema, a CreateNamedStruct serializer (using the serializerFor internal method), a deserializer for the schema, and the Row type.
serializerFor Internal Method

```scala
serializerFor(inputObject: Expression, inputType: DataType): Expression
```
serializerFor creates an Expression that is assumed to be a CreateNamedStruct.

serializerFor takes the input inputType and:

- Returns the input inputObject as is for native types, i.e. NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, BinaryType, CalendarIntervalType.

  Caution: FIXME What does being a native type mean?

- For UserDefinedTypes, it takes the UDT class from the SQLUserDefinedType annotation or the UDTRegistration object and returns an expression with an Invoke to call the serialize method on a NewInstance of the UDT class.

- For TimestampType, it returns an expression with a StaticInvoke to call fromJavaTimestamp on the DateTimeUtils class.

- …FIXME
Caution
|
FIXME Describe me. |
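As a quick (hedged) way to see the TimestampType case in action, build a RowEncoder over a single-timestamp schema and print its serializer expressions (the exact tree output differs across Spark versions):

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val schema = new StructType().add("ts", TimestampType)
val encoder = RowEncoder(schema)
// The serializer for the ts field should contain a staticinvoke of DateTimeUtils.fromJavaTimestamp
encoder.serializer.foreach(expr => println(expr.numberedTreeString))
```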
ExpressionEncoder — Expression-Based Encoder
ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.

ExpressionEncoder[T] uses expressions for a serializer and a deserializer.
Note
|
ExpressionEncoder is the only supported implementation of Encoder which is explicitly enforced when Dataset is created (even though Dataset data structure accepts a bare Encoder[T] ).
|
```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val stringEncoder = ExpressionEncoder[String]
scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
```
ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to the internal binary row format (i.e. InternalRow).
Note
|
It is assumed that all serializer expressions contain at least one and the same BoundReference. |
ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from the internal binary row format.

ExpressionEncoder is flat when the serializer uses a single expression (which also means that the objects of a type T are not created using constructor parameters only, like Product or DefinedByConstructorParams types).

Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
Property | Description |
---|---|
Used exclusively when |
|
Used exclusively when |
|
Used…FIXME |
Note
|
Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean , long , String , java.sql.Date , java.sql.Timestamp , Array[Byte] .
|
Creating ExpressionEncoder — apply Method

```scala
apply[T : TypeTag](): ExpressionEncoder[T]
```
Caution
|
FIXME |
Creating ExpressionEncoder Instance

ExpressionEncoder takes the following when created:

- Serializer expressions (to convert objects of type T to internal rows)
- Deserializer expression (to convert internal rows to objects of type T)
- Scala’s ClassTag for the JVM type T
Creating Deserialize Expression — ScalaReflection.deserializerFor Method

```scala
deserializerFor[T: TypeTag]: Expression
```
deserializerFor creates an expression to deserialize from the internal binary row format to a Scala object of type T.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor

val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)

val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)
```
Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path with - root class: "[clsName]".
Tip
|
Read up on Scala’s TypeTags in TypeTags and Manifests.
|
Note
|
deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T .
|
Recursive Internal deserializerFor Method

```scala
deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
```
JVM Type (Scala or Java) | Deserialize Expressions |
---|---|
User Defined Types (UDTs) | |
Creating Serialize Expression — ScalaReflection.serializerFor Method

```scala
serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct
```
serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to the internal binary row format.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor

import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType
val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)

val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]
```
Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path with - root class: "[clsName]" and pattern matches on the result expression.
Caution
|
FIXME the pattern match part |
Tip
|
Read up on Scala’s TypeTags in TypeTags and Manifests.
|
Note
|
serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T .
|
Recursive Internal serializerFor Method

```scala
serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression
```
serializerFor creates an expression for serializing an object of type T to an internal row.
Caution
|
FIXME |
Encoding JVM Object to Internal Binary Row Format — toRow Method

```scala
toRow(t: T): InternalRow
```
toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow sets the only JVM object to be t in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).

In case of any exception while serializing, toRow reports a RuntimeException:
```
Error while encoding: [initial exception]
[multi-line serializer]
```
Decoding JVM Object From Internal Binary Row Format — fromRow Method

```scala
fromRow(row: InternalRow): T
```
fromRow decodes (aka deserializes) a JVM object from a row InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.

In case of any exception while deserializing, fromRow reports a RuntimeException:
```
Error while decoding: [initial exception]
[deserializer]
```
Creating ExpressionEncoder For Tuple — tuple Method

```scala
tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
tuple[T1, T2, T3](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
tuple[T1, T2, T3, T4](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
tuple[T1, T2, T3, T4, T5](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4],
  e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]
```

tuple …FIXME
Note
|
tuple is used when…FIXME
|
resolveAndBind Method

```scala
resolveAndBind(
  attrs: Seq[Attribute] = schema.toAttributes,
  analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]
```

resolveAndBind …FIXME
```scala
// A very common use case
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema

import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
import org.apache.spark.sql.Row
val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

import org.apache.spark.sql.catalyst.InternalRow
val row = InternalRow(1, "Jacek")

val deserializer = encoder.deserializer

scala> deserializer.eval(row)
java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
  at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
  ... 54 elided

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val code = deserializer.genCode(ctx).code
```
Encoders Factory Object
Encoders is a factory object that…FIXME
Creating Encoder Using Kryo — kryo Method

```scala
kryo[T: ClassTag]: Encoder[T]
```
kryo simply creates an encoder that serializes objects of type T using Kryo (i.e. the useKryo flag is enabled).
Note
|
kryo is used when…FIXME
|
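A hedged usage example (the Event class and its fields are made up for the demo):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Event is a hypothetical domain class
case class Event(id: Long, payload: Map[String, String])

// With a Kryo-based encoder the whole object is stored in a single binary column
val eventEncoder: Encoder[Event] = Encoders.kryo[Event]
val events = spark.createDataset(Seq(Event(0, Map("k" -> "v"))))(eventEncoder)
events.printSchema
// root
//  |-- value: binary (nullable = true)
```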
Creating Encoder Using Java Serialization — javaSerialization Method

```scala
javaSerialization[T: ClassTag]: Encoder[T]
```
javaSerialization simply creates an encoder that serializes objects of type T using the generic Java serialization (i.e. the useKryo flag is disabled).
Note
|
javaSerialization is used when…FIXME
|
Creating Generic Encoder — genericSerializer Internal Method

```scala
genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T]
```

genericSerializer …FIXME
Note
|
genericSerializer is used when Encoders is requested for a generic encoder using Kryo and Java Serialization.
|
Encoder — Internal Row Converter
Encoder is the fundamental concept in the serialization and deserialization (SerDe) framework in Spark SQL 2.0. Spark SQL uses the SerDe framework for IO to make it efficient time- and space-wise.
Tip
|
Spark has borrowed the idea from the Hive SerDe library so it might be worthwhile to get familiar with Hive a little bit, too. |
Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait.

```scala
trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}
```
The type T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL’s InternalRow, which is the internal binary row format representation (using Catalyst expressions and code generation).
Note
|
Encoder is also called “a container of serde expressions in Dataset”.
|
Note
|
The one and only implementation of the Encoder trait in Spark SQL 2 is ExpressionEncoder.
|
Encoders are an integral (and internal) part of any Dataset[T] (of records of type T) with an Encoder[T] that is used to serialize and deserialize the records of this dataset.
Note
|
Dataset[T] type is a Scala type constructor with the type parameter T . So is Encoder[T] that handles serialization and deserialization of T to the internal representation.
|
Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).
// The domain object for your records in a large dataset case class Person(id: Long, name: String) import org.apache.spark.sql.Encoders scala> val personEncoder = Encoders.product[Person] personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string] scala> personEncoder.schema res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true)) scala> personEncoder.clsTag res1: scala.reflect.ClassTag[Person] = Person import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder scala> val personExprEncoder = personEncoder.asInstanceOf[ExpressionEncoder[Person]] personExprEncoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string] // ExpressionEncoders may or may not be flat scala> personExprEncoder.flat res2: Boolean = false // The Serializer part of the encoder scala> personExprEncoder.serializer res3: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#0L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#1) // The Deserializer part of the encoder scala> personExprEncoder.deserializer res4: org.apache.spark.sql.catalyst.expressions.Expression = newInstance(class Person) scala> personExprEncoder.namedExpressions res5: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#2L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#3) // A record in a Dataset[Person] // A mere instance of Person case class // There could be a thousand of Person in a large dataset val jacek = Person(0, "Jacek") // Serialize a record to the internal representation, i.e. InternalRow scala> val row = personExprEncoder.toRow(jacek) row: org.apache.spark.sql.catalyst.InternalRow = [0,0,1800000005,6b6563614a] // Spark uses InternalRows internally for IO // Let's deserialize it to a JVM object, i.e. a Scala object import org.apache.spark.sql.catalyst.dsl.expressions._ // in spark-shell there are competing implicits // That's why DslSymbol is used explicitly in the following line scala> val attrs = Seq(DslSymbol('id).long, DslSymbol('name).string) attrs: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference] = List(id#8L, name#9) scala> val jacekReborn = personExprEncoder.resolveAndBind(attrs).fromRow(row) jacekReborn: Person = Person(0,Jacek) // Are the jacek instances same? scala> jacek == jacekReborn res6: Boolean = true |
You can create custom encoders using the static methods of the Encoders object. Note however that encoders for common Scala types and their product types are already available in the implicits object.
```scala
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
```
Tip
|
The default encoders are already imported in spark-shell. |
Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is by Encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, memsql) and vice versa.
Note
|
In Spark SQL 2.0 DataFrame type is a mere type alias for Dataset[Row] with RowEncoder being the encoder.
|
Creating Custom Encoders (Encoders object)

Encoders factory object defines methods to create Encoder instances.

Import the org.apache.spark.sql package to have access to the Encoders factory object.
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]
```
You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that could be composed to create more advanced encoders for Java bean classes (using the bean method).
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]
```
You can also create encoders based on Kryo or Java serializers.
```scala
import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]

scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
```
You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
```
Further Reading and Watching

- (video) Modern Spark DataFrame and Dataset (Intermediate Tutorial) by Adam Breindel from Databricks.