UnsafeFixedWidthAggregationMap
UnsafeFixedWidthAggregationMap is a thin layer (extension) around Spark Core's BytesToBytesMap that allows for UnsafeRow keys and values.
When asked for performance metrics (the average number of probes per key lookup and the peak memory used), UnsafeFixedWidthAggregationMap simply delegates to the underlying BytesToBytesMap.
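The delegation of metrics can be illustrated with a small stand-alone sketch. Everything in it is hypothetical: `ToyBackingMap` is a toy linear-probing table standing in for BytesToBytesMap (it is not Spark's implementation, and the 8-bytes-per-key accounting is an assumption made for illustration), and `ToyAggregationMap` plays the role of the wrapper that keeps no metrics of its own.

```java
final class MetricsDelegationSketch {

    /** Hypothetical stand-in for the backing map: a fixed-size linear-probing table. */
    static final class ToyBackingMap {
        private final long[] keys;
        private final boolean[] used;
        private int size;
        private long numLookups;
        private long numProbes;
        private long peakMemoryUsedBytes;

        ToyBackingMap(int capacity) {
            keys = new long[capacity];
            used = new boolean[capacity];
        }

        /** Finds the slot of the key, inserting the key if it is absent; counts every probe. */
        int lookupOrInsert(long key) {
            numLookups++;
            int slot = (int) Math.floorMod(key, (long) keys.length);
            for (int i = 0; i < keys.length; i++) {
                numProbes++;
                if (!used[slot]) {
                    used[slot] = true;
                    keys[slot] = key;
                    size++;
                    // Assumption for the toy: each stored key costs 8 bytes.
                    peakMemoryUsedBytes = Math.max(peakMemoryUsedBytes, size * 8L);
                    return slot;
                }
                if (keys[slot] == key) {
                    return slot;
                }
                slot = (slot + 1) % keys.length;
            }
            throw new IllegalStateException("toy map is full");
        }

        double averageProbesPerLookup() {
            return numLookups == 0 ? 0.0 : (double) numProbes / numLookups;
        }

        long peakMemoryUsedBytes() {
            return peakMemoryUsedBytes;
        }
    }

    /** Hypothetical wrapper: it owns no counters and forwards metric requests. */
    static final class ToyAggregationMap {
        private final ToyBackingMap map;

        ToyAggregationMap(int capacity) {
            map = new ToyBackingMap(capacity);
        }

        void touch(long groupingKey) {
            map.lookupOrInsert(groupingKey);
        }

        long getPeakMemoryUsedBytes() {
            return map.peakMemoryUsedBytes();
        }

        double getAverageProbesPerLookup() {
            return map.averageProbesPerLookup();
        }
    }

    public static void main(String[] args) {
        ToyAggregationMap aggMap = new ToyAggregationMap(4);
        aggMap.touch(0L);
        aggMap.touch(4L); // collides with key 0 in a table of 4 slots
        aggMap.touch(0L);
        System.out.println(aggMap.getAverageProbesPerLookup());
        System.out.println(aggMap.getPeakMemoryUsedBytes());
    }
}
```

The point of the sketch is only the shape of the design: the counters live in the backing map, so the wrapper's metric methods are one-line forwards.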
UnsafeFixedWidthAggregationMap is created when:
- HashAggregateExec physical operator is requested to create a new UnsafeFixedWidthAggregationMap (when HashAggregateExec physical operator is requested to generate the Java source code for the "produce" path in Whole-Stage Code Generation)
- TungstenAggregationIterator is created (when HashAggregateExec physical operator is requested to execute in the traditional / non-Whole-Stage-Code-Generation execution path)
| Name | Description |
|---|---|
| | Re-used pointer (as an UnsafeRow with the number of fields to match the aggregationBufferSchema) to the current aggregation buffer. Used exclusively when … |
| | |
| | UnsafeProjection for the groupingKeySchema (to encode grouping keys as UnsafeRows) |
| | Spark Core's … |
supportsAggregationBufferSchema Static Method

    boolean supportsAggregationBufferSchema(StructType schema)

supportsAggregationBufferSchema is a predicate that returns true unless any field of the input schema has a data type that is not mutable.
Note: The mutable data types are BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType and TimestampType. Examples (possibly all) of data types that are not mutable: ArrayType, BinaryType, StringType, CalendarIntervalType, MapType, ObjectType and StructType.

    import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
    import org.apache.spark.sql.types._

    // A schema with a StringType field is not supported
    val schemaWithImmutableField = StructType(StructField("string", StringType) :: Nil)
    assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithImmutableField) == false)

    // A schema with mutable fields only is supported
    val schemaWithMutableFields = StructType(
      StructField("int", IntegerType) :: StructField("bool", BooleanType) :: Nil)
    assert(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schemaWithMutableFields))

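The rule behind the predicate can also be modelled without Spark on the classpath. The following is a minimal sketch, not Spark's code: plain strings stand in for Spark's data type objects, the set of mutable types is copied from the note above, and the class name `MutableSchemaCheckSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class MutableSchemaCheckSketch {

    // Type names copied from the note above; strings are a stand-in for Spark's DataType objects.
    static final Set<String> MUTABLE_TYPES = new HashSet<>(Arrays.asList(
        "BooleanType", "ByteType", "DateType", "DecimalType", "DoubleType", "FloatType",
        "IntegerType", "LongType", "NullType", "ShortType", "TimestampType"));

    /** Returns true unless some field type is outside the mutable set. */
    static boolean supportsAggregationBufferSchema(List<String> fieldTypes) {
        for (String fieldType : fieldTypes) {
            if (!MUTABLE_TYPES.contains(fieldType)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the two schemas of the Scala example: one string field, then int and bool fields.
        System.out.println(supportsAggregationBufferSchema(Arrays.asList("StringType")));
        System.out.println(supportsAggregationBufferSchema(Arrays.asList("IntegerType", "BooleanType")));
    }
}
```

Note that under this rule a schema with no fields is accepted, since there is no field to violate the condition.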
Note: supportsAggregationBufferSchema is used exclusively when HashAggregateExec is requested to supportsAggregate.
Creating UnsafeFixedWidthAggregationMap Instance
UnsafeFixedWidthAggregationMap takes the following when created:
- Empty aggregation buffer (as an InternalRow)
- Aggregation buffer schema
- Grouping key schema
UnsafeFixedWidthAggregationMap initializes the internal registries and counters.
getAggregationBufferFromUnsafeRow Method

    UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key) // (1)
    UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key, int hash)

(1) Uses the hash code of the key

getAggregationBufferFromUnsafeRow requests the BytesToBytesMap to look up the input key (to get a BytesToBytesMap.Location).
getAggregationBufferFromUnsafeRow …FIXME
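The page does not describe what happens after the lookup, so the sketch below only illustrates two things under stated assumptions. First, the one-argument overload forwards to the two-argument one with the key's hash code, as the callout above says. Second, it assumes a get-or-initialize pattern, in which a key seen for the first time receives a copy of the empty aggregation buffer; that step is an assumption of this sketch, not a statement taken from the page. All names (`AggregationBufferLookupSketch`, `Entry`, the `String` keys and `long[]` buffers) are hypothetical stand-ins for UnsafeRow-based structures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AggregationBufferLookupSketch {

    /** Hypothetical entry pairing a grouping key with its mutable aggregation buffer. */
    private static final class Entry {
        final String key;
        final long[] buffer;

        Entry(String key, long[] buffer) {
            this.key = key;
            this.buffer = buffer;
        }
    }

    // Buckets indexed by the caller-supplied hash; a stand-in for the real byte-level map.
    private final Map<Integer, List<Entry>> buckets = new HashMap<>();
    private final long[] emptyBuffer;

    AggregationBufferLookupSketch(long[] emptyBuffer) {
        this.emptyBuffer = emptyBuffer.clone();
    }

    /** Overload (1): uses the hash code of the key. */
    long[] getAggregationBuffer(String key) {
        return getAggregationBuffer(key, key.hashCode());
    }

    /** Looks the key up by hash; assumed behavior: initialize a fresh buffer on first sight. */
    long[] getAggregationBuffer(String key, int hash) {
        List<Entry> bucket = buckets.computeIfAbsent(hash, h -> new ArrayList<>());
        for (Entry entry : bucket) {
            if (entry.key.equals(key)) {
                return entry.buffer;
            }
        }
        Entry created = new Entry(key, emptyBuffer.clone());
        bucket.add(created);
        return created.buffer;
    }

    public static void main(String[] args) {
        // A one-slot buffer used as a row counter per grouping key.
        AggregationBufferLookupSketch map = new AggregationBufferLookupSketch(new long[] {0L});
        for (String groupingKey : new String[] {"a", "b", "a"}) {
            map.getAggregationBuffer(groupingKey)[0]++;
        }
        System.out.println(map.getAggregationBuffer("a")[0]);
        System.out.println(map.getAggregationBuffer("b")[0]);
    }
}
```

Returning the stored buffer itself, rather than a copy, is what lets a caller update the aggregate in place, which is also why the buffer schema has to consist of mutable, fixed-width types.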
getAggregationBuffer Method

    UnsafeRow getAggregationBuffer(InternalRow groupingKey)

getAggregationBuffer …FIXME
Note: getAggregationBuffer seems to be used exclusively for testing.
Getting KVIterator: iterator Method

    KVIterator<UnsafeRow, UnsafeRow> iterator()

iterator …FIXME
getPeakMemoryUsedBytes Method

    long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes …FIXME
getAverageProbesPerLookup Method

    double getAverageProbesPerLookup()

getAverageProbesPerLookup …FIXME
free Method

    void free()

free …FIXME
destructAndCreateExternalSorter Method

    UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException

destructAndCreateExternalSorter …FIXME