MapOutputTracker — Shuffle Map Output Registry
MapOutputTracker is a Spark service that runs on the driver and executors and tracks the shuffle map outputs (with information about the BlockManager and the estimated sizes of the reduce blocks per shuffle).
Note: MapOutputTracker is registered as the MapOutputTracker RPC Endpoint in the RPC Environment when SparkEnv is created.
There are two concrete MapOutputTrackers, i.e. one for the driver and another for executors:

- MapOutputTrackerMaster for the driver
- MapOutputTrackerWorker for executors
Given the different runtime environments of the driver and executors, accessing the current MapOutputTracker
is possible using SparkEnv.
```scala
SparkEnv.get.mapOutputTracker
```
Name | Description |
---|---|
mapStatuses | Internal cache with a MapStatus array (indexed by partition id) per shuffle id. |
epoch | Tracks the epoch in a Spark application. Starts from 0 and can be updated on the driver and executors. |
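As a rough, illustrative sketch of the shapes of the two registries above (not the exact field declarations; the String payload stands in for MapStatus, and starting the epoch at 0 is an assumption consistent with the table):

```scala
import scala.collection.concurrent.TrieMap

class MapOutputTrackerRegistriesSketch {
  // shuffle id -> map outputs of the shuffle (one entry per map partition)
  protected val mapStatuses = TrieMap.empty[Int, Array[String]]

  // epoch of the application; assumed to start at 0 and only ever grow
  protected var epoch: Long = 0L
  protected val epochLock = new AnyRef
}
```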
MapOutputTracker is also used for MapOutputTrackerMaster.containsShuffle and MapOutputTrackerMaster.registerShuffle when a new ShuffleMapStage is created.

MapOutputTrackerMaster.getStatistics(dependency) returns MapOutputStatistics that becomes the result of JobWaiter.taskSucceeded for a ShuffleMapStage if it is the final stage in a job.

MapOutputTrackerMaster.registerMapOutputs registers the map outputs (a list of MapStatus) for a shuffle id when a ShuffleMapStage is finished.
Note: MapOutputTracker is used in BlockStoreShuffleReader and when creating BlockManager and BlockManagerSlaveEndpoint.
trackerEndpoint Property
trackerEndpoint is an RpcEndpointRef that MapOutputTracker uses to send RPC messages.

trackerEndpoint is initialized when SparkEnv is created for the driver and executors and cleared when MapOutputTrackerMaster is stopped.
Creating MapOutputTracker Instance
Caution: FIXME
deserializeMapStatuses Method

Caution: FIXME
sendTracker Method

Caution: FIXME
serializeMapStatuses Method

Caution: FIXME
Computing Statistics for ShuffleDependency — getStatistics Method

```scala
getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics
```
getStatistics returns a MapOutputStatistics which is simply a pair of the shuffle id (of the input ShuffleDependency) and an array of the total estimated sizes (in bytes) of the reduce shuffle blocks, one element per reduce partition, summed across all the BlockManagers.

Internally, getStatistics finds the map outputs for the input ShuffleDependency and, for every MapStatus and partition, adds the estimated size of the reduce block (in bytes) to the partition's total.
Note: The internal totalSizes array has as many elements as the number of partitions of the Partitioner of the input ShuffleDependency. Each element of totalSizes is the sum of the estimated sizes of the block for that partition across all BlockManagers (i.e. for every MapStatus).
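The following is a minimal sketch of that aggregation, assuming a simplified MapStatus-like type with a getSizeForBlock(reduceId: Int): Long method (the names below are illustrative, not Spark's exact internals):

```scala
// Simplified stand-in for MapStatus: estimated sizes of the reduce blocks of one map output.
final case class MapOutputSizes(sizes: Array[Long]) {
  def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}

// Sum the estimated reduce-block sizes per partition across all map outputs,
// mirroring how getStatistics builds the totalSizes array described above.
def totalSizes(statuses: Seq[MapOutputSizes], numPartitions: Int): Array[Long] = {
  val totals = new Array[Long](numPartitions)
  for (status <- statuses; partition <- 0 until numPartitions) {
    totals(partition) += status.getSizeForBlock(partition)
  }
  totals
}
```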
Note: getStatistics is used when DAGScheduler accepts a ShuffleDependency for execution (and the corresponding ShuffleMapStage has already been computed) and gets notified that a ShuffleMapTask has completed (and map-stage jobs waiting for the stage are then marked as finished).
Computing BlockManagerIds with Their Blocks and Sizes — getMapSizesByExecutorId Methods

```scala
getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
  : Seq[(BlockManagerId, Seq[(BlockId, Long)])]

getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
  : Seq[(BlockManagerId, Seq[(BlockId, Long)])]  // (1)
```
1. Calls the other getMapSizesByExecutorId with endPartition as reduceId + 1; used exclusively in tests.
Caution: FIXME How do the start and end partitions influence the return value?
getMapSizesByExecutorId
returns a collection of BlockManagerIds with their blocks and sizes.
When executed, you should see the following DEBUG message in the logs:
```
DEBUG Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]
```
getMapSizesByExecutorId finds map outputs for the input shuffleId.
Note: getMapSizesByExecutorId gets the map outputs for all the partitions (despite the method’s signature).
In the end, getMapSizesByExecutorId converts shuffle map outputs (as MapStatuses) into a collection of BlockManagerIds with their blocks and sizes.
Note: getMapSizesByExecutorId is exclusively used when BlockStoreShuffleReader reads combined records for a reduce task.
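As a hypothetical example, the following mirrors the reduce-side lookup that BlockStoreShuffleReader performs, using the signature documented on this page; the shuffle id and partition range are illustrative, and since MapOutputTracker is Spark-internal (private[spark]) this only compiles from within Spark's own packages:

```scala
import org.apache.spark.SparkEnv

// Illustrative values: the first reduce partition of shuffle 0.
val shuffleId = 0
val startPartition = 0
val endPartition = 1

val blocksByAddress =
  SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, startPartition, endPartition)

blocksByAddress.foreach { case (blockManagerId, blocks) =>
  println(s"$blockManagerId serves ${blocks.size} shuffle blocks, " +
    s"${blocks.map(_._2).sum} bytes in total")
}
```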
Returning Current Epoch — getEpoch Method

```scala
getEpoch: Long
```
getEpoch
returns the current epoch.
Note: getEpoch is used when DAGScheduler is notified that an executor was lost and when TaskSetManager is created (and sets the epoch for the tasks in a TaskSet).
Updating Epoch — updateEpoch Method

```scala
updateEpoch(newEpoch: Long): Unit
```
updateEpoch
updates epoch when the input newEpoch
is greater (and hence more recent) and clears the mapStatuses
internal cache.
You should see the following INFO message in the logs:
```
INFO MapOutputTrackerWorker: Updating epoch to [newEpoch] and clearing cache
```
Note: updateEpoch is exclusively used when TaskRunner runs (for a task).
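A minimal, self-contained sketch of that check-and-clear logic, with `epoch`, `epochLock` and `mapStatuses` standing in for the internal registries described on this page (names and types are illustrative, not the exact implementation):

```scala
import scala.collection.concurrent.TrieMap

class EpochRegistrySketch {
  private val epochLock = new AnyRef
  private var epoch: Long = 0L
  // shuffle id -> cached map output statuses (simplified as strings here)
  private val mapStatuses = TrieMap.empty[Int, Array[String]]

  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    // only a strictly newer epoch invalidates the cached map outputs
    if (newEpoch > epoch) {
      println(s"Updating epoch to $newEpoch and clearing cache")
      epoch = newEpoch
      mapStatuses.clear()
    }
  }
}
```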
Unregistering Shuffle — unregisterShuffle Method

```scala
unregisterShuffle(shuffleId: Int): Unit
```
unregisterShuffle unregisters shuffleId, i.e. removes the shuffleId entry from the mapStatuses internal cache.
Note: unregisterShuffle is used when ContextCleaner removes a shuffle (blocks) from MapOutputTrackerMaster and BlockManagerMaster (aka shuffle cleanup) and when BlockManagerSlaveEndpoint handles a RemoveShuffle message.
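In terms of the registries sketched earlier, the effect amounts to a single map removal (a sketch under that assumption, not the exact implementation):

```scala
import scala.collection.concurrent.TrieMap

// Drop the cached map output statuses for the shuffle (mapStatuses shaped as in the earlier sketch).
def unregisterShuffleSketch(
    mapStatuses: TrieMap[Int, Array[String]],
    shuffleId: Int): Unit = {
  mapStatuses.remove(shuffleId)
}
```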
stop Method

```scala
stop(): Unit
```
stop
does nothing at all.
Note: stop is used exclusively when SparkEnv stops (and stops all the services, including MapOutputTracker).
Note: stop is overridden by MapOutputTrackerMaster.
Finding Map Outputs For ShuffleDependency in Cache or Fetching Remotely — getStatuses Internal Method

```scala
getStatuses(shuffleId: Int): Array[MapStatus]
```
getStatuses
finds MapStatuses for the input shuffleId
in the mapStatuses internal cache and, when not available, fetches them from a remote MapOutputTrackerMaster (using RPC).
Internally, getStatuses
first queries the mapStatuses
internal cache and returns the map outputs if found.
If not found (in the mapStatuses
internal cache), you should see the following INFO message in the logs:
```
INFO Don't have map outputs for shuffle [id], fetching them
```
If some other process fetches the map outputs for the shuffleId (as recorded in the fetching internal registry), getStatuses waits until it is done.
When no other process fetches the map outputs, getStatuses registers the input shuffleId in the fetching internal registry (of shuffle map outputs being fetched).
You should see the following INFO message in the logs:
```
INFO Doing the fetch; tracker endpoint = [trackerEndpoint]
```
getStatuses sends a GetMapOutputStatuses RPC remote message for the input shuffleId to the trackerEndpoint expecting an Array[Byte].
Note: getStatuses requests shuffle map outputs remotely within a timeout and with retries. Refer to RpcEndpointRef.
getStatuses
deserializes the map output statuses and records the result in the mapStatuses
internal cache.
You should see the following INFO message in the logs:
```
INFO Got the output locations
```
getStatuses removes the input shuffleId from the fetching internal registry.
You should see the following DEBUG message in the logs:
```
DEBUG Fetching map output statuses for shuffle [id] took [time] ms
```
If getStatuses could not find the map output locations for the input shuffleId (locally and remotely), you should see the following ERROR message in the logs and getStatuses throws a MetadataFetchFailedException.
```
ERROR Missing all output locations for shuffle [id]
```
Note: getStatuses is used when MapOutputTracker computes BlockManagerIds with their blocks and sizes (getMapSizesByExecutorId) and computes statistics for a ShuffleDependency (getStatistics).
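A minimal, self-contained sketch of the cache-or-fetch coordination described above: only one thread per process fetches the map outputs of a given shuffle while other threads wait. The names (cache, fetching, fetchFromTracker) and the String payload are illustrative, not Spark's exact code:

```scala
import scala.collection.mutable

object GetStatusesSketch {
  private val cache = new java.util.concurrent.ConcurrentHashMap[Int, Array[String]]()
  private val fetching = mutable.HashSet[Int]()

  // Stands in for the GetMapOutputStatuses RPC to trackerEndpoint plus deserialization.
  private def fetchFromTracker(shuffleId: Int): Array[String] =
    Array(s"map-status-for-shuffle-$shuffleId")

  def getStatuses(shuffleId: Int): Array[String] = {
    val cached = cache.get(shuffleId)
    if (cached != null) return cached

    val doFetch = fetching.synchronized {
      // wait while another thread is fetching the statuses for this shuffle
      while (fetching.contains(shuffleId)) fetching.wait()
      if (cache.containsKey(shuffleId)) false
      else { fetching += shuffleId; true } // this thread does the fetch
    }

    if (doFetch) {
      try cache.put(shuffleId, fetchFromTracker(shuffleId))
      finally fetching.synchronized {
        fetching -= shuffleId
        fetching.notifyAll() // wake up any waiters
      }
    }
    cache.get(shuffleId)
  }
}
```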
Converting MapStatuses To BlockManagerIds with ShuffleBlockIds and Their Sizes — convertMapStatuses Internal Method

```scala
convertMapStatuses(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int,
  statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
```
convertMapStatuses iterates over the input statuses array (of MapStatus entries indexed by map id) and, for every MapStatus entry and every partition from the input startPartition until endPartition, pairs the BlockManagerId of the map output with a ShuffleBlockId (made up of the input shuffleId, the mapId, and the partition) and the estimated size of the reduce block.
For any empty MapStatus
, you should see the following ERROR message in the logs:
```
ERROR Missing an output location for shuffle [id]
```
And convertMapStatuses
throws a MetadataFetchFailedException
(with shuffleId
, startPartition
, and the above error message).
Note: convertMapStatuses is exclusively used when MapOutputTracker computes BlockManagerIds with their ShuffleBlockIds and sizes.
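A minimal, self-contained sketch of that conversion; BlockManagerIdLike, ShuffleBlockIdLike and MapStatusLike are simplified stand-ins for Spark's (mostly Spark-internal) BlockManagerId, ShuffleBlockId and MapStatus types, and the empty-status error handling is omitted:

```scala
import scala.collection.mutable

final case class BlockManagerIdLike(executorId: String, host: String, port: Int)
final case class ShuffleBlockIdLike(shuffleId: Int, mapId: Int, reduceId: Int)
final case class MapStatusLike(location: BlockManagerIdLike, blockSizes: Array[Long]) {
  def getSizeForBlock(reduceId: Int): Long = blockSizes(reduceId)
}

// Group (ShuffleBlockId, estimated size) pairs by the BlockManagerId of each map output,
// mirroring the conversion described above.
def convertMapStatusesSketch(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int,
    statuses: Array[MapStatusLike]): Seq[(BlockManagerIdLike, Seq[(ShuffleBlockIdLike, Long)])] = {
  val byAddress =
    mutable.HashMap.empty[BlockManagerIdLike, mutable.ListBuffer[(ShuffleBlockIdLike, Long)]]
  for ((status, mapId) <- statuses.zipWithIndex; part <- startPartition until endPartition) {
    // one ShuffleBlockId per (map output, reduce partition) pair with its estimated size
    val block = ShuffleBlockIdLike(shuffleId, mapId, part) -> status.getSizeForBlock(part)
    byAddress.getOrElseUpdate(status.location, mutable.ListBuffer.empty) += block
  }
  byAddress.toSeq.map { case (loc, blocks) => (loc, blocks.toSeq) }
}
```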
Sending Blocking Messages To trackerEndpoint RpcEndpointRef — askTracker Method

```scala
askTracker[T](message: Any): T
```
askTracker
sends the message
to trackerEndpoint
RpcEndpointRef and waits for a result.
When an exception happens, you should see the following ERROR message in the logs and askTracker
throws a SparkException
.
```
ERROR Error communicating with MapOutputTracker
```
Note: askTracker is used when MapOutputTracker fetches map outputs for ShuffleDependency remotely and sends a one-way message.
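A minimal sketch of the wrap-and-rethrow behaviour described above. The `ask` parameter stands in for the blocking send on trackerEndpoint (the blocking RpcEndpointRef API differs across Spark versions), so this is not Spark's exact code:

```scala
import org.apache.spark.SparkException

def askTrackerSketch[T](message: Any)(ask: Any => T): T =
  try {
    ask(message)
  } catch {
    case e: Exception =>
      // log the failure, then surface it as a SparkException to the caller
      System.err.println(s"Error communicating with MapOutputTracker: ${e.getMessage}")
      throw new SparkException("Error communicating with MapOutputTracker", e)
  }
```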