NettyBlockTransferService — Netty-Based BlockTransferService
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading or fetching blocks of data.
NettyBlockTransferService is created when SparkEnv is created for the driver and executors (to create the BlockManager).
BlockManager uses NettyBlockTransferService for the following:
- FIXME (should it be here or in BlockManager?)
- ShuffleClient (when the spark.shuffle.service.enabled configuration property is off) for…FIXME
NettyBlockTransferService simply requests the TransportServer for the port.
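Tip: Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL
Refer to Logging.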
fetchBlocks Method
fetchBlocks(
  host: String,
  port: Int,
  execId: String,
  blockIds: Array[String],
  listener: BlockFetchingListener): Unit
Note: fetchBlocks is part of the BlockTransferService Contract to…FIXME.
When executed, fetchBlocks prints out the following TRACE message in the logs:
TRACE Fetch blocks from [host]:[port] (executor id [execId])
fetchBlocks then creates a RetryingBlockFetcher.BlockFetchStarter whose createAndStart method…FIXME
fetchBlocks then checks the maximum number of acceptable IO exceptions (such as connection timeouts) per request. If the number is greater than 0, fetchBlocks creates a RetryingBlockFetcher and starts it immediately.
Note: RetryingBlockFetcher is created with the RetryingBlockFetcher.BlockFetchStarter created earlier, the input blockIds and listener.
If, however, the number of retries is not greater than 0 (i.e. it is 0 or less), the RetryingBlockFetcher.BlockFetchStarter created earlier is started directly (with the input blockIds and listener).
In case of any Exception, you should see the following ERROR message in the logs, and the input BlockFetchingListener gets notified (using onBlockFetchFailure for every block id):
ERROR Exception while beginning fetchBlocks
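The retry-or-start-directly decision and the failure notification described above can be sketched in plain Scala. This is a minimal, dependency-free sketch under stated assumptions: FetchListener, FetchStarter, RecordingListener and FlakyStarter are hypothetical stand-ins (not Spark classes) for BlockFetchingListener and RetryingBlockFetcher.BlockFetchStarter, maxIORetries stands in for the maximum number of acceptable IO exceptions, and the synchronous retry loop is a deliberate simplification, not how RetryingBlockFetcher is actually implemented.

```scala
import java.io.IOException
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's BlockFetchingListener.
trait FetchListener {
  def onBlockFetchSuccess(blockId: String): Unit
  def onBlockFetchFailure(blockId: String, t: Throwable): Unit
}

// Hypothetical stand-in for RetryingBlockFetcher.BlockFetchStarter.
trait FetchStarter {
  def createAndStart(blockIds: Array[String], listener: FetchListener): Unit
}

object FetchBlocksSketch {
  // Simplified, synchronous retry loop (NOT the real RetryingBlockFetcher):
  // re-runs the starter on IOException, at most maxIORetries extra times.
  private def startWithRetries(
      starter: FetchStarter,
      blockIds: Array[String],
      listener: FetchListener,
      maxIORetries: Int): Unit = {
    var retries = 0
    var done = false
    while (!done) {
      try {
        starter.createAndStart(blockIds, listener)
        done = true
      } catch {
        case _: IOException if retries < maxIORetries => retries += 1
      }
    }
  }

  def fetchBlocks(
      blockIds: Array[String],
      listener: FetchListener,
      maxIORetries: Int,
      starter: FetchStarter): Unit = {
    try {
      if (maxIORetries > 0) startWithRetries(starter, blockIds, listener, maxIORetries)
      else starter.createAndStart(blockIds, listener) // no retries: start directly
    } catch {
      case e: Exception =>
        println("ERROR Exception while beginning fetchBlocks")
        // Notify the listener about every requested block
        blockIds.foreach(id => listener.onBlockFetchFailure(id, e))
    }
  }
}

// Usage helpers (hypothetical): records callbacks; fails the first `failures` calls.
class RecordingListener extends FetchListener {
  val ok = ArrayBuffer[String]()
  val failed = ArrayBuffer[String]()
  def onBlockFetchSuccess(blockId: String): Unit = ok += blockId
  def onBlockFetchFailure(blockId: String, t: Throwable): Unit = failed += blockId
}

class FlakyStarter(failures: Int) extends FetchStarter {
  var calls = 0
  def createAndStart(blockIds: Array[String], listener: FetchListener): Unit = {
    calls += 1
    if (calls <= failures) throw new IOException("simulated connection timeout")
    blockIds.foreach(listener.onBlockFetchSuccess)
  }
}
```

With a starter that times out twice, a budget of 3 retries lets the fetch succeed on the third call; with a budget of 0, the first IOException goes straight to the catch-all and every block is reported as failed.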
Application Id — appId Property
Caution: FIXME
Closing NettyBlockTransferService — close Method
close(): Unit
Note: close is part of the BlockTransferService Contract.
close…FIXME
Initializing NettyBlockTransferService — init Method
init(blockDataManager: BlockDataManager): Unit
Note: init is part of the BlockTransferService Contract.
init starts a server for…FIXME
Internally, init creates a NettyBlockRpcServer (using the application id, a JavaSerializer and the input blockDataManager).
Caution: FIXME Describe security when authEnabled is enabled.
init creates a TransportContext with the NettyBlockRpcServer created earlier.
Caution: FIXME Describe transportConf and TransportContext.
init creates the internal clientFactory and a server.
Caution: FIXME What’s the “a server”?
In the end, you should see the following INFO message in the logs:
INFO NettyBlockTransferService: Server created on [hostName]:[port]
Note: hostName is given when NettyBlockTransferService is created. It is controlled by the spark.driver.host Spark property for the driver and differs per deployment environment for executors (as controlled by --hostname for CoarseGrainedExecutorBackend).
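The final step of init, reporting the host and the port the server actually bound to, can be illustrated with a plain java.net.ServerSocket standing in for Netty's TransportServer. This is a hypothetical sketch, not Spark code: ServerPortSketch and bindServer are illustrative names, and binding to port 0 (letting the OS pick a free port) is an assumption made for the example. It shows why the port is read back from the server, in the same spirit as NettyBlockTransferService requesting the TransportServer for the port.

```scala
import java.net.{InetAddress, InetSocketAddress, ServerSocket}

object ServerPortSketch {
  // Binds a plain ServerSocket (a stand-in for Netty's TransportServer),
  // then asks the bound socket for its actual port and formats the INFO line.
  def bindServer(hostName: String, requestedPort: Int): (ServerSocket, String) = {
    val server = new ServerSocket()
    server.bind(new InetSocketAddress(InetAddress.getByName(hostName), requestedPort))
    // With requestedPort = 0 the OS picks a free port, so read it back from the server.
    val port = server.getLocalPort
    (server, s"INFO NettyBlockTransferService: Server created on $hostName:$port")
  }
}
```

The caller owns the returned socket and is responsible for closing it.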
Uploading Block — uploadBlock Method
uploadBlock(
  hostname: String,
  port: Int,
  execId: String,
  blockId: BlockId,
  blockData: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Future[Unit]
Note: uploadBlock is part of the BlockTransferService Contract.
Internally, uploadBlock creates a TransportClient to send an UploadBlock message to the input hostname and port.
Note: UploadBlock message is processed by NettyBlockRpcServer.
The UploadBlock message holds the application id, the input execId and blockId. It also holds the block metadata, i.e. level and classTag serialized using the internal JavaSerializer, and the bytes of the input blockData itself (obtained using the ManagedBuffer.nioByteBuffer method rather than the JavaSerializer).
The entire UploadBlock message is further serialized and then sent using TransportClient.sendRpc.
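How the two byte arrays carried by UploadBlock could be produced is sketched below. This is an illustration under assumptions, not Spark's code: plain java.io object streams stand in for the internal JavaSerializer, a String stands in for StorageLevel, and a java.nio.ByteBuffer stands in for the result of ManagedBuffer.nioByteBuffer. UploadPayloadSketch and its methods are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import scala.reflect.ClassTag

object UploadPayloadSketch {
  // Java-serialize a value (stand-in for Spark's internal JavaSerializer).
  def javaSerialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Inverse of javaSerialize, used here only to check the round trip.
  def javaDeserialize(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // Metadata = serialized (level, classTag); level is a plain String in this sketch.
  def metadataBytes(level: String, classTag: ClassTag[_]): Array[Byte] =
    javaSerialize((level, classTag))

  // Copy the readable bytes of a buffer without moving its position
  // (stand-in for turning ManagedBuffer.nioByteBuffer into an array).
  def bufferToArray(buf: ByteBuffer): Array[Byte] = {
    val copy = new Array[Byte](buf.remaining())
    buf.duplicate().get(copy)
    copy
  }
}
```

Copying through duplicate() leaves the caller's buffer position untouched, so the same block data could still be read elsewhere.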
Caution: FIXME Describe TransportClient and clientFactory.createClient.
When the blockId block is successfully uploaded, you should see the following TRACE message in the logs:
TRACE NettyBlockTransferService: Successfully uploaded block [blockId]
When an upload fails, you should see the following ERROR message in the logs:
ERROR Error while uploading block [blockId]
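Since uploadBlock returns a Future[Unit], one way to tie the asynchronous RPC outcome to that future, and to the two log messages above, is a Promise completed from a response callback. In this minimal sketch, ResponseCallback is a hypothetical stand-in for the callback handed to TransportClient.sendRpc, the send function stands in for the RPC itself, and println replaces real logging.

```scala
import java.nio.ByteBuffer
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for the RPC response callback.
trait ResponseCallback {
  def onSuccess(response: ByteBuffer): Unit
  def onFailure(e: Throwable): Unit
}

object UploadResultSketch {
  // Returns a Future[Unit] that the callback completes; `send` stands in for sendRpc.
  def uploadBlock(blockId: String, send: ResponseCallback => Unit): Future[Unit] = {
    val result = Promise[Unit]()
    send(new ResponseCallback {
      def onSuccess(response: ByteBuffer): Unit = {
        println(s"TRACE NettyBlockTransferService: Successfully uploaded block $blockId")
        result.success(())
      }
      def onFailure(e: Throwable): Unit = {
        println(s"ERROR Error while uploading block $blockId")
        result.failure(e)
      }
    })
    result.future
  }
}
```

The caller gets the future immediately; it completes only when the callback fires, which keeps uploadBlock non-blocking.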
UploadBlock Message
UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. sent over the wire from a NettyBlockTransferService to a NettyBlockRpcServer.
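Attribute | Description |
---|---|
appId | The application id (the block belongs to) |
execId | The executor id |
blockId | The block id |
metadata | The block metadata (level and classTag serialized using JavaSerializer) as an array of bytes |
blockData | The block data as an array of bytes |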
As an Encodable, UploadBlock can calculate its encoded size and can encode itself to, and decode itself from, a ByteBuf.
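The Encodable contract (encoded size, encode, decode) can be illustrated with a Netty-free sketch that uses java.nio.ByteBuffer in place of ByteBuf. UploadBlockSketch is a hypothetical name, and the length-prefixed layout below (a 4-byte length before each UTF-8 string and before each byte array) is an assumption for illustration; it is not guaranteed to match UploadBlock's actual wire format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, Netty-free model of an Encodable upload message.
final case class UploadBlockSketch(
    appId: String,
    execId: String,
    blockId: String,
    metadata: Array[Byte],
    blockData: Array[Byte]) {

  private def strLen(s: String): Int = 4 + s.getBytes(UTF_8).length

  // Size in bytes: every field is prefixed with a 4-byte length.
  def encodedLength: Int =
    strLen(appId) + strLen(execId) + strLen(blockId) +
      4 + metadata.length + 4 + blockData.length

  // Writes the fields in a fixed order: three strings, then two byte arrays.
  def encode(buf: ByteBuffer): Unit = {
    Seq(appId, execId, blockId).foreach { s =>
      val bytes = s.getBytes(UTF_8)
      buf.putInt(bytes.length).put(bytes)
    }
    Seq(metadata, blockData).foreach { bytes =>
      buf.putInt(bytes.length).put(bytes)
    }
  }
}

object UploadBlockSketch {
  private def readBytes(buf: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](buf.getInt())
    buf.get(bytes)
    bytes
  }

  private def readString(buf: ByteBuffer): String = new String(readBytes(buf), UTF_8)

  // Reads the fields back in the same order encode wrote them
  // (Scala evaluates constructor arguments left to right).
  def decode(buf: ByteBuffer): UploadBlockSketch =
    UploadBlockSketch(readString(buf), readString(buf), readString(buf), readBytes(buf), readBytes(buf))
}
```

Because encodedLength is computed up front, a caller can allocate a buffer of exactly the right size before encoding.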
createServer Internal Method
createServer(bootstraps: List[TransportServerBootstrap]): TransportServer
createServer…FIXME
Note: createServer is used exclusively when NettyBlockTransferService is requested to init.
Creating NettyBlockTransferService Instance
NettyBlockTransferService takes the following when created:
NettyBlockTransferService initializes the internal registries and counters.