UnsafeFixedWidthAggregationMap
UnsafeFixedWidthAggregationMap is a tiny layer (extension) around Spark Core’s BytesToBytesMap to allow for UnsafeRow keys and values.
When asked for performance metrics (the average number of probes per key lookup and the peak memory used), UnsafeFixedWidthAggregationMap simply delegates to the underlying BytesToBytesMap.
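The delegation of metrics can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `ProbingMapModel` and `AggregationMapModel` are hypothetical names, the linear-probing table stands in for the wrapped map, and none of it is Spark's actual code. It shows the outer layer keeping no counters of its own and forwarding the metric request to the map it wraps.

```java
// Simplified model only: both classes are hypothetical, not Spark's code.

/** Stand-in for the wrapped hash map; it owns the counters behind the metric. */
class ProbingMapModel {
    private final long[] keys;
    private final boolean[] used;
    private long numProbes = 0;
    private long numKeyLookups = 0;

    ProbingMapModel(int capacity) {
        keys = new long[capacity];
        used = new boolean[capacity];
    }

    /** Linear probing; assumes the table never fills up completely. */
    int lookupOrInsert(long key) {
        numKeyLookups++;
        int pos = (int) Math.floorMod(key, (long) keys.length);
        while (true) {
            numProbes++;
            if (!used[pos]) {               // empty slot: claim it for the new key
                used[pos] = true;
                keys[pos] = key;
                return pos;
            }
            if (keys[pos] == key) {         // key already present
                return pos;
            }
            pos = (pos + 1) % keys.length;  // collision: try the next slot
        }
    }

    double getAverageProbesPerLookup() {
        return (1.0 * numProbes) / numKeyLookups;
    }
}

/** Stand-in for the thin layer: it keeps no counters of its own. */
class AggregationMapModel {
    private final ProbingMapModel map = new ProbingMapModel(8);

    int slotFor(long key) {
        return map.lookupOrInsert(key);
    }

    double getAverageProbesPerLookup() {
        return map.getAverageProbesPerLookup();  // plain delegation
    }
}
```

In this model, looking up the keys 1, 9 and 1 again takes 1, 2 and 1 probes respectively (9 collides with 1 in an 8-slot table), so the reported average is 4/3.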
UnsafeFixedWidthAggregationMap is created when:
- HashAggregateExec physical operator is requested to create a new UnsafeFixedWidthAggregationMap (when HashAggregateExec physical operator is requested to generate the Java source code for the “produce” path in Whole-Stage Code Generation)
- TungstenAggregationIterator is created (when HashAggregateExec physical operator is requested to execute in the traditional / non-Whole-Stage-Code-Generation execution path)
| Name | Description |
|---|---|
| | Re-used pointer (as an UnsafeRow with the number of fields to match the aggregationBufferSchema) to the current aggregation buffer. Used exclusively when … |
| | UnsafeProjection for the groupingKeySchema (to encode grouping keys as UnsafeRows) |
| | Spark Core’s … |
supportsAggregationBufferSchema Static Method
```java
boolean supportsAggregationBufferSchema(StructType schema)
```
supportsAggregationBufferSchema is a predicate that returns true unless the input schema contains a field whose data type is not mutable.
Note: The mutable data types are BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType and TimestampType. Examples (possibly all) of data types that are not mutable: ArrayType, BinaryType, StringType, CalendarIntervalType, MapType, ObjectType and StructType.
```scala
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// StringType is not mutable, so the schema is not supported
val schemaWithImmutableField = StructType(StructField("string", StringType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithImmutableField) == false)

// IntegerType and BooleanType are both mutable, so the schema is supported
val schemaWithMutableFields = StructType(
  StructField("int", IntegerType) :: StructField("bool", BooleanType) :: Nil)
assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithMutableFields))
```
Note: supportsAggregationBufferSchema is used exclusively when HashAggregateExec is requested to supportsAggregate.
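The restriction to mutable data types is consistent with an aggregation buffer being updated in place: a fixed-width field can be overwritten without changing the size of the record that holds it. The following is a minimal sketch of that idea, not Spark's UnsafeRow. `FixedWidthBufferModel` and the 8-byte slot size are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;

// Hypothetical model of a fixed-width buffer: every field owns one 8-byte slot,
// so overwriting a value never moves or resizes anything.
class FixedWidthBufferModel {
    static final int SLOT_BYTES = 8;  // assumed slot width for this sketch
    private final ByteBuffer bytes;

    FixedWidthBufferModel(int numFields) {
        bytes = ByteBuffer.allocate(numFields * SLOT_BYTES);  // zero-initialized
    }

    /** Overwrites the field at the given ordinal in place. */
    void setLong(int ordinal, long value) {
        bytes.putLong(ordinal * SLOT_BYTES, value);
    }

    long getLong(int ordinal) {
        return bytes.getLong(ordinal * SLOT_BYTES);
    }

    int sizeInBytes() {
        return bytes.capacity();
    }
}
```

A variable-length value such as a string could grow or shrink on update, which is why a layout like this one cannot accommodate it without relocating the record.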
Creating UnsafeFixedWidthAggregationMap Instance
UnsafeFixedWidthAggregationMap takes the following when created:
- Empty aggregation buffer (as an InternalRow)
- Aggregation buffer schema
- Grouping key schema
UnsafeFixedWidthAggregationMap initializes the internal registries and counters.
getAggregationBufferFromUnsafeRow Method
```java
UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key)  // (1)
UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key, int hash)
```
(1) Uses the hash code of the key
getAggregationBufferFromUnsafeRow requests the BytesToBytesMap to look up the input key (to get a BytesToBytesMap.Location).
getAggregationBufferFromUnsafeRow…FIXME
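As a rough illustration of a key lookup that yields a per-group aggregation buffer, here is a self-contained sketch. It rests on an assumption that is not stated on this page: a key that is not yet in the map is given a fresh copy of the empty aggregation buffer. `AggregationBufferLookupModel`, the `String` keys and the `long[]` buffers are hypothetical stand-ins, not Spark's types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: grouping keys are Strings, aggregation buffers are long[].
class AggregationBufferLookupModel {
    private final long[] emptyBuffer;
    private final Map<String, long[]> buffers = new HashMap<>();

    AggregationBufferLookupModel(long[] emptyBuffer) {
        this.emptyBuffer = emptyBuffer.clone();  // defensive copy of the initial state
    }

    /**
     * Returns the buffer for the key. ASSUMPTION: an unseen key starts from
     * a copy of the empty aggregation buffer.
     */
    long[] getAggregationBuffer(String groupingKey) {
        return buffers.computeIfAbsent(groupingKey, k -> emptyBuffer.clone());
    }

    int numGroups() {
        return buffers.size();
    }
}
```

Because the same array is returned on every lookup of a given key, a caller can update the aggregate in place, for example `model.getAggregationBuffer("a")[0] += 1` to count rows per group.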
getAggregationBuffer Method
```java
UnsafeRow getAggregationBuffer(InternalRow groupingKey)
```
getAggregationBuffer…FIXME
Note: getAggregationBuffer seems to be used exclusively for testing.
Getting KVIterator — iterator Method
```java
KVIterator<UnsafeRow, UnsafeRow> iterator()
```
iterator…FIXME
getPeakMemoryUsedBytes Method
```java
long getPeakMemoryUsedBytes()
```
getPeakMemoryUsedBytes simply requests the underlying BytesToBytesMap for the peak memory used.
getAverageProbesPerLookup Method
```java
double getAverageProbesPerLookup()
```
getAverageProbesPerLookup simply requests the underlying BytesToBytesMap for the average number of probes per key lookup.
free Method
```java
void free()
```
free…FIXME
destructAndCreateExternalSorter Method
```java
UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException
```
destructAndCreateExternalSorter…FIXME