QueryExecutionListener

QueryExecutionListener is…​FIXME

SQLListener Spark Listener

SQLListener is a custom SparkListener that collects information about SQL query executions for the web UI (to display in the SQL tab). It relies on the spark.sql.execution.id key to distinguish between queries.

Internally, it uses the SQLExecutionUIData data structure exclusively to record all the necessary data for a single SQL query execution. SQLExecutionUIData is tracked in the internal registries, i.e. activeExecutions, failedExecutions, and completedExecutions, as well as in the lookup tables, i.e. _executionIdToData, _jobIdToExecutionId, and _stageIdToStageMetrics.

SQLListener starts recording a query execution by intercepting a SparkListenerSQLExecutionStart event (using onOtherEvent callback).

SQLListener stops recording information about a SQL query execution when SparkListenerSQLExecutionEnd event arrives.
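
For illustration, a minimal custom SparkListener that reacts to the same two events could look as follows (the class name and the timing logic are illustrative, not part of Spark):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
  import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

  // Hypothetical listener that measures how long every SQL query execution takes
  class SQLExecutionTimingListener extends SparkListener {
    private val startTimes = scala.collection.concurrent.TrieMap.empty[Long, Long]

    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case start: SparkListenerSQLExecutionStart =>
        startTimes.put(start.executionId, start.time)
      case end: SparkListenerSQLExecutionEnd =>
        startTimes.remove(end.executionId).foreach { startedAt =>
          println(s"SQL execution ${end.executionId} took ${end.time - startedAt} ms")
        }
      case _ => // not interested in other events
    }
  }

  // spark.sparkContext.addSparkListener(new SQLExecutionTimingListener)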

It defines other callbacks from the SparkListener interface as well (see the sections below).

Registering Job and Stages under Active Execution — onJobStart Callback

onJobStart reads the spark.sql.execution.id key and the identifiers of the job and its stages, and then updates the SQLExecutionUIData for the execution id in the activeExecutions internal registry.

Note
When onJobStart is executed, it is assumed that SQLExecutionUIData has already been created and is available in the internal activeExecutions registry.

The job in SQLExecutionUIData is marked as running with the stages added (to stages). For each stage, a SQLStageMetrics is created in the internal _stageIdToStageMetrics registry. At the end, the execution id is recorded for the job id in the internal _jobIdToExecutionId.
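
As a sketch of the mechanics, this is roughly how a listener can recover the execution id from the properties that arrive with a SparkListenerJobStart event (the listener class below is hypothetical; the key name mirrors spark.sql.execution.id):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

  class ExecutionIdAwareListener extends SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      // jobStart.properties can be null for jobs not triggered through a SQL query execution
      val executionId = Option(jobStart.properties)
        .flatMap(props => Option(props.getProperty("spark.sql.execution.id")))
        .map(_.toLong)
      executionId.foreach(id => println(s"Job ${jobStart.jobId} belongs to SQL execution $id"))
    }
  }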

onOtherEvent Callback

In onOtherEvent, SQLListener listens to the following SparkListenerEvent events:

Registering Active Execution — SparkListenerSQLExecutionStart Event

A SparkListenerSQLExecutionStart event starts recording information about the executionId SQL query execution.

When a SparkListenerSQLExecutionStart event arrives, a new SQLExecutionUIData for the executionId query execution is created and stored in the activeExecutions internal registry. It is also stored in the _executionIdToData lookup table.

SparkListenerSQLExecutionEnd Event

A SparkListenerSQLExecutionEnd event stops recording information about the executionId SQL query execution (tracked as SQLExecutionUIData). SQLListener saves the input time as completionTime.

If there are no other running jobs (registered in SQLExecutionUIData), the query execution is removed from the activeExecutions internal registry and moved to either completedExecutions or failedExecutions registry.

This is when SQLListener checks the number of SQLExecutionUIData entries in either registry (failedExecutions or completedExecutions) and removes the oldest entries in excess of the spark.sql.ui.retainedExecutions Spark property.
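
The retention threshold is a plain Spark property, so it can be set when the SparkSession is built; a minimal sketch (the application name is made up and the default value is believed to be 1000):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("sql-ui-retention")                       // hypothetical application name
    .config("spark.sql.ui.retainedExecutions", "50")   // keep at most 50 completed/failed executions
    .getOrCreate()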

SparkListenerDriverAccumUpdates Event

When a SparkListenerDriverAccumUpdates event arrives, the SQLExecutionUIData for the input executionId is looked up (in _executionIdToData) and SQLExecutionUIData.driverAccumUpdates is updated with the input accumUpdates.

onJobEnd Callback

When called, onJobEnd retrieves the SQLExecutionUIData for the job and records it as either successful or failed depending on the job result.

If the query execution has already been marked as completed (using completionTime) and there are no other running jobs (registered in SQLExecutionUIData), i.e. this was the last job of the query execution, the query execution is removed from the activeExecutions internal registry and moved to either the completedExecutions or failedExecutions registry.

This is when SQLListener checks the number of SQLExecutionUIData entries in either registry (failedExecutions or completedExecutions) and removes the oldest entries in excess of the spark.sql.ui.retainedExecutions Spark property.

Getting SQL Execution Data — getExecution Method

Getting Execution Metrics — getExecutionMetrics Method

getExecutionMetrics gets the metrics (aka accumulator updates) for executionId, for which it collects all the tasks that were used in the execution.

It is exclusively used to render the ExecutionPage page in web UI.

mergeAccumulatorUpdates Method

mergeAccumulatorUpdates is a private helper method for…​TK

It is used exclusively in getExecutionMetrics method.

SQLExecutionUIData

SQLExecutionUIData is the data abstraction of SQLListener to describe SQL query executions. It is a container for jobs, stages, and accumulator updates for a single query execution.

SQL Tab — Monitoring Structured Queries in web UI

The SQL tab in web UI shows SQLMetrics per physical operator in the physical plan of a structured query.

You can access the SQL tab under /SQL URL, e.g. http://localhost:4040/SQL/.

By default, it displays all SQL query executions. However, after a query has been selected, the SQL tab displays the details for the structured query execution.

AllExecutionsPage

AllExecutionsPage displays all SQL query executions in a Spark application per state, sorted by their submission time in descending order.

Figure 1. SQL Tab in web UI (AllExecutionsPage)

Internally, the page requests SQLListener for query executions in running, completed, and failed states (the states correspond to the respective tables on the page).

ExecutionPage — Details for Query

ExecutionPage shows details for a structured query execution by id.

Note
The id request parameter is mandatory.

ExecutionPage displays a summary with Submitted Time, Duration, and the clickable identifiers of the Running Jobs, Succeeded Jobs, and Failed Jobs.

It also displays a visualization (using accumulator updates and the SparkPlanGraph for the query) with the expandable Details section (that corresponds to SQLExecutionUIData.physicalPlanDescription).

Figure 2. Details for Query in web UI

If there is no information to display for a given query id, you should see the following page.

Figure 3. No Details for SQL Query

Internally, it uses SQLListener exclusively to get the SQL query execution metrics. It requests SQLListener for SQL execution data to display for the id request parameter.

Creating SQLTab Instance

SQLTab is created when SharedState is created or, when Spark History Server is used, at the first SparkListenerSQLExecutionStart event.

Figure 4. Creating SQLTab Instance
Note
SharedState represents the shared state across SparkSessions.

ShuffledRowRDD

ShuffledRowRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]).

Note
ShuffledRowRDD looks like ShuffledRDD, and the difference is in the type of the values to process, i.e. InternalRow and (K, C) key-value pairs, respectively.
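
As a quick illustration, a ShuffledRowRDD should appear in the RDD lineage of any structured query that introduces an exchange, e.g. a repartition (the console output below is indicative only):

  // Spark shell sketch
  val q = spark.range(6).repartition(2)
  println(q.rdd.toDebugString)
  // (2) MapPartitionsRDD[...] at rdd at <console>:...
  //  |  ShuffledRowRDD[...] at rdd at <console>:...
  //  +-(...) MapPartitionsRDD[...] at rdd at <console>:...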

ShuffledRowRDD takes a ShuffleDependency (of integer keys and InternalRow values).

Note
The dependency property is mutable and is of type ShuffleDependency[Int, InternalRow, InternalRow].

ShuffledRowRDD takes an optional specifiedPartitionStartIndices collection of integers, i.e. the start indices of the post-shuffle partitions (which also determines the number of post-shuffle partitions). When not specified, the number of post-shuffle partitions is managed by the Partitioner of the input ShuffleDependency.

Note
Post-shuffle partition is…​FIXME
Table 1. ShuffledRowRDD and RDD Contract

getDependencies: a single-element collection with ShuffleDependency[Int, InternalRow, InternalRow]

partitioner: CoalescedPartitioner (with the Partitioner of the dependency)

getPreferredLocations: see Getting Placement Preferences of Partition below

compute: see Computing Partition (in TaskContext) below

numPreShufflePartitions Property

Caution
FIXME

Computing Partition (in TaskContext) — compute Method

Note
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext).

Internally, compute makes sure that the input split is a ShuffledRowRDDPartition. It then requests ShuffleManager for a ShuffleReader to read InternalRows for the split.

Note
compute uses ShuffleHandle (of ShuffleDependency dependency) and the pre-shuffle start and end partition offsets.

Getting Placement Preferences of Partition — getPreferredLocations Method

Note
getPreferredLocations is part of RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.

Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations of the input partition (for the single ShuffleDependency).

Note
getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).

CoalescedPartitioner

Caution
FIXME

ShuffledRowRDDPartition

Caution
FIXME

FileScanRDD — Input RDD of FileSourceScanExec Physical Operator

FileScanRDD is an RDD of internal binary rows (i.e. RDD[InternalRow]) that is the one and only input RDD of FileSourceScanExec physical operator.

FileScanRDD is created exclusively when FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (which is when FileSourceScanExec is requested for the input RDD that WholeStageCodegenExec physical operator uses when executed).
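
As a quick check, a FileScanRDD should show up in the RDD lineage of a query over a file-based data source (the file path and the output below are illustrative):

  // Spark shell sketch
  val q = spark.read.text("README.md")
  println(q.rdd.toDebugString)
  // (1) MapPartitionsRDD[...] at rdd at <console>:...
  //  |  ...
  //  |  FileScanRDD[...] at rdd at <console>:...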

Table 1. FileScanRDD’s Internal Properties (e.g. Registries, Counters and Flags)

ignoreCorruptFiles: spark.sql.files.ignoreCorruptFiles. Used exclusively when FileScanRDD is requested to compute a partition.

ignoreMissingFiles: spark.sql.files.ignoreMissingFiles. Used exclusively when FileScanRDD is requested to compute a partition.
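
Both flags come from Spark SQL configuration properties and can be changed at runtime; a minimal sketch (both properties are assumed to default to false):

  // Skip corrupt or missing files instead of failing the whole query
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")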

getPreferredLocations Method

Note
getPreferredLocations is part of the RDD Contract to…​FIXME.

getPreferredLocations…​FIXME

getPartitions Method

Note
getPartitions is part of the RDD Contract to…​FIXME.

getPartitions…​FIXME

Creating FileScanRDD Instance

FileScanRDD takes the following when created:

Computing Partition (in TaskContext) — compute Method

Note
compute is part of Spark Core’s RDD Contract to compute a partition (in a TaskContext).

compute creates a Scala Iterator (of Java Objects) that…​FIXME

compute then requests the input TaskContext to register a completion listener (i.e. addTaskCompletionListener) that simply closes the iterator when the task completes.

In the end, compute returns the iterator.

LocalDateTimeEncoder — Custom ExpressionEncoder for java.time.LocalDateTime

Spark SQL does not support java.time.LocalDateTime values in a Dataset.

As the exception message says, the root cause is that no Encoder can be found for java.time.LocalDateTime (there is none available in Spark SQL).

You could define one using ExpressionEncoder, but that does not seem to work either.

The simplest solution is to transform the Dataset with java.time.LocalDateTime to a supported type that Spark SQL offers an encoder for.
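
A minimal sketch of that workaround, mapping java.time.LocalDateTime to java.sql.Timestamp before creating the Dataset (the Event case class and the sample data are made up):

  import java.sql.Timestamp
  import java.time.LocalDateTime
  import spark.implicits._

  // Seq((1L, LocalDateTime.now)).toDS   // fails: no encoder for java.time.LocalDateTime

  case class Event(id: Long, time: Timestamp)

  val events = Seq((1L, LocalDateTime.now), (2L, LocalDateTime.now.plusHours(1)))
    .map { case (id, t) => Event(id, Timestamp.valueOf(t)) }
    .toDS

  events.printSchema
  // root
  //  |-- id: long (nullable = false)
  //  |-- time: timestamp (nullable = true)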

A much better solution would be to provide a custom Encoder that would expand the types supported in Spark SQL.

LocalDateTimeEncoder is an attempt to develop a custom ExpressionEncoder for Java’s java.time.LocalDateTime so you don’t have to map values to another supported type.

public final class LocalDateTime

A date-time without a time-zone in the ISO-8601 calendar system, such as 2007-12-03T10:15:30.

LocalDateTime is an immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second.

Open Questions

  1. ScalaReflection.serializerFor passes ObjectType objects through

  2. ScalaReflection.serializerFor uses StaticInvoke for java.sql.Timestamp and java.sql.Date.

  3. How could SQLUserDefinedType and UDTRegistration help here?

RowEncoder — Encoder for DataFrames

RowEncoder is part of the Encoder framework and acts as the encoder for DataFrames, i.e. Dataset[Row] — Datasets of Rows.

Note
DataFrame type is a mere type alias for Dataset[Row] that expects an Encoder[Row] available in scope, which is indeed RowEncoder itself.

RowEncoder is an object in Scala with apply and other factory methods.

RowEncoder can create ExpressionEncoder[Row] from a schema (using apply method).

RowEncoder object belongs to org.apache.spark.sql.catalyst.encoders package.
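
For illustration, creating an ExpressionEncoder[Row] from a hand-built schema (via the apply method described below) might look like this (the schema itself is made up):

  import org.apache.spark.sql.catalyst.encoders.RowEncoder
  import org.apache.spark.sql.types._

  val schema = StructType(
    StructField("id", LongType, nullable = false) ::
    StructField("name", StringType, nullable = true) :: Nil)

  val rowEncoder = RowEncoder(schema)   // ExpressionEncoder[Row]
  rowEncoder.schema                     // the schema as seen by the encoder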

Creating ExpressionEncoder For Row Type — apply method

apply builds ExpressionEncoder of Row, i.e. ExpressionEncoder[Row], from the input StructType (as schema).

Internally, apply creates a BoundReference for the Row type and returns an ExpressionEncoder[Row] for the input schema, a CreateNamedStruct serializer (using the serializerFor internal method), a deserializer for the schema, and the Row type.

serializerFor Internal Method

serializerFor creates an Expression that is assumed to be CreateNamedStruct.

serializerFor takes the input inputType and:

  1. Returns the input inputObject as is for native types, i.e. NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, BinaryType, CalendarIntervalType.

    Caution
    FIXME What does being native type mean?
  2. For UserDefinedTypes, it takes the UDT class from the SQLUserDefinedType annotation or UDTRegistration object and returns an expression with Invoke to call serialize method on a NewInstance of the UDT class.

  3. For TimestampType, it returns an expression with a StaticInvoke to call fromJavaTimestamp on DateTimeUtils class.

  4. …​FIXME

Caution
FIXME Describe me.

ExpressionEncoder — Expression-Based Encoder

ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.

ExpressionEncoder[T] uses expressions for a serializer and a deserializer.

Note
ExpressionEncoder is the only supported implementation of Encoder which is explicitly enforced when Dataset is created (even though Dataset data structure accepts a bare Encoder[T]).

ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).

Note
It is assumed that all serializer expressions contain at least one and the same BoundReference.

ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from internal binary row format.

ExpressionEncoder is flat when serializer uses a single expression (which also means that the objects of a type T are not created using constructor parameters only like Product or DefinedByConstructorParams types).

Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
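
A short sketch of creating an ExpressionEncoder for a case class and looking at its parts (the Person case class is made up):

  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

  case class Person(id: Long, name: String)

  val personEncoder = ExpressionEncoder[Person]()
  personEncoder.flat          // false: the serializer uses more than one expression
  personEncoder.serializer    // expressions that turn a Person into an InternalRow
  personEncoder.deserializer  // expression that turns an InternalRow back into a Person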

Table 1. ExpressionEncoder’s (Lazily-Initialized) Internal Properties

constructProjection: Projection generated for the deserializer expression. Used exclusively when ExpressionEncoder is requested for a JVM object from a Spark SQL row (i.e. InternalRow).

extractProjection: UnsafeProjection generated for the serializer expressions. Used exclusively when ExpressionEncoder is requested for an encoded version of a JVM object as a Spark SQL row (i.e. InternalRow).

inputRow: GenericInternalRow (with the underlying storage array) of size 1 (i.e. it can only store a single JVM object of any type). Used…​FIXME

Note
Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].

Creating ExpressionEncoder — apply Method

Caution
FIXME

Creating ExpressionEncoder Instance

ExpressionEncoder takes the following when created:

  • Schema

  • Flag whether ExpressionEncoder is flat or not

  • Serializer expressions (to convert objects of type T to internal rows)

  • Deserializer expression (to convert internal rows to objects of type T)

  • Scala’s ClassTag for the JVM type T

Creating Deserialize Expression — ScalaReflection.deserializerFor Method

deserializerFor creates an expression to deserialize from internal binary row format to a Scala object of type T.

Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path with - root class: "[clsName]"

Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
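
For illustration, the deserialize expression for a type can be inspected in the Spark shell (the exact expression tree differs between Spark versions):

  import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor

  val timestampDeExpr = deserializerFor[java.sql.Timestamp]
  println(timestampDeExpr.numberedTreeString)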

Recursive Internal deserializerFor Method

Table 2. JVM Types and Deserialize Expressions (in evaluation order)
JVM Type (Scala or Java) Deserialize Expressions

Option[T]

java.lang.Integer

java.lang.Long

java.lang.Double

java.lang.Float

java.lang.Short

java.lang.Byte

java.lang.Boolean

java.sql.Date

java.sql.Timestamp

java.lang.String

java.math.BigDecimal

scala.BigDecimal

java.math.BigInteger

scala.math.BigInt

Array[T]

Seq[T]

Map[K, V]

SQLUserDefinedType

User Defined Types (UDTs)

Product (including Tuple) or DefinedByConstructorParams

Creating Serialize Expression — ScalaReflection.serializerFor Method

serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to internal binary row format.

Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path with - root class: "[clsName]" and pattern match on the result expression.

Caution
FIXME the pattern match part
Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
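
For illustration, the serialize expression for java.sql.Timestamp can be built in the Spark shell with a BoundReference standing for the single input object (the ordinal and data type below are just an example):

  import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
  import org.apache.spark.sql.catalyst.expressions.BoundReference
  import org.apache.spark.sql.types.TimestampType

  val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)
  val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
  println(timestampSerExpr.numberedTreeString)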

Recursive Internal serializerFor Method

serializerFor creates an expression for serializing an object of type T to an internal row.

Caution
FIXME

Encoding JVM Object to Internal Binary Row Format — toRow Method

toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow sets the only JVM object to be t in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).
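
A minimal sketch of encoding with toRow (a flat String encoder keeps the example short):

  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

  val stringEncoder = ExpressionEncoder[String]()
  val row = stringEncoder.toRow("hello")   // InternalRow holding a single UTF8String value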

In case of any exception while serializing, toRow reports a RuntimeException:

Note

toRow is mostly used when SparkSession is requested for:

Decoding JVM Object From Internal Binary Row Format — fromRow Method

fromRow decodes (aka deserializes) a JVM object from an InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.
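
A minimal sketch of the round trip; note that the encoder has to be resolved and bound before fromRow can be used:

  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

  val stringEncoder = ExpressionEncoder[String]()
  val row = stringEncoder.toRow("hello")
  val decoded = stringEncoder.resolveAndBind().fromRow(row)   // "hello"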

In case of any exception while deserializing, fromRow reports a RuntimeException:

Note

fromRow is used for:

  • Dataset operators, i.e. head, collect, collectAsList, toLocalIterator

  • Structured Streaming’s ForeachSink

Creating ExpressionEncoder For Tuple — tuple Method

tuple…​FIXME

Note
tuple is used when…​FIXME

resolveAndBind Method

resolveAndBind…​FIXME

Note

resolveAndBind is used when:

  • RowToUnsafeRowDataReaderFactory is requested to create a DataReader

  • InternalRowDataWriterFactory is requested to create a DataWriter

  • Dataset is requested for the deserializer expression (to convert internal rows to objects of type T)

  • TypedAggregateExpression is created

  • JdbcUtils is requested to resultSetToRows

  • Spark Structured Streaming’s FlatMapGroupsWithStateExec physical operator is requested for the state deserializer (i.e. stateDeserializer)

  • Spark Structured Streaming’s ForeachSink is requested to add a streaming batch (i.e. addBatch)

Encoders Factory Object

Encoders is a factory object that…​FIXME

Creating Encoder Using Kryo — kryo Method

kryo simply creates an encoder that serializes objects of type T using Kryo (i.e. the useKryo flag is enabled).

Note
kryo is used when…​FIXME

Creating Encoder Using Java Serialization — javaSerialization Method

javaSerialization simply creates an encoder that serializes objects of type T using the generic Java serialization (i.e. the useKryo flag is disabled).

Note
javaSerialization is used when…​FIXME
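
A short sketch of both factory methods; either way the resulting encoder stores objects of type T in a single binary column (the Point case class is made up):

  import org.apache.spark.sql.Encoders

  case class Point(x: Double, y: Double)

  val kryoEncoder = Encoders.kryo[Point]
  val javaEncoder = Encoders.javaSerialization[Point]
  kryoEncoder.schema   // a single "value" column of binary type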

Creating Generic Encoder — genericSerializer Internal Method

genericSerializer…​FIXME

Note
genericSerializer is used when Encoders is requested for a generic encoder using Kryo and Java Serialization.

Encoder — Internal Row Converter

Encoder is the fundamental concept in the serialization and deserialization (SerDe) framework in Spark SQL 2.0. Spark SQL uses the SerDe framework for IO to make it efficient time- and space-wise.

Tip
Spark has borrowed the idea from the Hive SerDe library so it might be worthwhile to get familiar with Hive a little bit, too.

Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait.

The type T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL’s InternalRow, which is the internal binary row format representation (using Catalyst expressions and code generation).

Note
Encoder is also called “a container of serde expressions in Dataset”.
Note
The one and only implementation of the Encoder trait in Spark SQL 2 is ExpressionEncoder.

Encoders are an integral (and internal) part of any Dataset[T] (of records of type T) with an Encoder[T] that is used to serialize and deserialize the records of this dataset.

Note
Dataset[T] type is a Scala type constructor with the type parameter T. So is Encoder[T] that handles serialization and deserialization of T to the internal representation.

Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).

You can create custom encoders using the static methods of the Encoders object. Note however that encoders for common Scala types and their product types are already available in the implicits object.

Tip
The default encoders are already imported in spark-shell.

Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is by Encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, memsql) and vice versa.

Note
In Spark SQL 2.0 DataFrame type is a mere type alias for Dataset[Row] with RowEncoder being the encoder.

Creating Custom Encoders (Encoders object)

Encoders factory object defines methods to create Encoder instances.

Import org.apache.spark.sql package to have access to the Encoders factory object.

You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that could be composed to create more advanced encoders for Java bean classes (using bean method).

You can also create encoders based on Kryo or Java serializers.

You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.
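
A few examples of encoders created through the Encoders factory object (the commented-out bean line assumes a hypothetical Java bean class):

  import org.apache.spark.sql.Encoders

  val longEncoder = Encoders.LONG        // java.lang.Long
  val stringEncoder = Encoders.STRING
  val tupleEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
  val kryoEncoder = Encoders.kryo[Array[Int]]

  // val personEncoder = Encoders.bean(classOf[Person])   // hypothetical Java bean class

  stringEncoder.schema   // the schema the encoder knows about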

Further Reading and Watching
