Follow spark技术分享 (Spark Tech Sharing),
hacking on the Spark source code and exploring Spark best practices

UnsafeFixedWidthAggregationMap

UnsafeFixedWidthAggregationMap is a tiny layer (extension) around Spark Core’s BytesToBytesMap to allow for UnsafeRow keys and values.

Whenever requested for performance metrics (i.e. average number of probes per key lookup and peak memory used), UnsafeFixedWidthAggregationMap simply requests the underlying BytesToBytesMap.
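The following minimal Java sketch makes the "average number of probes per key lookup" metric concrete. It is a linear-probing hash map that counts key lookups and probed slots. ProbingLongMap is a hypothetical class, not BytesToBytesMap. It assumes long keys, a fixed capacity and no resizing, and it only mirrors the idea of dividing the number of probes by the number of lookups.

```java
// Hypothetical sketch: not Spark's BytesToBytesMap. A fixed-capacity, linear-probing
// map from long keys to long values that counts lookups and probed slots.
final class ProbingLongMap {
    private final long[] keys;
    private final long[] values;
    private final boolean[] used;
    private long numKeyLookups = 0;
    private long numProbes = 0;

    ProbingLongMap(int capacity) {
        keys = new long[capacity];
        values = new long[capacity];
        used = new boolean[capacity];
    }

    // Returns the slot holding key, or the first free slot where it would go.
    // Every slot inspected counts as one probe.
    private int lookup(long key) {
        numKeyLookups++;
        int capacity = keys.length;
        int start = Math.floorMod(Long.hashCode(key), capacity);
        for (int step = 0; step < capacity; step++) {
            numProbes++;
            int slot = (start + step) % capacity;
            if (!used[slot] || keys[slot] == key) {
                return slot;
            }
        }
        throw new IllegalStateException("map is full");
    }

    void put(long key, long value) {
        int slot = lookup(key);
        used[slot] = true;
        keys[slot] = key;
        values[slot] = value;
    }

    // Returns null when the key is absent
    Long get(long key) {
        int slot = lookup(key);
        return used[slot] ? values[slot] : null;
    }

    double averageProbesPerLookup() {
        return numKeyLookups == 0 ? 0.0 : (double) numProbes / numKeyLookups;
    }
}
```

Take a capacity of 8 with keys 0, 8 and 16. All three hash to slot 0, so the inserts cost 1, 2 and 3 probes, and the average is 6 / 3 = 2.0 probes per lookup. A growing value of this metric signals many collisions.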

UnsafeFixedWidthAggregationMap is created when:

Table 1. UnsafeFixedWidthAggregationMap’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

currentAggregationBuffer

Re-used pointer (as an UnsafeRow with the number of fields to match the aggregationBufferSchema) to the current aggregation buffer

Used exclusively when UnsafeFixedWidthAggregationMap is requested to getAggregationBufferFromUnsafeRow.

emptyAggregationBuffer

Empty aggregation buffer (encoded in UnsafeRow format)

groupingKeyProjection

UnsafeProjection for the groupingKeySchema (to encode grouping keys as UnsafeRows)

map

Spark Core’s BytesToBytesMap with the taskMemoryManager, initialCapacity, pageSizeBytes and performance metrics enabled

supportsAggregationBufferSchema Static Method

supportsAggregationBufferSchema is a predicate that holds (true) unless a field of the input schema has a data type that is not mutable (per UnsafeRow's isMutable).

Note
supportsAggregationBufferSchema is used exclusively when HashAggregateExec is requested to supportsAggregate.
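The predicate is small enough to sketch in Java. SketchType and AggBufferSchemaCheck are hypothetical stand-ins for Spark SQL's DataType and the real static method. The mutable set below is an assumption based on UnsafeRow's fixed-width mutable types in Spark 2.x, plus DecimalType, which isMutable also accepts.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for a few Spark SQL data types (not Spark's DataType classes)
enum SketchType {
    NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, STRING, BINARY
}

final class AggBufferSchemaCheck {
    // Assumption: mirrors UnsafeRow's mutable fixed-width field types in Spark 2.x
    private static final Set<SketchType> MUTABLE_FIELD_TYPES = EnumSet.of(
        SketchType.NULL, SketchType.BOOLEAN, SketchType.BYTE, SketchType.SHORT,
        SketchType.INT, SketchType.LONG, SketchType.FLOAT, SketchType.DOUBLE,
        SketchType.DATE, SketchType.TIMESTAMP);

    // Mutable fixed-width field types, plus decimals
    static boolean isMutable(SketchType type) {
        return MUTABLE_FIELD_TYPES.contains(type) || type == SketchType.DECIMAL;
    }

    // true unless some field's type is not mutable (an empty schema is therefore supported)
    static boolean supportsAggregationBufferSchema(List<SketchType> fieldTypes) {
        return fieldTypes.stream().allMatch(AggBufferSchemaCheck::isMutable);
    }
}
```

For example, an aggregation buffer of (LONG, DOUBLE) passes. A buffer with a STRING field does not, because its value cannot be updated in place.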

Creating UnsafeFixedWidthAggregationMap Instance

UnsafeFixedWidthAggregationMap takes the following when created:

  • Empty aggregation buffer (as an InternalRow)

  • Aggregation buffer schema

  • Grouping key schema

  • Spark Core’s TaskMemoryManager

  • Initial capacity

  • Page size (in bytes)

UnsafeFixedWidthAggregationMap initializes the internal registries and counters.

getAggregationBufferFromUnsafeRow Method

getAggregationBufferFromUnsafeRow uses the hash code of the input key for the lookup.

getAggregationBufferFromUnsafeRow requests the BytesToBytesMap to lookup the input key (to get a BytesToBytesMap.Location).

getAggregationBufferFromUnsafeRow…​FIXME

Note

getAggregationBufferFromUnsafeRow is used when:

  • TungstenAggregationIterator is requested to processInputs (exclusively when TungstenAggregationIterator is created)

  • (for testing only) UnsafeFixedWidthAggregationMap is requested to getAggregationBuffer
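The lookup-or-insert behaviour can be sketched as follows. The sketch assumes that grouping keys arrive as serialized bytes and that an aggregation buffer is a fixed-width long[]. AggBufferMapSketch is hypothetical: it uses a java.util.HashMap instead of BytesToBytesMap, and it returns the buffer array itself rather than a re-used UnsafeRow pointer. The idea it shows is this:

* The first lookup of a grouping key stores a copy of the empty aggregation buffer.
* Later lookups return the same buffer, so the caller can update it in place.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Spark's UnsafeFixedWidthAggregationMap: grouping keys (as bytes)
// map to fixed-width aggregation buffers (long[]).
final class AggBufferMapSketch {
    private final long[] emptyAggregationBuffer;
    // ByteBuffer compares and hashes by content, so equal key bytes find the same entry
    private final Map<ByteBuffer, long[]> map = new HashMap<>();

    AggBufferMapSketch(long[] emptyAggregationBuffer) {
        this.emptyAggregationBuffer = emptyAggregationBuffer.clone();
    }

    // Looks up the key; on a miss, inserts a copy of the empty buffer first.
    // The returned array is the stored buffer, so updates to it are kept in the map.
    long[] getAggregationBuffer(byte[] keyBytes) {
        ByteBuffer key = ByteBuffer.wrap(keyBytes.clone()); // copy: the caller may reuse its array
        return map.computeIfAbsent(key, k -> emptyAggregationBuffer.clone());
    }

    int numKeys() {
        return map.size();
    }
}
```

With an empty buffer of {sum = 0, count = 0}, two rows for key "a" with values 10 and 5 leave {15, 2} in the map. A new key "b" starts from {0, 0}.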

getAggregationBuffer Method

getAggregationBuffer…​FIXME

Note
getAggregationBuffer seems to be used exclusively for testing.

Getting KVIterator — iterator Method

iterator…​FIXME

Note

iterator is used when:

  • HashAggregateExec physical operator is requested to finishAggregate

  • TungstenAggregationIterator is created (and pre-loads the first key-value pair from the map)

getPeakMemoryUsedBytes Method

getPeakMemoryUsedBytes…​FIXME

Note

getPeakMemoryUsedBytes is used when:

getAverageProbesPerLookup Method

getAverageProbesPerLookup…​FIXME

Note

getAverageProbesPerLookup is used when:

free Method

free…​FIXME

Note

free is used when:

destructAndCreateExternalSorter Method

destructAndCreateExternalSorter…​FIXME

Note

destructAndCreateExternalSorter is used when:

ExternalAppendOnlyUnsafeRowArray — Append-Only Array for UnsafeRows (with Disk Spill Threshold)

ExternalAppendOnlyUnsafeRowArray is an append-only array for UnsafeRows that spills content to disk when a predefined spill threshold of rows is reached.

Note
Choosing a proper spill threshold of rows is a performance optimization.
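The spill-threshold idea fits in a minimal Java sketch. SpillingAppendOnlyBuffer is hypothetical and simplified. Once numRowsSpillThreshold rows have been added, the buffered rows move to a second list that stands in for the disk-backed UnsafeExternalSorter. Every later row is appended there too.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not Spark's ExternalAppendOnlyUnsafeRowArray.
final class SpillingAppendOnlyBuffer<T> {
    private final int numRowsSpillThreshold;
    private final List<T> inMemoryBuffer = new ArrayList<>();
    // Stand-in for a disk-backed sorter; stays null until the threshold is crossed
    private List<T> spilled = null;
    private int numRows = 0;

    SpillingAppendOnlyBuffer(int numRowsSpillThreshold) {
        if (numRowsSpillThreshold < 0) throw new IllegalArgumentException("threshold must be >= 0");
        this.numRowsSpillThreshold = numRowsSpillThreshold;
    }

    void add(T row) {
        if (spilled == null && numRows >= numRowsSpillThreshold) {
            // Threshold reached: move the buffered rows over and append there from now on
            spilled = new ArrayList<>(inMemoryBuffer);
            inMemoryBuffer.clear();
        }
        if (spilled == null) {
            inMemoryBuffer.add(row);
        } else {
            spilled.add(row);
        }
        numRows++;
    }

    boolean isSpilled() { return spilled != null; }

    int length() { return numRows; }

    // Rows come back in insertion order from whichever store holds them
    Iterator<T> iterator() {
        return (spilled == null ? inMemoryBuffer : spilled).iterator();
    }

    void clear() {
        inMemoryBuffer.clear();
        spilled = null;
        numRows = 0;
    }
}
```

The trade-off behind choosing the threshold is visible here. A higher threshold keeps more rows on the fast in-memory path but uses more memory. A lower one spills earlier.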

ExternalAppendOnlyUnsafeRowArray is created when:

  • WindowExec physical operator is executed (and creates an internal buffer for window frames)

  • WindowFunctionFrame is prepared

  • SortMergeJoinExec physical operator is executed (and creates a RowIterator for INNER and CROSS joins) and for getBufferedMatches

  • SortMergeJoinScanner creates an internal bufferedMatches

  • UnsafeCartesianRDD is computed

Table 1. ExternalAppendOnlyUnsafeRowArray’s Internal Registries and Counters
Name Description

initialSizeOfInMemoryBuffer

FIXME

Used when…​FIXME

inMemoryBuffer

FIXME

Can grow up to numRowsSpillThreshold rows (i.e. new UnsafeRows are added)

Used when…​FIXME

spillableArray

UnsafeExternalSorter

Used when…​FIXME

numRows

Used when…​FIXME

modificationsCount

Used when…​FIXME

numFieldsPerRow

Used when…​FIXME

Tip

Enable INFO logging level for org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray=INFO

Refer to Logging.

generateIterator Method

Caution
FIXME

add Method

Caution
FIXME
Note

add is used when:

clear Method

Caution
FIXME

Creating ExternalAppendOnlyUnsafeRowArray Instance

ExternalAppendOnlyUnsafeRowArray takes the following when created:

ExternalAppendOnlyUnsafeRowArray initializes the internal registries and counters.

CatalystSerde

CatalystSerde Helper Object

CatalystSerde is a Scala object that consists of three utility methods:

  1. deserialize to create a new logical plan with the input logical plan wrapped inside a DeserializeToObject logical operator.

  2. serialize

  3. generateObjAttr

CatalystSerde belongs to the org.apache.spark.sql.catalyst.plans.logical package.

Creating Logical Plan with DeserializeToObject Logical Operator for Logical Plan — deserialize Method

deserialize creates a DeserializeToObject logical operator for the input child logical plan.

Internally, deserialize first creates an UnresolvedDeserializer for the deserializer for the type T. It then passes it on to a DeserializeToObject with an AttributeReference (the result of generateObjAttr).

serialize Method

generateObjAttr Method

TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator

TungstenAggregationIterator is an AggregationIterator that the HashAggregateExec aggregate physical operator uses when executed (to process UnsafeRows per partition and calculate aggregations).

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).

When created, TungstenAggregationIterator gets SQL metrics from the HashAggregateExec aggregate physical operator being executed, i.e. numOutputRows, peakMemory, spillSize and avgHashProbe metrics.

The metrics are then displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

Figure 1. HashAggregateExec in web UI (Details for Query)
Table 1. TungstenAggregationIterator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

aggregationBufferMapIterator

KVIterator[UnsafeRow, UnsafeRow]

Used when…​FIXME

hashMap

UnsafeFixedWidthAggregationMap with the following:

Used when TungstenAggregationIterator is requested for the next UnsafeRow, to outputForEmptyGroupingKeyWithoutInput, processInputs, to initialize the aggregationBufferMapIterator and every time a partition has been processed.

initialAggregationBuffer

UnsafeRow that is the aggregation buffer containing initial buffer values.

Used when…​FIXME

externalSorter

UnsafeKVExternalSorter used for sort-based aggregation

sortBased

Flag to indicate whether TungstenAggregationIterator uses sort-based aggregation (not hash-based aggregation).

sortBased flag is disabled (false) by default.

Enabled (true) when TungstenAggregationIterator is requested to switch to sort-based aggregation.

Used when…​FIXME

processInputs Internal Method

processInputs…​FIXME

Note
processInputs is used exclusively when TungstenAggregationIterator is created (and sets the internal flags to indicate whether to use a hash-based aggregation or, in the worst case, a sort-based aggregation when there is not enough memory for groups and their buffers).

Switching to Sort-Based Aggregation (From Preferred Hash-Based Aggregation) — switchToSortBasedAggregation Internal Method

switchToSortBasedAggregation…​FIXME

Note
switchToSortBasedAggregation is used exclusively when TungstenAggregationIterator is requested to processInputs (and the externalSorter is used).
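The hash-first, sort-on-overflow strategy can be modelled in a few lines of Java. This is a simplified sketch under stated assumptions, not Spark's algorithm:

* HybridSumAggregator is hypothetical.
* Memory pressure is simulated by a cap on the number of distinct groups (maxGroupsInMemory).
* The aggregate is a per-key sum.

When a new group does not fit, the sketch sorts two things together by key and aggregates them run by run: the partial sums already in the hash map (playing the role of spilled data) and the remaining input rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of hash-based aggregation with a sort-based fallback (per-key sums).
final class HybridSumAggregator {
    static final class Result {
        final Map<String, Long> sums;
        final boolean sortBased;

        Result(Map<String, Long> sums, boolean sortBased) {
            this.sums = sums;
            this.sortBased = sortBased;
        }
    }

    static Result aggregate(List<Map.Entry<String, Long>> rows, int maxGroupsInMemory) {
        Map<String, Long> hashMap = new HashMap<>();
        int i = 0;
        for (; i < rows.size(); i++) {
            Map.Entry<String, Long> row = rows.get(i);
            // A new group that does not fit simulates running out of memory
            if (!hashMap.containsKey(row.getKey()) && hashMap.size() >= maxGroupsInMemory) {
                break;
            }
            hashMap.merge(row.getKey(), row.getValue(), Long::sum);
        }
        if (i == rows.size()) {
            return new Result(hashMap, false); // hash-based aggregation was enough
        }

        // Sort-based fallback: partial sums from the hash map plus the unprocessed rows,
        // sorted by key so that each group forms one contiguous run
        List<Map.Entry<String, Long>> toSort = new ArrayList<>(hashMap.entrySet());
        toSort.addAll(rows.subList(i, rows.size()));
        toSort.sort(Map.Entry.comparingByKey());

        Map<String, Long> sums = new LinkedHashMap<>();
        String currentKey = null;
        long currentSum = 0L;
        for (Map.Entry<String, Long> e : toSort) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                sums.put(currentKey, currentSum); // run finished: emit its total
                currentSum = 0L;
            }
            currentKey = e.getKey();
            currentSum += e.getValue();
        }
        if (currentKey != null) {
            sums.put(currentKey, currentSum);
        }
        return new Result(sums, true);
    }
}
```

Both paths produce the same totals. The fallback trades the hash map's memory for a sort, which is why hash-based aggregation is preferred while it fits.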

Getting Next UnsafeRow — next Method

Note
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.

next…​FIXME

hasNext Method

Note
hasNext is part of Scala’s scala.collection.Iterator interface that tests whether this iterator can provide another element.

hasNext…​FIXME

Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

Note
The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator initializes the internal registries and counters.

TungstenAggregationIterator starts processing input rows and, if it did not switch to sort-based aggregation, pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap.

generateResultProjection Method

Note
generateResultProjection is part of the AggregationIterator Contract to…​FIXME.

generateResultProjection…​FIXME

Creating UnsafeRow — outputForEmptyGroupingKeyWithoutInput Method

outputForEmptyGroupingKeyWithoutInput…​FIXME

Note
outputForEmptyGroupingKeyWithoutInput is used when…​FIXME

TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

SortBasedAggregationIterator

SortBasedAggregationIterator is…​FIXME

next Method

Note
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.

next…​FIXME

outputForEmptyGroupingKeyWithoutInput Method

outputForEmptyGroupingKeyWithoutInput…​FIXME

Note
outputForEmptyGroupingKeyWithoutInput is used when…​FIXME

newBuffer Internal Method

newBuffer…​FIXME

Note
newBuffer is used when…​FIXME

ObjectAggregationIterator

ObjectAggregationIterator is…​FIXME

next Method

Note
next is part of Scala’s scala.collection.Iterator interface that returns the next element and discards it from the iterator.

next…​FIXME

outputForEmptyGroupingKeyWithoutInput Method

outputForEmptyGroupingKeyWithoutInput…​FIXME

Note
outputForEmptyGroupingKeyWithoutInput is used when…​FIXME

AggregationIterator — Generic Iterator of UnsafeRows for Aggregate Physical Operators

AggregationIterator is the base for iterators of UnsafeRows that…​FIXME

Iterators are data structures that allow iterating over a sequence of elements. They have a hasNext method for checking whether a next element is available, and a next method that returns the next element and discards it from the iterator.
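Here is the hasNext/next contract in an aggregation setting. The small Java iterator below sums values over input already sorted by key and emits one pair per run of equal keys, which is the general shape of sort-based aggregation. SortedSumIterator is a hypothetical name and is not one of the AggregationIterator implementations.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch: input must be sorted by key; emits (key, sum) once per key.
final class SortedSumIterator implements Iterator<Map.Entry<String, Long>> {
    private final Iterator<Map.Entry<String, Long>> input;
    private Map.Entry<String, Long> pending; // first row of the next group, or null when done

    SortedSumIterator(Iterator<Map.Entry<String, Long>> sortedInput) {
        this.input = sortedInput;
        this.pending = input.hasNext() ? input.next() : null;
    }

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public Map.Entry<String, Long> next() {
        if (pending == null) throw new NoSuchElementException();
        String key = pending.getKey();
        long sum = pending.getValue();
        pending = null;
        // Consume the rest of the run; the first row of a new key is kept for the next call
        while (input.hasNext()) {
            Map.Entry<String, Long> row = input.next();
            if (row.getKey().equals(key)) {
                sum += row.getValue();
            } else {
                pending = row;
                break;
            }
        }
        return Map.entry(key, sum);
    }
}
```

Note that next does the actual aggregation work lazily. Each call consumes exactly one group from the input.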

Table 1. AggregationIterator’s Implementations
Name Description

ObjectAggregationIterator

Used exclusively when ObjectHashAggregateExec physical operator is executed.

SortBasedAggregationIterator

Used exclusively when SortAggregateExec physical operator is executed.

TungstenAggregationIterator

Used exclusively when HashAggregateExec physical operator is executed.

Note
HashAggregateExec is the preferred aggregate physical operator for the Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).
Table 2. AggregationIterator’s Internal Registries and Counters
Name Description

aggregateFunctions

Aggregate functions

Used when…​FIXME

allImperativeAggregateFunctions

ImperativeAggregate functions

Used when…​FIXME

allImperativeAggregateFunctionPositions

Positions

Used when…​FIXME

expressionAggInitialProjection

MutableProjection

Used when…​FIXME

generateOutput

Function used to generate an unsafe row (i.e. (UnsafeRow, InternalRow) ⇒ UnsafeRow)

Used when:

groupingAttributes

Grouping attributes

Used when…​FIXME

groupingProjection

UnsafeProjection

Used when…​FIXME

processRow

(InternalRow, InternalRow) ⇒ Unit

Used when…​FIXME

Creating AggregationIterator Instance

AggregationIterator takes the following when created:

AggregationIterator initializes the internal registries and counters.

Note
AggregationIterator is a Scala abstract class and cannot be created directly. It is created indirectly for the concrete AggregationIterators.

initializeAggregateFunctions Internal Method

initializeAggregateFunctions…​FIXME

Note
initializeAggregateFunctions is used when…​FIXME

generateProcessRow Internal Method

generateProcessRow…​FIXME

Note
generateProcessRow is used when…​FIXME

generateResultProjection Method

generateResultProjection…​FIXME

Note

generateResultProjection is used when:

UnsafeRow — Mutable Raw-Memory Unsafe Binary Row Format

UnsafeRow is a concrete InternalRow that represents a mutable internal raw-memory (and hence unsafe) binary row format.

In other words, UnsafeRow is an InternalRow that is backed by raw memory instead of Java objects.

UnsafeRow knows its size in bytes.

UnsafeRow supports Java’s Externalizable and Kryo’s KryoSerializable serialization/deserialization protocols.

The fields of a data row are placed using field offsets.

UnsafeRow considers a data type mutable if it is one of the following:

UnsafeRow is composed of three regions:

  1. Null Bit Set Bitmap Region (1 bit/field) for tracking null values

  2. Fixed-Length 8-Byte Values Region

  3. Variable-Length Data Section

Every region is padded to whole 8-byte words. Rows are therefore always 8-byte word aligned, and their size is always a multiple of 8 bytes.
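The sizing rule can be checked with a little arithmetic. The Java helper below is hypothetical (UnsafeRowLayout is not a Spark class). It assumes three rules:

* The null bit set takes whole 8-byte words.
* Every field, including a variable-length one, takes one 8-byte slot in the fixed-length region.
* Each variable-length value is padded up to the next multiple of 8 bytes.

```java
// Hypothetical helper, not a Spark API: size of an UnsafeRow-style layout.
final class UnsafeRowLayout {
    // Null bit set: one bit per field, rounded up to whole 64-bit (8-byte) words
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Pads a variable-length value up to the next multiple of 8 bytes
    static int roundToWord(int numBytes) {
        return ((numBytes + 7) / 8) * 8;
    }

    // numFields fixed-length 8-byte slots, plus the padded variable-length values
    static int sizeInBytes(int numFields, int... variableLengthSizes) {
        int size = bitSetWidthInBytes(numFields) + 8 * numFields;
        for (int len : variableLengthSizes) {
            size += roundToWord(len);
        }
        return size;
    }
}
```

Take a row with three fields, one of which is a 5-byte string. It needs 8 (bit set) + 3 × 8 (fixed slots) + 8 (the string padded from 5 bytes) = 40 bytes.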

Equality comparison and hashing of rows can be performed on raw bytes, since two identical rows have identical bit-wise representations. No type-specific interpretation is required.
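A minimal Java sketch shows byte-level equality for a row of nullable long fields only. RawRowBytes is hypothetical. It writes a null bit set followed by one 8-byte slot per field, and it leaves the slots of null fields zeroed. That guarantees equal rows produce identical bytes, so java.util.Arrays can compare and hash them.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical encoder, not Spark's UnsafeRowWriter: nullable long fields only.
final class RawRowBytes {
    static byte[] encode(Long... fields) {
        int bitSetBytes = ((fields.length + 63) / 64) * 8;
        ByteBuffer buf = ByteBuffer.allocate(bitSetBytes + 8 * fields.length)
                                   .order(ByteOrder.LITTLE_ENDIAN);
        long[] nullBits = new long[bitSetBytes / 8];
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                nullBits[i / 64] |= 1L << (i % 64); // mark null; the value slot stays all zeros
            } else {
                buf.putLong(bitSetBytes + 8 * i, fields[i]);
            }
        }
        for (int w = 0; w < nullBits.length; w++) {
            buf.putLong(8 * w, nullBits[w]);
        }
        return buf.array();
    }

    // No type-specific logic: rows are equal exactly when their bytes are equal
    static boolean rowsEqual(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    static int rowHash(byte[] row) {
        return Arrays.hashCode(row);
    }
}
```

Zeroing the slot of a null field is what makes this safe. Without it, two rows with the same null field could differ in leftover bytes.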

isMutable Static Predicate

isMutable is enabled (true) when the input DataType is among the mutable field types or a DecimalType.

Otherwise, isMutable is disabled (false).

Note

isMutable is used when:

Kryo’s KryoSerializable SerDe Protocol

Tip
Read up on KryoSerializable.

Serializing JVM Object — KryoSerializable’s write Method

Deserializing Kryo-Managed Object — KryoSerializable’s read Method

Java’s Externalizable SerDe Protocol

Tip
Read up on java.io.Externalizable.

Serializing JVM Object — Externalizable’s writeExternal Method

Deserializing Java-Externalized Object — Externalizable’s readExternal Method

pointTo Method

pointTo…​FIXME

Note
pointTo is used when…​FIXME

InternalRow — Abstract Binary Row Format

Note
InternalRow is also called Catalyst row or Spark SQL row.
Note
UnsafeRow is a concrete InternalRow.

InternalRow objects can be created using the factory methods of the InternalRow object.

getString Method

Caution
FIXME

Tungsten Execution Backend (Project Tungsten)

The goal of Project Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O which are considered fast enough). Tungsten focuses on the hardware architecture of the platform Spark runs on, including but not limited to JVM, LLVM, GPU, NVRAM, etc. It does so by offering the following optimization features:

  1. Off-Heap Memory Management using binary in-memory data representation aka Tungsten row format and managing memory explicitly,

  2. Cache Locality which is about cache-aware computations with cache-aware layout for high cache hit rates,

  3. Whole-Stage Code Generation (aka CodeGen).

Important
Project Tungsten uses the sun.misc.Unsafe API for direct memory access, bypassing the JVM's managed heap in order to avoid garbage collection.

Figure 1. RDD vs DataFrame Size in Memory in web UI — Thank you, Tungsten!

Off-Heap Memory Management

Project Tungsten aims at substantially reducing the usage of JVM objects (and therefore JVM garbage collection) by introducing its own off-heap binary memory management. Instead of working with Java objects, Tungsten uses sun.misc.Unsafe to manipulate raw memory.

Tungsten uses the compact storage format called UnsafeRow for data representation that further reduces memory footprint.

Since Datasets have a known schema, Tungsten can lay out the data itself in a more compact and efficient way. That brings benefits similar to using extensions written in low-level, hardware-aware languages like C or assembler.

This works directly on data that is already in serialized (binary) form, which further reduces or completely avoids serialization between the JVM object representation and Spark's internal one.
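Spark itself goes through sun.misc.Unsafe. The Java sketch below swaps in a direct java.nio.ByteBuffer, whose contents may reside outside the garbage-collected heap. It shows a fixed-width row of long fields addressed by byte offsets rather than stored as Java objects. OffHeapLongRow is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: fixed-width long fields stored in a direct (possibly off-heap) buffer,
// used here as a stand-in for sun.misc.Unsafe memory.
final class OffHeapLongRow {
    private final ByteBuffer memory;
    private final int numFields;

    OffHeapLongRow(int numFields) {
        this.numFields = numFields;
        // Direct buffers start zero-filled; 8 bytes per field
        this.memory = ByteBuffer.allocateDirect(8 * numFields).order(ByteOrder.nativeOrder());
    }

    void setLong(int ordinal, long value) {
        memory.putLong(offsetOf(ordinal), value);
    }

    long getLong(int ordinal) {
        return memory.getLong(offsetOf(ordinal));
    }

    boolean isDirect() {
        return memory.isDirect();
    }

    // Field i lives at byte offset 8 * i; there is no per-field Java object
    private int offsetOf(int ordinal) {
        if (ordinal < 0 || ordinal >= numFields) {
            throw new IndexOutOfBoundsException("ordinal " + ordinal);
        }
        return 8 * ordinal;
    }
}
```

Reading a field is a single offset computation plus an 8-byte load. No per-field object is allocated, and none is left for the garbage collector to trace.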

Cache Locality

Tungsten uses algorithms and cache-aware data structures that exploit the physical machine caches at different levels – L1, L2, L3.

Whole-Stage Java Code Generation

Tungsten does code generation at query compile time, i.e. when a query is prepared for execution, not when Spark itself is built. It generates Java code that is compiled to JVM bytecode and gives very fast access to Tungsten-managed memory structures. It uses the Janino compiler, a super-small, super-fast Java compiler.

Note
The code generation was tracked under SPARK-8159 Improve expression function coverage (Spark 1.5).
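The hypothetical Java sketch below gives a feel for what whole-stage code generation buys. It computes the same filter, project and sum twice:

* First as a chain of iterators, with a virtual call per row per operator (the operator-at-a-time model).
* Then fused into a single loop, roughly the shape of code a whole-stage generator aims to emit.

FusionDemo and its operators are illustrative only. Real generated code is produced by Spark when a query is executed and is compiled with Janino.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.PrimitiveIterator;
import java.util.function.LongPredicate;
import java.util.function.LongUnaryOperator;

// Hypothetical illustration: iterator-per-operator pipeline vs one fused loop.
final class FusionDemo {
    // Filter operator: pulls from its child until a row passes the predicate
    static PrimitiveIterator.OfLong filterOp(PrimitiveIterator.OfLong child, LongPredicate p) {
        return new PrimitiveIterator.OfLong() {
            private boolean ready = false;
            private long nextValue;

            @Override public boolean hasNext() {
                while (!ready && child.hasNext()) {
                    long v = child.nextLong();
                    if (p.test(v)) { nextValue = v; ready = true; }
                }
                return ready;
            }

            @Override public long nextLong() {
                if (!hasNext()) throw new NoSuchElementException();
                ready = false;
                return nextValue;
            }
        };
    }

    // Project operator: transforms each row from its child
    static PrimitiveIterator.OfLong projectOp(PrimitiveIterator.OfLong child, LongUnaryOperator f) {
        return new PrimitiveIterator.OfLong() {
            @Override public boolean hasNext() { return child.hasNext(); }
            @Override public long nextLong() { return f.applyAsLong(child.nextLong()); }
        };
    }

    // scan -> filter(even) -> project(x * 10) -> sum, one iterator per operator
    static long pipelined(long[] input) {
        PrimitiveIterator.OfLong it =
            projectOp(filterOp(Arrays.stream(input).iterator(), x -> x % 2 == 0), x -> x * 10);
        long sum = 0;
        while (it.hasNext()) sum += it.nextLong();
        return sum;
    }

    // The same logic fused into one loop with no per-row iterator or lambda calls
    static long fused(long[] input) {
        long sum = 0;
        for (long x : input) {
            if (x % 2 == 0) sum += x * 10;
        }
        return sum;
    }
}
```

Both methods return the same result. The fused version keeps values in local variables and avoids per-row virtual calls, which is where much of the speed-up of whole-stage code generation is expected to come from.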
