TorrentBroadcast — Broadcast Variable With BitTorrent-Like Protocol For Block Distribution
TorrentBroadcast is a Broadcast (i.e. a broadcast variable) that uses a BitTorrent-like protocol for block distribution, which only happens when tasks access broadcast variables on executors.
TorrentBroadcast is created exclusively when TorrentBroadcastFactory is requested to newBroadcast.
When a broadcast variable is created (using SparkContext.broadcast) on the driver, a new instance of TorrentBroadcast is created.
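```scala
import org.apache.spark.SparkContext

// On the driver
val sc: SparkContext = ???          // e.g. spark.sparkContext in spark-shell
val anyScalaValue = Seq(1, 2, 3)    // any value to broadcast
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created
```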
A broadcast variable is stored in the driver's BlockManager as a single value and, separately, as broadcast blocks (after it is divided into broadcast blocks, i.e. blockified). The broadcast block size is the value of the spark.broadcast.blockSize Spark property (4m by default).
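As a rough illustration of how the block size determines the number of broadcast blocks, the following Scala sketch computes the piece count for a serialized value of a given size. It assumes the 4 MiB default of spark.broadcast.blockSize and ignores compression; numBlocks is a hypothetical helper, not part of Spark.

```scala
object BlockCount {
  // Default spark.broadcast.blockSize is 4m, i.e. 4 MiB
  val DefaultBlockSize: Int = 4 * 1024 * 1024

  // Hypothetical helper: number of pieces a serialized value of
  // `serializedBytes` bytes is divided into (ceiling division)
  def numBlocks(serializedBytes: Long, blockSize: Int = DefaultBlockSize): Long = {
    require(blockSize > 0, "blockSize must be positive")
    require(serializedBytes >= 0, "serializedBytes must not be negative")
    (serializedBytes + blockSize - 1) / blockSize
  }
}
```

A 10 MiB serialized value, for example, yields three pieces: two full 4 MiB blocks and one 2 MiB block.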
Note: TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
Note: TorrentBroadcast belongs to the org.apache.spark.broadcast package.
TorrentBroadcast uses the _value internal registry for the value, which is computed by readBroadcastBlock when first requested.
```scala
_value: T
```
Note: _value is a @transient private lazy val, which means that it is not serialized when the broadcast is sent remotely and is instantiated only when first requested.
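The effect of a @transient lazy val can be demonstrated without Spark. The Scala sketch below uses a hypothetical LazyHolder class (not a Spark type) whose value is computed once, dropped by Java serialization, and recomputed on first access after deserialization, which is the behavior TorrentBroadcast relies on for _value. It assumes plain Java serialization as a stand-in for shipping the object to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for TorrentBroadcast: only `id` travels with the object,
// while `value` is a @transient lazy val recomputed on first access after deserialization
class LazyHolder(val id: Long) extends Serializable {
  @transient lazy val value: String = {
    LazyHolder.computations += 1 // count evaluations for the demo
    s"value-of-$id"              // TorrentBroadcast would call readBroadcastBlock() here
  }
}

object LazyHolder {
  var computations: Int = 0

  // Java serialization round-trip, standing in for sending the object to an executor
  def roundTrip[A <: Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's class loader (helps in REPL/script runners)
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Accessing value on the deserialized copy triggers a second computation, much like an executor reading broadcast blocks the first time a task touches a deserialized TorrentBroadcast.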
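Tip: Enable INFO or DEBUG logging level for the org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL. Refer to Logging.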
unBlockifyObject Method
Caution: FIXME
releaseLock Method
Caution: FIXME
Getting Value of Broadcast Variable — getValue Method
```scala
def getValue(): T
```
getValue returns the value of a broadcast variable.
Note: getValue is part of the Broadcast Variable Contract and is the only way to access the value of a broadcast variable.
Internally, getValue reads the internal _value which, once accessed, reads broadcast blocks from the local or remote BlockManagers.
Note: The internal _value is transient and lazy, i.e. it is not preserved when serialized and is (re)created only when requested, respectively. That trick allows a broadcast variable to be serialized on the driver (e.g. as part of a task) and sent to executors over the wire without its value, which executors then read from BlockManagers on first access.
readBroadcastBlock Internal Method
```scala
readBroadcastBlock(): T
```
Internally, readBroadcastBlock sets the current SparkConf (using setConf).
Note: The current SparkConf is available using SparkEnv.get.conf.
readBroadcastBlock requests the local BlockManager for the values of the broadcast.
Note: The current BlockManager is available using SparkEnv.get.blockManager.
If the broadcast is available locally, readBroadcastBlock releases the lock on the broadcast and returns the value.
If, however, the broadcast is not found locally, you should see the following INFO message in the logs:
```
INFO Started reading broadcast variable [id]
```
readBroadcastBlock reads blocks (as chunks) of the broadcast.
You should see the following INFO message in the logs:
```
INFO Reading broadcast variable [id] took [usedTimeMs]
```
readBroadcastBlock unblockifies the collection of ByteBuffer blocks.
Note: readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.
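Unblockifying can be sketched as the reverse of blockifying: concatenate the pieces, decompress, and deserialize a single object. The Scala sketch below is not Spark's unBlockifyObject; it assumes Java serialization in place of Spark's Serializer and java.util.zip GZIP in place of Spark's CompressionCodec (which is lz4 by default). UnblockifySketch and its methods are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream}
import java.nio.ByteBuffer
import java.util.zip.GZIPInputStream

object UnblockifySketch {
  // Copies the readable bytes of a buffer without moving its position
  private def bytesOf(buf: ByteBuffer): Array[Byte] = {
    val dup = buf.duplicate()
    val arr = new Array[Byte](dup.remaining())
    dup.get(arr)
    arr
  }

  // Joins the pieces back into one stream, optionally decompresses it with GZIP
  // (a stand-in for CompressionCodec), and deserializes the single object it holds
  def unBlockifyObject[T](blocks: Array[ByteBuffer], compressed: Boolean): T = {
    val joined = new ByteArrayOutputStream()
    blocks.foreach(b => joined.write(bytesOf(b)))
    val raw = new ByteArrayInputStream(joined.toByteArray)
    val source: InputStream = if (compressed) new GZIPInputStream(raw) else raw
    val in = new ObjectInputStream(source)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Concatenating before decompressing matters: compression is applied to the whole serialized stream, so individual pieces cannot be decompressed or deserialized on their own.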
readBroadcastBlock stores the broadcast variable with MEMORY_AND_DISK storage level to the local BlockManager. If storing the broadcast variable fails, a SparkException is thrown with the following message:
```
Failed to store [broadcastId] in BlockManager
```
The broadcast variable is returned.
Note: readBroadcastBlock is used exclusively when TorrentBroadcast is requested for the value.
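The order of steps in readBroadcastBlock (local lookup first, otherwise fetch and reassemble the pieces, then cache the result locally) can be modeled with plain collections. This Scala sketch is a simplified model, not Spark code: LocalStore, fetchPieces and unblockify are hypothetical stand-ins for the local BlockManager, the remote block fetch, and unBlockifyObject; lock handling is omitted, and IllegalStateException stands in for SparkException.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the local BlockManager
final class LocalStore {
  private val blocks = mutable.Map.empty[String, Any]
  def getLocalValues(blockId: String): Option[Any] = blocks.get(blockId)
  // Always succeeds here; a real block store may fail to store a block
  def putSingle(blockId: String, value: Any): Boolean = { blocks(blockId) = value; true }
}

object ReadBroadcastSketch {
  def readBroadcastBlock[T](
      id: Long,
      local: LocalStore,
      fetchPieces: Long => Seq[Array[Byte]], // hypothetical remote fetch of all pieces
      unblockify: Seq[Array[Byte]] => T): T = {
    val broadcastId = s"broadcast_$id"
    local.getLocalValues(broadcastId) match {
      case Some(v) =>
        v.asInstanceOf[T] // found locally: no fetching at all
      case None =>
        // "Started reading broadcast variable [id]" would be logged here
        val value = unblockify(fetchPieces(id))
        // cache the whole value locally so later reads on this node are local
        if (!local.putSingle(broadcastId, value))
          throw new IllegalStateException(s"Failed to store $broadcastId in BlockManager")
        value
    }
  }
}
```

Calling readBroadcastBlock twice for the same id fetches the pieces only once; the second call is served from the local store.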
setConf Internal Method
```scala
setConf(conf: SparkConf): Unit
```
setConf uses the input conf SparkConf to set the compression codec and the block size.
Internally, setConf reads the spark.broadcast.compress Spark property and, if enabled (which it is by default), sets a CompressionCodec (as the internal compressionCodec property).
setConf also reads the spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).
Note: setConf is executed when TorrentBroadcast is created or re-created when deserialized on executors.
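What setConf derives from the configuration can be sketched over a plain Map[String, String]. This is a simplified model, not Spark's SparkConf API: sizeAsKiB is a hypothetical parser that only understands k, m and g suffixes (a bare number is read as KiB), and the codec name comes from spark.io.compression.codec with lz4 assumed as the default.

```scala
object SetConfSketch {
  final case class BroadcastSettings(blockSize: Int, compressionCodec: Option[String])

  // Hypothetical, simplified size parser: "4m" -> 4096 KiB; a bare number is KiB
  def sizeAsKiB(s: String): Long = {
    val (digits, unit) = s.trim.toLowerCase.span(_.isDigit)
    require(digits.nonEmpty, s"invalid size: $s")
    val n = digits.toLong
    unit match {
      case "" | "k" | "kb" => n
      case "m" | "mb"      => n * 1024
      case "g" | "gb"      => n * 1024 * 1024
      case other           => throw new IllegalArgumentException(s"unsupported unit: $other")
    }
  }

  def setConf(conf: Map[String, String]): BroadcastSettings = {
    // Block size in bytes, 4m by default
    val blockSize = (sizeAsKiB(conf.getOrElse("spark.broadcast.blockSize", "4m")) * 1024).toInt
    // Compression is on by default; the codec name is only recorded when it is enabled
    val compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
    val codec = if (compress) Some(conf.getOrElse("spark.io.compression.codec", "lz4")) else None
    BroadcastSettings(blockSize, codec)
  }
}
```

Because setConf also runs when a TorrentBroadcast is deserialized on an executor, these settings come from the executor's own configuration at that point.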
Storing Broadcast and Its Blocks in Local BlockManager — writeBlocks Internal Method
```scala
writeBlocks(value: T): Int
```
writeBlocks stores the broadcast's value and blocks in the driver's BlockManager. It returns the number of broadcast blocks the broadcast was divided into.
Internally, writeBlocks stores the block for the value broadcast to the local BlockManager (using a new BroadcastBlockId, value, MEMORY_AND_DISK storage level and without telling the driver).
If storing the broadcast block fails, a SparkException is thrown with the following message:
```
Failed to store [broadcastId] in BlockManager
```
writeBlocks divides value into blocks (of spark.broadcast.blockSize size) using the Serializer and an optional CompressionCodec (enabled by spark.broadcast.compress). Every block gets its own BroadcastBlockId (with piece and an index), and its bytes are wrapped inside a ChunkedByteBuffer. Blocks are stored in the local BlockManager (using the piece block id, MEMORY_AND_DISK_SER storage level and informing the driver).
Note: The entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, and the pieces with MEMORY_AND_DISK_SER storage level.
If storing any of the broadcast pieces fails, a SparkException is thrown with the following message:
```
Failed to store [pieceId] of [broadcastId] in local BlockManager
```
Note: writeBlocks is used exclusively when TorrentBroadcast is created (which happens on the driver only).
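The bookkeeping that writeBlocks performs can be modeled with a map from block ids to what gets stored. In this Scala sketch, WriteBlocksSketch, StoredBlock and the blockify function are hypothetical; the block id names follow the broadcast_[id] and broadcast_[id]_piece[i] pattern, and storage levels are plain strings rather than Spark's StorageLevel.

```scala
import scala.collection.mutable

object WriteBlocksSketch {
  // What the hypothetical block store records for every block id
  final case class StoredBlock(level: String, tellMaster: Boolean, payload: Any)

  // Mirrors the steps of writeBlocks and returns the number of pieces
  def writeBlocks[T](
      id: Long,
      value: T,
      store: mutable.Map[String, StoredBlock],
      blockify: T => List[Array[Byte]]): Int = {
    val broadcastId = s"broadcast_$id"
    // 1. The whole (deserialized) value, not reported to the driver
    store(broadcastId) = StoredBlock("MEMORY_AND_DISK", tellMaster = false, value)
    // 2. Every serialized piece under its own piece id, reported to the driver
    val pieces = blockify(value)
    pieces.zipWithIndex.foreach { case (piece, i) =>
      store(s"${broadcastId}_piece$i") = StoredBlock("MEMORY_AND_DISK_SER", tellMaster = true, piece)
    }
    pieces.size
  }
}
```

Only the pieces are reported to the driver, which is what lets executors locate and fetch them from other nodes, while the whole value stays a node-local cache.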
Chunking Broadcast Into Blocks — blockifyObject Method
```scala
blockifyObject[T](
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]
```
blockifyObject divides (aka blockifies) the input obj broadcast variable into blocks (of ByteBuffer). blockifyObject uses the input serializer Serializer to write obj in a serialized format to a ChunkedByteBufferOutputStream (with chunks of blockSize size), compressing the stream with the optional CompressionCodec.
Note: blockifyObject is executed when TorrentBroadcast stores a broadcast and its blocks to a local BlockManager.
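The blockifying steps (serialize, optionally compress, cut into fixed-size pieces) can be sketched with JDK classes only. This is not Spark's blockifyObject: it assumes Java serialization in place of Spark's Serializer, GZIP in place of CompressionCodec, and splitting a finished byte array in place of writing through a ChunkedByteBufferOutputStream. BlockifySketch is a hypothetical name.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

object BlockifySketch {
  // Serializes obj, optionally compresses the stream with GZIP,
  // and cuts the resulting bytes into pieces of at most blockSize bytes
  def blockifyObject[T](obj: T, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    // closing the ObjectOutputStream also finishes the GZIP stream, if any
    try out.writeObject(obj.asInstanceOf[AnyRef]) finally out.close()
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }
}
```

Every piece except the last one is exactly blockSize bytes long; the last one holds the remainder.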
doUnpersist Method
```scala
doUnpersist(blocking: Boolean): Unit
```
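doUnpersist removes the broadcast blocks of a broadcast variable from executors only, i.e. it executes unpersist with the removeFromDriver flag disabled.
Note: doUnpersist is part of the Broadcast Variable Contract and is executed from the unpersist method.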
doDestroy Method
```scala
doDestroy(blocking: Boolean): Unit
```
doDestroy removes all the persisted state associated with a broadcast variable on all the nodes in a Spark application, i.e. the driver and executors, by executing unpersist with the removeFromDriver flag enabled.
Note: doDestroy is executed when Broadcast removes the persisted data and metadata related to a broadcast variable.
unpersist Internal Method
```scala
unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit
```
unpersist removes all broadcast blocks from executors and possibly the driver (only when the removeFromDriver flag is enabled).
Note: unpersist belongs to the TorrentBroadcast private object and is executed when TorrentBroadcast unpersists a broadcast variable and when it removes a broadcast variable completely.
When executed, you should see the following DEBUG message in the logs:
```
DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast [id]
```
unpersist requests the BlockManagerMaster to remove the broadcast with the given id (passing the removeFromDriver and blocking flags along).
Note: unpersist uses SparkEnv to get the BlockManagerMaster (through the blockManager property).
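The difference between unpersisting and destroying comes down to the removeFromDriver flag (in Spark's source, doUnpersist delegates to unpersist with removeFromDriver disabled and doDestroy with it enabled). This Scala sketch models a cluster as a hypothetical map from node names to cached block ids; UnpersistSketch and the "driver" node name are assumptions, and the blocking flag is omitted because everything here is synchronous.

```scala
import scala.collection.mutable

object UnpersistSketch {
  // Hypothetical cluster view: node name -> block ids cached on that node
  type Cluster = mutable.Map[String, mutable.Set[String]]

  // Drops the broadcast's blocks (broadcast_[id] and broadcast_[id]_*) on every
  // executor, and on the driver too only when removeFromDriver is true
  def unpersist(id: Long, removeFromDriver: Boolean, cluster: Cluster): Unit = {
    val prefix = s"broadcast_$id"
    for ((node, blocks) <- cluster if removeFromDriver || node != "driver") {
      val doomed = blocks.filter(b => b == prefix || b.startsWith(prefix + "_")).toList
      blocks --= doomed
    }
  }

  def doUnpersist(id: Long, cluster: Cluster): Unit = unpersist(id, removeFromDriver = false, cluster)
  def doDestroy(id: Long, cluster: Cluster): Unit = unpersist(id, removeFromDriver = true, cluster)
}
```

After doUnpersist the driver still holds the value and pieces, so executors can re-fetch them later; after doDestroy nothing is left to fetch.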
Creating TorrentBroadcast Instance
TorrentBroadcast takes the following when created: the object (value) to broadcast (obj) and the ID of the broadcast variable (id).
TorrentBroadcast initializes the internal registries and counters.