ShuffleBlockFetcherIterator
ShuffleBlockFetcherIterator is a Scala Iterator that fetches shuffle blocks (aka shuffle map outputs) from block managers.
ShuffleBlockFetcherIterator is created exclusively when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task.
ShuffleBlockFetcherIterator allows for iterating over a sequence of blocks as (BlockId, InputStream) pairs so a caller can handle shuffle blocks in a pipelined fashion as they are received.
ShuffleBlockFetcherIterator is exhausted (i.e. can provide no more elements) when the number of blocks already processed is at least the total number of blocks to fetch.
ShuffleBlockFetcherIterator throttles the remote fetches to avoid consuming too much memory.
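To make the "pipelined" consumption concrete, here is a minimal sketch of a caller that handles each (BlockId, InputStream) pair as soon as the iterator hands it over. The BlockId case class and the fakeBlocks iterator are hypothetical stand-ins (no Spark dependency), not the real Spark types.

```scala
import java.io.{ByteArrayInputStream, InputStream}

object PipelinedConsumptionSketch {
  // Hypothetical stand-in for Spark's BlockId; just a name here.
  final case class BlockId(name: String)

  // Any Iterator[(BlockId, InputStream)] can be consumed the same way.
  def fakeBlocks: Iterator[(BlockId, InputStream)] =
    Iterator(
      BlockId("shuffle_0_0_0") -> new ByteArrayInputStream(Array[Byte](1, 2, 3)),
      BlockId("shuffle_0_1_0") -> new ByteArrayInputStream(Array[Byte](4, 5)))

  // Reads every stream fully as soon as its block is handed over,
  // closes it, and returns the number of bytes seen per block.
  def bytesPerBlock(blocks: Iterator[(BlockId, InputStream)]): List[(String, Int)] =
    blocks.map { case (id, in) =>
      try {
        var n = 0
        while (in.read() != -1) n += 1
        id.name -> n
      } finally in.close()
    }.toList
}
```

Because the iterator is lazy, a block is only read when the caller asks for it, so processing of early blocks can overlap with fetching of later ones.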
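Name | Description
---|---
numBlocksProcessed | The number of blocks fetched and consumed.
numBlocksToFetch | Total number of blocks to fetch and consume.
results | Internal FIFO blocking queue (using Java’s java.util.concurrent.LinkedBlockingQueue) to hold fetch results. Used in: (1) next to take one, (2) sendRequest to put remote fetch results, (3) fetchLocalBlocks (similarly to sendRequest) to put local fetch results, (4) cleanup to release managed buffers for the remaining SuccessFetchResult entries.
maxBytesInFlight | The maximum size (in bytes) of all the remote shuffle blocks to fetch. Set when ShuffleBlockFetcherIterator is created.
maxReqsInFlight | The maximum number of remote requests to fetch shuffle blocks. Set when ShuffleBlockFetcherIterator is created.
bytesInFlight | The bytes of fetched remote shuffle blocks in flight. Starts at 0. Incremented every sendRequest and decremented every next.
reqsInFlight | The number of remote shuffle block fetch requests in flight. Starts at 0. Incremented every sendRequest and decremented every next.
isZombie | Flag whether ShuffleBlockFetcherIterator is a zombie (i.e. has been cleaned up). When enabled (when the task using ShuffleBlockFetcherIterator finishes), onBlockFetchSuccess no longer adds fetched blocks to the results internal queue.
currentResult | The currently-processed SuccessFetchResult. Set when ShuffleBlockFetcherIterator is requested for the next element.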
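The in-flight counters and their two limits suggest how the throttling could work. The following is only a sketch under an assumed rule (one request is always allowed when nothing is in flight, otherwise both limits must hold); the Request and Throttle types and the sendWhatFits and completed methods are hypothetical and are not taken from this page.

```scala
import scala.collection.mutable

object InFlightThrottleSketch {
  // Hypothetical request: only its size in bytes matters here.
  final case class Request(size: Long)

  final class Throttle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
    var bytesInFlight = 0L
    var reqsInFlight = 0
    private val pending = mutable.Queue.empty[Request]

    def enqueue(r: Request): Unit = pending.enqueue(r)

    // Assumed rule: always allow one request when nothing is in flight,
    // otherwise stay within both limits.
    private def canSend: Boolean =
      pending.nonEmpty && (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          bytesInFlight + pending.head.size <= maxBytesInFlight))

    // Sends as many pending requests as the limits allow; returns how many.
    def sendWhatFits(): Int = {
      var sent = 0
      while (canSend) {
        val r = pending.dequeue()
        bytesInFlight += r.size // what sendRequest does to the counters
        reqsInFlight += 1
        sent += 1
      }
      sent
    }

    // Called once a result has been consumed (what next does to the counters).
    def completed(r: Request): Unit = {
      bytesInFlight -= r.size
      reqsInFlight -= 1
    }
  }
}
```

With limits of 100 bytes and 5 requests, two queued 60-byte requests go out one at a time: the second is held back until the first has been consumed.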
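Tip
Enable logging for the ShuffleBlockFetcherIterator logger to see what happens inside (the INFO, DEBUG, TRACE and ERROR messages listed on this page). Refer to Logging.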
fetchUpToMaxBytes
Method
Caution
FIXME
Creating ShuffleBlockFetcherIterator Instance
When created, ShuffleBlockFetcherIterator takes the following:
- Blocks to fetch per BlockManager (as Seq[(BlockManagerId, Seq[(BlockId, Long)])])
- Function to wrap the returned input stream (as (BlockId, InputStream) ⇒ InputStream)
- maxBytesInFlight: the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by the spark.reducer.maxSizeInFlight Spark property)
- maxReqsInFlight: the maximum number of remote requests to fetch blocks at any given point (controlled by the spark.reducer.maxReqsInFlight Spark property)
- detectCorrupt: flag to detect any corruption in fetched blocks (controlled by the spark.shuffle.detectCorrupt Spark property)
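The nested type of the blocks to fetch is easier to read with a concrete value. The sketch below uses simplified, hypothetical stand-ins for BlockManagerId and BlockId (the real Spark classes carry more detail), and the host names, port and sizes are made up.

```scala
object BlocksByAddressSketch {
  // Hypothetical stand-ins for Spark's BlockManagerId and BlockId.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class BlockId(name: String)

  // One entry per block manager; each block comes with its size in bytes.
  type BlocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]

  val example: BlocksByAddress = Seq(
    BlockManagerId("1", "host-a", 7337) ->
      Seq(BlockId("shuffle_0_0_0") -> 10L, BlockId("shuffle_0_1_0") -> 0L),
    BlockManagerId("2", "host-b", 7337) ->
      Seq(BlockId("shuffle_0_2_0") -> 32L))

  def totalBlocks(b: BlocksByAddress): Int = b.map(_._2.size).sum
  def totalBytes(b: BlocksByAddress): Long = b.flatMap(_._2).map(_._2).sum
}
```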
Initializing ShuffleBlockFetcherIterator — initialize
Internal Method
initialize(): Unit
initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.
Internally, initialize registers a TaskCompletionListener (that will clean up right after the task finishes).
initialize calls splitLocalRemoteBlocks (which returns the FetchRequests to send).
As ShuffleBlockFetcherIterator is in its initialization phase, initialize makes sure that the reqsInFlight and bytesInFlight internal counters are both 0. Otherwise, initialize throws an exception.
initialize fetches shuffle blocks (from remote BlockManagers).
You should see the following INFO message in the logs:
INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms
initialize fetches local shuffle blocks.
You should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: Got local blocks in [time] ms
Note
initialize is used exclusively when ShuffleBlockFetcherIterator is created.
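The order of the initialization steps can be sketched as follows. The Skeleton class, the steps log and the onTaskCompletion hook (standing in for a TaskCompletionListener) are hypothetical, and the use of require (hence the exception type) is an assumption; the sketch only records which step runs when.

```scala
import scala.collection.mutable.ArrayBuffer

object InitializeOrderSketch {
  final class Skeleton {
    val steps = ArrayBuffer.empty[String]
    var reqsInFlight = 0
    var bytesInFlight = 0L
    // Hypothetical hook standing in for a TaskCompletionListener.
    var onTaskCompletion: () => Unit = () => ()

    def initialize(): Unit = {
      // 1. Register the cleanup to run when the task finishes.
      onTaskCompletion = () => { steps += "cleanup"; () }
      // 2. Split the blocks and build the remote fetch requests.
      steps += "splitLocalRemoteBlocks"
      // 3. Nothing may be in flight yet (exception type is an assumption).
      require(reqsInFlight == 0 && bytesInFlight == 0L, "fetches already in flight")
      // 4. Start the remote fetches.
      steps += "fetch remote blocks"
      // 5. Read the local blocks.
      steps += "fetch local blocks"
    }
  }
}
```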
Sending Remote Shuffle Block Fetch Request — sendRequest
Internal Method
sendRequest(req: FetchRequest): Unit
Internally, when sendRequest runs, you should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]
sendRequest increments the bytesInFlight and reqsInFlight internal counters.
Note
The input FetchRequest contains the remote BlockManagerId address and the shuffle blocks to fetch (as a sequence of BlockIds and their sizes).
sendRequest requests ShuffleClient to fetch the shuffle blocks (from the host, the port, and the executor as defined in the input FetchRequest).
Note
ShuffleClient was defined when ShuffleBlockFetcherIterator was created.
sendRequest registers a BlockFetchingListener with ShuffleClient that:
- For every successfully fetched shuffle block, adds it as a SuccessFetchResult to the results internal queue.
- For every shuffle block fetch failure, adds it as a FailureFetchResult to the results internal queue.
Note
sendRequest is used exclusively when ShuffleBlockFetcherIterator is requested to fetch remote shuffle blocks.
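The interplay of the counters, the listener and the results queue described above can be sketched like this. Everything here is a simplified, hypothetical model: FakeClient answers synchronously from an in-memory map, Listener mirrors only the two callbacks named on this page, and the result classes carry fewer fields than the real ones.

```scala
import java.util.concurrent.LinkedBlockingQueue

object SendRequestSketch {
  sealed trait FetchResult
  final case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult
  final case class FailureFetchResult(blockId: String, error: Throwable) extends FetchResult

  // Hypothetical listener with the two callbacks named on this page.
  trait Listener {
    def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit
  }

  // Hypothetical client: "fetches" from an in-memory map, synchronously.
  final class FakeClient(store: Map[String, Array[Byte]]) {
    def fetchBlocks(blockIds: Seq[String], listener: Listener): Unit =
      blockIds.foreach { id =>
        store.get(id) match {
          case Some(bytes) => listener.onBlockFetchSuccess(id, bytes)
          case None => listener.onBlockFetchFailure(id, new NoSuchElementException(id))
        }
      }
  }

  final class Fetcher(client: FakeClient) {
    val results = new LinkedBlockingQueue[FetchResult]()
    var bytesInFlight = 0L
    var reqsInFlight = 0

    // blocks: (block id, size in bytes) pairs of a single request.
    def sendRequest(blocks: Seq[(String, Long)]): Unit = {
      bytesInFlight += blocks.map(_._2).sum
      reqsInFlight += 1
      client.fetchBlocks(blocks.map(_._1), new Listener {
        def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit =
          results.put(SuccessFetchResult(blockId, data))
        def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
          results.put(FailureFetchResult(blockId, e))
      })
    }
  }
}
```

Both outcomes end up in the same queue, so the consumer sees successes and failures in arrival order and decides how to react when it takes them out.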
onBlockFetchSuccess Callback
onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit
Internally, onBlockFetchSuccess checks whether the iterator is a zombie and does the further processing only if it is not.
onBlockFetchSuccess marks the input blockId as received (i.e. removes it from the blocks still to be fetched for the request sent in sendRequest).
onBlockFetchSuccess adds the managed buf (as a SuccessFetchResult) to the results internal queue.
You should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]
Regardless of the zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:
TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms
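A minimal sketch of the zombie guard, assuming the flag and the set of remaining blocks are protected by the same lock. The Callback class and its markZombie and remainingBlocks members are hypothetical; block ids are plain strings and the buffer is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object ZombieGuardSketch {
  final class Callback(requested: Set[String]) {
    val results = new LinkedBlockingQueue[String]()
    private val remaining = mutable.Set.empty[String]
    remaining ++= requested
    private var zombie = false

    // What cleanup would do to the flag.
    def markZombie(): Unit = synchronized { zombie = true }

    // Late successes are dropped once the iterator is a zombie.
    def onBlockFetchSuccess(blockId: String): Unit = synchronized {
      if (!zombie) {
        remaining -= blockId   // mark the block as received
        results.put(blockId)   // hand it over to the consumer
      }
    }

    def remainingBlocks: Set[String] = synchronized { remaining.toSet }
  }
}
```

Dropping late results matters because nothing takes elements from the queue after cleanup, so anything added later would never be released.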
onBlockFetchFailure Callback
onBlockFetchFailure(blockId: String, e: Throwable): Unit
When onBlockFetchFailure is called, you should see the following ERROR message in the logs:
ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]
onBlockFetchFailure adds the block (as a FailureFetchResult) to the results internal queue.
Throwing FetchFailedException (for ShuffleBlockId) — throwFetchFailedException
Internal Method
throwFetchFailedException(
  blockId: BlockId,
  address: BlockManagerId,
  e: Throwable): Nothing
throwFetchFailedException throws a FetchFailedException when the input blockId is a ShuffleBlockId.
Note
throwFetchFailedException creates the FetchFailedException passing on the root cause of the failure, i.e. the input e.
Otherwise, throwFetchFailedException throws a SparkException:
Failed to get block [blockId], which is not a shuffle block
Note
throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the next element.
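The two branches amount to a pattern match on the kind of block. In the sketch below the block id classes are simplified stand-ins, the address is a plain string, and FetchFailed and NotAShuffleBlock are hypothetical substitutes for FetchFailedException and SparkException.

```scala
object ThrowFetchFailedSketch {
  sealed trait BlockId
  final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

  // Hypothetical stand-ins for FetchFailedException and SparkException.
  final class FetchFailed(val address: String, val block: ShuffleBlockId, cause: Throwable)
    extends Exception(s"Fetch of $block from $address failed", cause)
  final class NotAShuffleBlock(message: String, cause: Throwable)
    extends Exception(message, cause)

  // Return type Nothing: the method never returns normally.
  def throwFetchFailedException(blockId: BlockId, address: String, e: Throwable): Nothing =
    blockId match {
      case s: ShuffleBlockId =>
        throw new FetchFailed(address, s, e) // keeps e as the root cause
      case other =>
        throw new NotAShuffleBlock(
          s"Failed to get block $other, which is not a shuffle block", e)
    }
}
```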
Releasing Resources — cleanup
Internal Method
cleanup(): Unit
Internally, cleanup marks ShuffleBlockFetcherIterator as a zombie.
cleanup releases the current result buffer.
cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.
Note
cleanup is registered as the task cleanup when ShuffleBlockFetcherIterator initializes itself, and so runs when the task finishes.
Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) — releaseCurrentResultBuffer
Internal Method
releaseCurrentResultBuffer(): Unit
releaseCurrentResultBuffer decrements the reference count of the buffer of the currently-processed SuccessFetchResult, if there is one.
releaseCurrentResultBuffer then clears currentResult.
Note
releaseCurrentResultBuffer is used when ShuffleBlockFetcherIterator releases resources and when BufferReleasingInputStream closes.
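How releasing the current result and draining the results queue fit together can be sketched as follows. The Buffer class is a hypothetical reference-counted stand-in for a managed buffer, the result classes are simplified, and the task metrics updates are left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CleanupSketch {
  // Hypothetical reference-counted buffer (stand-in for a managed buffer).
  final class Buffer {
    private var refs = 1
    def release(): Unit = refs -= 1
    def refCount: Int = refs
  }

  sealed trait FetchResult
  final case class Success(blockId: String, buf: Buffer) extends FetchResult
  final case class Failure(blockId: String, e: Throwable) extends FetchResult

  final class Fetcher {
    val results = new LinkedBlockingQueue[FetchResult]()
    var currentResult: FetchResult = null
    var isZombie = false

    def releaseCurrentResultBuffer(): Unit = {
      currentResult match {
        case Success(_, buf) => buf.release()
        case _ => // no current result, or nothing to release
      }
      currentResult = null
    }

    def cleanup(): Unit = {
      isZombie = true
      releaseCurrentResultBuffer()
      // Drain whatever is still queued and release every buffer.
      var r = results.poll() // returns null when the queue is empty
      while (r != null) {
        r match {
          case Success(_, buf) => buf.release()
          case _ => // failures hold no buffer
        }
        r = results.poll()
      }
    }
  }
}
```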
fetchLocalBlocks
Internal Method
fetchLocalBlocks(): Unit
fetchLocalBlocks …FIXME
Note
fetchLocalBlocks is used when…FIXME
hasNext
Method
hasNext: Boolean
Note
hasNext is part of Scala’s Iterator Contract to test whether this iterator can provide another element.
hasNext is positive (true) when numBlocksProcessed is less than numBlocksToFetch.
Otherwise, hasNext is negative (false).
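The counter-based hasNext can be sketched with a small iterator over a blocking queue. CountingIterator is hypothetical, and its next method (count first, then block on the queue) is an assumption made only so the example runs; it is not a description of the real next.

```scala
import java.util.concurrent.LinkedBlockingQueue

object HasNextSketch {
  final class CountingIterator[A](results: LinkedBlockingQueue[A], numBlocksToFetch: Int)
      extends Iterator[A] {
    private var numBlocksProcessed = 0

    // True while fewer blocks were handed out than were requested.
    override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

    override def next(): A = {
      if (!hasNext) throw new NoSuchElementException("no more blocks")
      numBlocksProcessed += 1
      results.take() // blocks until a result arrives
    }
  }
}
```

Since hasNext depends only on the two counters, it can answer true before the next block has actually arrived; the waiting happens in next.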
splitLocalRemoteBlocks
Internal Method
splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]
splitLocalRemoteBlocks …FIXME
Note
splitLocalRemoteBlocks is used exclusively when ShuffleBlockFetcherIterator is requested to initialize.
next
Method
next(): (BlockId, InputStream)
Note
next is part of Scala’s Iterator Contract to produce the next element of this iterator.
next …FIXME