UnsafeShuffleWriter — ShuffleWriter for SerializedShuffleHandle
UnsafeShuffleWriter is a ShuffleWriter that is used to write records (i.e. key-value pairs).

UnsafeShuffleWriter is chosen when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.

UnsafeShuffleWriter can use a specialized NIO-based merge procedure that avoids extra serialization/deserialization.
Name | Initial Value | Description |
---|---|---|
sorter | (uninitialized) | ShuffleExternalSorter. Initialized when UnsafeShuffleWriter opens. Used when UnsafeShuffleWriter inserts records into the sorter and writes spill files merged. |
Tip: Enable ERROR logging level for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=ERROR. Refer to Logging.
mergeSpillsWithTransferTo Method

Caution: FIXME
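The method is still to be described, but the core idea behind the NIO-based merge mentioned in the introduction is FileChannel.transferTo, which concatenates files without copying bytes through user space. The standalone sketch below shows only that idea; the class and method names are hypothetical, and Spark's actual merge additionally tracks per-partition lengths.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical demo (not Spark code): concatenate spill files into one
// output file using zero-copy FileChannel.transferTo.
class TransferToMergeDemo {
  static void merge(File[] spills, File output) throws IOException {
    try (FileChannel out = new FileOutputStream(output, true).getChannel()) {
      for (File spill : spills) {
        try (FileChannel in = new FileInputStream(spill).getChannel()) {
          long position = 0;
          long size = in.size();
          // transferTo may copy fewer bytes than requested, so loop until
          // the whole spill file has been appended.
          while (position < size) {
            position += in.transferTo(position, size - position, out);
          }
        }
      }
    }
  }
}
```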
forceSorterToSpill Method

Caution: FIXME
mergeSpills Method

Caution: FIXME
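Although the method is still to be described, the choice between the transferTo-based fast merge and a stream-based slow merge cannot depend on transferToEnabled alone: compressed or encrypted spill files can only be concatenated byte-for-byte when the format allows it. The sketch below is one plausible shape of that decision; every parameter is an assumption of this sketch, not Spark's exact code.

```java
// Hypothetical sketch: when is a transferTo-based (byte-level) merge safe?
static boolean canUseTransferToMerge(
    boolean transferToEnabled,           // spark.file.transferTo
    boolean compressionEnabled,          // is shuffle compression on?
    boolean codecSupportsConcatenation,  // can compressed streams be concatenated?
    boolean encryptionEnabled) {         // encrypted streams cannot be blindly concatenated
  boolean fastMergeIsSafe =
      !encryptionEnabled && (!compressionEnabled || codecSupportsConcatenation);
  return transferToEnabled && fastMergeIsSafe;
}
```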
updatePeakMemoryUsed Method

Caution: FIXME
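Conceptually the method only needs to keep a running maximum of the sorter's reported memory usage, so the peak survives the sorter being discarded (e.g. in closeAndWriteOutput). A minimal, self-contained sketch of that idea, with all names hypothetical:

```java
// Hypothetical sketch: remember the highest memory usage ever reported.
class PeakMemoryTracker {
  private long peakMemoryUsedBytes = 0L;

  // Called with the sorter's current peak before the sorter goes away.
  void updatePeakMemoryUsed(long sorterPeakBytes) {
    peakMemoryUsedBytes = Math.max(peakMemoryUsedBytes, sorterPeakBytes);
  }

  long getPeakMemoryUsedBytes() {
    return peakMemoryUsedBytes;
  }
}
```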
Writing Records — write Method

```java
void write(Iterator<Product2<K, V>> records) throws IOException
```

Note: write is part of the ShuffleWriter contract.
Internally, write traverses the input records (of a single RDD partition) and inserts them into the sorter one by one using insertRecordIntoSorter. When all the records have been processed, write closes the internal resources and writes spill files merged (with closeAndWriteOutput).

In the end, write requests ShuffleExternalSorter to clean up after itself.

Caution: FIXME
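Put together, the control flow of write is a loop followed by the merge step. The following sketch reconstructs the method from the description above, assuming the fields and helpers named on this page; it is not Spark's exact code.

```java
// Hypothetical reconstruction of write (inside UnsafeShuffleWriter):
public void write(Iterator<Product2<K, V>> records) throws IOException {
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next()); // serialize and buffer one record
    }
    closeAndWriteOutput();                    // merge spills into the final output
    success = true;
  } finally {
    // closeAndWriteOutput nulls out the sorter on success, so this cleanup
    // effectively runs only when the write failed mid-way.
    if (!success && sorter != null) {
      sorter.cleanupResources();
    }
  }
}
```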
Stopping UnsafeShuffleWriter — stop Method

```java
Option<MapStatus> stop(boolean success)
```

Caution: FIXME

Note: stop is part of the ShuffleWriter contract.
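Pending the details, the contract suggests the following shape: report the MapStatus only on the first, successful stop, and release the sorter in all cases. A hypothetical sketch (inside the class), not Spark's exact code:

```java
public Option<MapStatus> stop(boolean success) {
  try {
    if (stopping) {
      return Option.apply(null);  // already stopped: Option.apply(null) is None
    }
    stopping = true;
    // mapStatus is recorded by closeAndWriteOutput
    return success ? Option.apply(mapStatus) : Option.apply(null);
  } finally {
    if (sorter != null) {
      sorter.cleanupResources();  // always release acquired memory
    }
  }
}
```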
Creating UnsafeShuffleWriter Instance

UnsafeShuffleWriter takes the following when created:

- BlockManager
- IndexShuffleBlockResolver
- TaskMemoryManager
- SerializedShuffleHandle
- mapId
- TaskContext
- SparkConf

UnsafeShuffleWriter makes sure that the number of shuffle output partitions (of the ShuffleDependency of the input SerializedShuffleHandle) is at most (1 << 24) - 1, i.e. 16777215.
Note: The number of shuffle output partitions is first enforced when SortShuffleManager checks if SerializedShuffleHandle can be used for a ShuffleHandle (that eventually leads to UnsafeShuffleWriter).
UnsafeShuffleWriter uses the spark.file.transferTo and spark.shuffle.sort.initialBufferSize Spark properties to initialize the transferToEnabled and initialSortBufferSize attributes, respectively.

If the number of shuffle output partitions is greater than the maximum, UnsafeShuffleWriter throws an IllegalArgumentException:

```
UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions
```
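The (1 << 24) - 1 limit suggests that the writer encodes the partition identifier in 24 bits when sorting serialized records. A self-contained sketch of the guard (the class, constant and method names are hypothetical):

```java
// Sketch of the constructor guard on the number of reduce partitions.
class PartitionLimitCheck {
  static final int MAX_SHUFFLE_OUTPUT_PARTITIONS = (1 << 24) - 1; // 16777215

  static void requireSupportedNumPartitions(int numPartitions) {
    if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
      throw new IllegalArgumentException(
          "UnsafeShuffleWriter can only be used for shuffles with at most "
              + MAX_SHUFFLE_OUTPUT_PARTITIONS + " reduce partitions");
    }
  }
}
```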
Note: UnsafeShuffleWriter is created exclusively when SortShuffleManager selects a ShuffleWriter (for a SerializedShuffleHandle).
Opening UnsafeShuffleWriter (i.e. Creating ShuffleExternalSorter and SerializationStream) — open Internal Method

```java
void open() throws IOException
```
open makes sure that the internal reference to ShuffleExternalSorter (as sorter) is not defined yet and creates one itself.

open creates a new byte array output stream (as serBuffer) with the buffer capacity of 1 MB.

open creates a new SerializationStream for the new byte array output stream using SerializerInstance.
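Assembled from the three steps above, open looks roughly as follows. The ShuffleExternalSorter constructor arguments are hidden behind a hypothetical factory helper, and the buffer subclass is an assumption of this sketch (the JDK's ByteArrayOutputStream keeps its buffer protected, yet the sorter needs direct access to the serialized bytes).

```java
import java.io.ByteArrayOutputStream;

// Hypothetical buffer that exposes its internal byte array, so serialized
// records can be handed to the sorter without copying.
class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
  ExposedByteArrayOutputStream(int size) { super(size); }
  byte[] getBuf() { return buf; }
}

// Hypothetical reconstruction of open (inside UnsafeShuffleWriter):
private void open() throws IOException {
  assert (sorter == null);                                   // not opened twice
  sorter = newShuffleExternalSorter();                       // hypothetical factory
  serBuffer = new ExposedByteArrayOutputStream(1024 * 1024); // 1 MB buffer
  serOutputStream = serializerInstance.serializeStream(serBuffer);
}
```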
Note: SerializerInstance was defined when UnsafeShuffleWriter was created (and is exactly the one used to create the ShuffleDependency).

Note: open is used exclusively when UnsafeShuffleWriter is created.
Inserting Record Into ShuffleExternalSorter — insertRecordIntoSorter Method

```java
void insertRecordIntoSorter(Product2<K, V> record) throws IOException
```
insertRecordIntoSorter calculates the partition for the key of the input record.

Note: Partitioner is defined when UnsafeShuffleWriter is created.
insertRecordIntoSorter then writes the key and the value of the input record to SerializationStream and calculates the size of the serialized buffer.

Note: SerializationStream is created when UnsafeShuffleWriter opens.

In the end, insertRecordIntoSorter inserts the serialized buffer into ShuffleExternalSorter (with Platform.BYTE_ARRAY_OFFSET as the base offset of the byte array).
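The three steps map onto code roughly as follows. serBuffer, serOutputStream, partitioner and sorter are the fields described on this page; OBJECT_CLASS_TAG stands for a ClassTag of Object that the SerializationStream API requires. A hypothetical reconstruction, not Spark's exact code:

```java
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key); // 1. partition by key

  // 2. serialize the key and the value into serBuffer
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();

  // 3. hand the serialized bytes to the sorter; BYTE_ARRAY_OFFSET is the
  // base offset of byte arrays in the unsafe memory layout
  sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET,
      serializedRecordSize, partitionId);
}
```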
Note: ShuffleExternalSorter is created when UnsafeShuffleWriter opens.

Note: insertRecordIntoSorter is used exclusively when UnsafeShuffleWriter writes records.
Closing Internal Resources and Writing Spill Files Merged — closeAndWriteOutput Method

```java
void closeAndWriteOutput() throws IOException
```
closeAndWriteOutput first updates peak memory used.

closeAndWriteOutput removes the internal ByteArrayOutputStream and SerializationStream.

closeAndWriteOutput requests ShuffleExternalSorter to close itself and return SpillInfo metadata.

closeAndWriteOutput removes the internal ShuffleExternalSorter.

closeAndWriteOutput requests IndexShuffleBlockResolver for the data file for the shuffleId and mapId.

closeAndWriteOutput creates a temporary file to merge the spill files into, deletes the spill files afterwards, and requests IndexShuffleBlockResolver to write an index file and commit.

closeAndWriteOutput creates a MapStatus with the location of the executor's BlockManager and the partition lengths in the merged file.
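In code, that sequence reads roughly as follows. This is a hypothetical reconstruction under the names used on this page; error handling around the temporary file is elided and tempFileWith stands for a temporary-file helper.

```java
void closeAndWriteOutput() throws IOException {
  updatePeakMemoryUsed();       // record peak memory before dropping the sorter
  serBuffer = null;             // drop the serialization buffer ...
  serOutputStream = null;       // ... and the serialization stream

  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;

  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = tempFileWith(output); // hypothetical temp-file helper
  final long[] partitionLengths = mergeSpills(spills, tmp);
  for (SpillInfo spill : spills) {
    if (!spill.file.delete()) {
      logger.error("Error while deleting spill file {}", spill.file.getPath());
    }
  }
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths);
}
```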
If there is an issue with deleting spill files, you should see the following ERROR message in the logs:

```
ERROR Error while deleting spill file [path]
```

If there is an issue with deleting the temporary file, you should see the following ERROR message in the logs:

```
ERROR Error while deleting temp file [path]
```
Note: closeAndWriteOutput is used exclusively when UnsafeShuffleWriter writes records.
Settings

Spark Property | Default Value | Description |
---|---|---|
spark.file.transferTo | true | Controls whether…FIXME |
spark.shuffle.sort.initialBufferSize | 4096 | Default initial sort buffer size |
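For example, to experiment with these settings (the values below are illustrative only):

```java
import org.apache.spark.SparkConf;

class ShuffleWriterTuning {
  static SparkConf tunedConf() {
    return new SparkConf()
        .set("spark.file.transferTo", "false")                 // disable transferTo-based merging
        .set("spark.shuffle.sort.initialBufferSize", "8192");  // larger initial sort buffer
  }
}
```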