NettyBlockTransferService — Netty-Based BlockTransferService
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading or fetching blocks of data.
NettyBlockTransferService is created when SparkEnv is created for the driver and executors (to create the BlockManager).
BlockManager uses NettyBlockTransferService for the following:

- FIXME (should it be here or in BlockManager?)
- ShuffleClient (when spark.shuffle.service.enabled configuration property is off) for…FIXME
NettyBlockTransferService simply requests the TransportServer for the port.
Tip: Enable logging for the NettyBlockTransferService logger to see what happens inside. Refer to Logging.
fetchBlocks Method
```scala
fetchBlocks(
  host: String,
  port: Int,
  execId: String,
  blockIds: Array[String],
  listener: BlockFetchingListener): Unit
```
Note: fetchBlocks is part of the BlockTransferService Contract to…FIXME.
When executed, fetchBlocks prints out the following TRACE message in the logs:
```
TRACE Fetch blocks from [host]:[port] (executor id [execId])
```
fetchBlocks then creates a RetryingBlockFetcher.BlockFetchStarter where createAndStart method…FIXME
fetchBlocks then checks the maximum number of acceptable IO exceptions (such as connection timeouts) per request. If the number is greater than 0, fetchBlocks creates a RetryingBlockFetcher and starts it immediately.
Note: RetryingBlockFetcher is created with the RetryingBlockFetcher.BlockFetchStarter created earlier, the input blockIds and listener.
If the number of retries is not greater than 0 (i.e. it is 0 or less), the RetryingBlockFetcher.BlockFetchStarter created earlier is started directly (with the input blockIds and listener).
If any Exception is thrown, you should see the following ERROR message in the logs, and the input BlockFetchingListener is notified using onBlockFetchFailure for every block id.
```
ERROR Exception while beginning fetchBlocks
```
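The decision fetchBlocks makes (retrying fetcher versus starting the BlockFetchStarter directly, and failing every block id on an exception) can be sketched as follows. This is a minimal, self-contained model, not Spark's code: Listener, FetchStarter, FetchBlocksSketch and startWithRetries are hypothetical simplifications of BlockFetchingListener, RetryingBlockFetcher.BlockFetchStarter and RetryingBlockFetcher, and the retrying here is synchronous, whereas the real RetryingBlockFetcher is assumed to work asynchronously on outstanding blocks only.

```scala
import java.io.IOException

// Hypothetical, simplified stand-in for Spark's BlockFetchingListener.
trait Listener {
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}

// Hypothetical, simplified stand-in for RetryingBlockFetcher.BlockFetchStarter.
trait FetchStarter {
  def createAndStart(blockIds: Array[String], listener: Listener): Unit
}

object FetchBlocksSketch {
  // Simplified, synchronous retrying: re-run the starter while it throws an
  // IOException, at most maxRetries extra times. Once the retries are used up,
  // the IOException propagates to the caller.
  private def startWithRetries(starter: FetchStarter, maxRetries: Int,
                               blockIds: Array[String], listener: Listener): Unit = {
    var retries = 0
    var started = false
    while (!started) {
      try {
        starter.createAndStart(blockIds, listener)
        started = true
      } catch {
        case _: IOException if retries < maxRetries => retries += 1
      }
    }
  }

  def fetchBlocks(starter: FetchStarter, maxRetries: Int,
                  blockIds: Array[String], listener: Listener): Unit =
    try {
      // maxRetries > 0: go through the (simplified) retrying fetcher;
      // otherwise start the BlockFetchStarter directly.
      if (maxRetries > 0) startWithRetries(starter, maxRetries, blockIds, listener)
      else starter.createAndStart(blockIds, listener)
    } catch {
      case e: Exception =>
        // Corresponds to "ERROR Exception while beginning fetchBlocks":
        // every requested block id is reported as failed to the listener.
        System.err.println("ERROR Exception while beginning fetchBlocks")
        blockIds.foreach(id => listener.onBlockFetchFailure(id, e))
    }
}
```

With a starter that throws two IOExceptions before succeeding and maxRetries set to 3, the third call succeeds and the listener sees no failures; with maxRetries set to 0, a single IOException makes the listener receive onBlockFetchFailure for each block id.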
Application Id — appId Property
Caution: FIXME
Closing NettyBlockTransferService — close Method
```scala
close(): Unit
```
Note: close is part of the BlockTransferService Contract.
close…FIXME
Initializing NettyBlockTransferService — init Method
```scala
init(blockDataManager: BlockDataManager): Unit
```
Note: init is part of the BlockTransferService Contract.
init starts a server for…FIXME
Internally, init creates a NettyBlockRpcServer (using the application id, a JavaSerializer and the input blockDataManager).
Caution: FIXME Describe security when authEnabled is enabled.
init creates a TransportContext with the NettyBlockRpcServer created earlier.
Caution: FIXME Describe transportConf and TransportContext.
init creates the internal clientFactory and a server.
Caution: FIXME What’s the “a server”?
In the end, you should see the following INFO message in the logs:

```
INFO NettyBlockTransferService: Server created on [hostName]:[port]
```
Note: hostName is given when NettyBlockTransferService is created. For the driver, it is controlled by the spark.driver.host Spark property; for executors, it differs per deployment environment (as controlled by --hostname for CoarseGrainedExecutorBackend).
Uploading Block — uploadBlock Method
```scala
uploadBlock(
  hostname: String,
  port: Int,
  execId: String,
  blockId: BlockId,
  blockData: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Future[Unit]
```
Note: uploadBlock is part of the BlockTransferService Contract.
Internally, uploadBlock creates a TransportClient to send an UploadBlock message to the input hostname and port.
Note: The UploadBlock message is processed by NettyBlockRpcServer.
The UploadBlock message holds the application id and the input execId and blockId. It also holds the block metadata, i.e. level and classTag serialized using the internal JavaSerializer, and the bytes of the input blockData itself (obtained using the ManagedBuffer.nioByteBuffer method rather than the serializer).
The entire UploadBlock message is then serialized before it is sent using TransportClient.sendRpc.
Caution: FIXME Describe TransportClient and clientFactory.createClient.
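How the metadata and block bytes end up in the message can be sketched with plain Java serialization. This is a minimal sketch under stated assumptions: Level is a hypothetical stand-in for StorageLevel, UploadBlockMsg is a hypothetical mirror of the UploadBlock fields, javaSerialize/javaDeserialize stand in for Spark's JavaSerializer, and a java.nio.ByteBuffer plays the role of what ManagedBuffer.nioByteBuffer returns.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.nio.ByteBuffer
import scala.reflect.ClassTag

object UploadBlockSketch {
  // Hypothetical stand-in for Spark's StorageLevel.
  final case class Level(useMemory: Boolean, useDisk: Boolean, replication: Int)

  // Hypothetical mirror of the UploadBlock message fields.
  final case class UploadBlockMsg(appId: String, execId: String, blockId: String,
                                  metadata: Array[Byte], blockData: Array[Byte])

  // Plain Java serialization, standing in for Spark's JavaSerializer.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def javaDeserialize[T](bytes: Array[Byte]): T = {
    val loader = getClass.getClassLoader
    // Resolve classes with this object's class loader so that Level is found
    // even when the code runs in a REPL or script runner.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Copies the remaining bytes of a ByteBuffer into an array without
  // moving the original buffer's position.
  def toArray(buf: ByteBuffer): Array[Byte] = {
    val dup = buf.duplicate()
    val arr = new Array[Byte](dup.remaining())
    dup.get(arr)
    arr
  }

  def buildMessage(appId: String, execId: String, blockId: String,
                   data: ByteBuffer, level: Level, classTag: ClassTag[_]): UploadBlockMsg = {
    // Metadata: the (level, classTag) pair serialized as one object.
    val metadata = javaSerialize((level, classTag))
    // Block data: raw bytes of the buffer, not passed through the serializer.
    UploadBlockMsg(appId, execId, blockId, metadata, toArray(data))
  }
}
```

The receiving side (NettyBlockRpcServer in Spark) would deserialize the metadata back into the level and class tag, while the block bytes are used as they are.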
When the blockId block is uploaded successfully, you should see the following TRACE message in the logs:

```
TRACE NettyBlockTransferService: Successfully uploaded block [blockId]
```
When an upload fails, you should see the following ERROR message in the logs:

```
ERROR Error while uploading block [blockId]
```
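Since uploadBlock returns a Future[Unit] while the RPC reply arrives asynchronously, one natural way to connect the two is a Promise completed from a response callback, logging the TRACE or ERROR message above along the way. The sketch below assumes that design; ResponseCallback is a hypothetical stand-in for Spark's RPC response callback, and the sendRpc function parameter plays the role of TransportClient.sendRpc.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for the callback passed to TransportClient.sendRpc.
trait ResponseCallback {
  def onSuccess(response: Array[Byte]): Unit
  def onFailure(e: Throwable): Unit
}

object UploadResultSketch {
  // sendRpc sends the (already serialized) UploadBlock message and later
  // invokes exactly one of the callback methods.
  def uploadBlock(blockId: String, sendRpc: ResponseCallback => Unit): Future[Unit] = {
    val result = Promise[Unit]()
    sendRpc(new ResponseCallback {
      def onSuccess(response: Array[Byte]): Unit = {
        println(s"TRACE NettyBlockTransferService: Successfully uploaded block $blockId")
        result.success(())
      }
      def onFailure(e: Throwable): Unit = {
        println(s"ERROR Error while uploading block $blockId")
        result.failure(e)
      }
    })
    // The caller gets a Future that completes once the reply (or error) arrives.
    result.future
  }
}
```

The caller can then compose on or wait for the returned Future without blocking the network thread that delivers the reply.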
UploadBlock Message
UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. sent over the wire from a NettyBlockTransferService to a NettyBlockRpcServer.
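| Attribute | Description |
|---|---|
| appId | The application id (the block belongs to) |
| execId | The executor id |
| blockId | The block id |
| metadata | The block metadata (storage level and class tag) serialized using JavaSerializer |
| blockData | The block data as an array of bytes |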
As an Encodable, UploadBlock can calculate its encoded size, encode itself to a ByteBuf, and decode itself from one.
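The Encodable idea (compute the size up front, then write and read the fields in a fixed order) can be sketched with length-prefixed fields. This is a minimal sketch, not Spark's code: UploadBlockFrame, writeBytes and readBytes are hypothetical names, java.nio.ByteBuffer replaces Netty's ByteBuf, and any message-type tag that precedes the fields in the real wire format is left out.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical UploadBlock-like message with Encodable-style methods.
final case class UploadBlockFrame(
    appId: String,
    execId: String,
    blockId: String,
    metadata: Array[Byte],
    blockData: Array[Byte]) {

  // Every field is a 4-byte length prefix followed by its bytes.
  def encodedLength: Int =
    Seq(appId, execId, blockId).map(s => 4 + s.getBytes(UTF_8).length).sum +
      4 + metadata.length + 4 + blockData.length

  // Writes the fields in a fixed order; decode must read them back the same way.
  def encode(buf: ByteBuffer): Unit = {
    Seq(appId, execId, blockId).foreach(s => UploadBlockFrame.writeBytes(buf, s.getBytes(UTF_8)))
    UploadBlockFrame.writeBytes(buf, metadata)
    UploadBlockFrame.writeBytes(buf, blockData)
  }
}

object UploadBlockFrame {
  def writeBytes(buf: ByteBuffer, bytes: Array[Byte]): Unit = {
    buf.putInt(bytes.length)
    buf.put(bytes)
  }

  def readBytes(buf: ByteBuffer): Array[Byte] = {
    val arr = new Array[Byte](buf.getInt())
    buf.get(arr)
    arr
  }

  def decode(buf: ByteBuffer): UploadBlockFrame = {
    val appId = new String(readBytes(buf), UTF_8)
    val execId = new String(readBytes(buf), UTF_8)
    val blockId = new String(readBytes(buf), UTF_8)
    val metadata = readBytes(buf)
    val blockData = readBytes(buf)
    UploadBlockFrame(appId, execId, blockId, metadata, blockData)
  }
}
```

Knowing encodedLength before encoding lets the sender allocate a buffer of exactly the right size, which is why the size calculation is part of the contract alongside encode and decode.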
createServer Internal Method
```scala
createServer(bootstraps: List[TransportServerBootstrap]): TransportServer
```
createServer…FIXME
Note: createServer is used exclusively when NettyBlockTransferService is requested to init.
Creating NettyBlockTransferService Instance
NettyBlockTransferService takes the following when created:
NettyBlockTransferService initializes the internal registries and counters.