
Netty-based RpcEnv

Tip
Read up on RpcEnv — RPC Environment for the concept of an RPC Environment in Spark.

The class org.apache.spark.rpc.netty.NettyRpcEnv is the implementation of RpcEnv using Netty, "an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients".

Netty-based RPC Environment is created by NettyRpcEnvFactory when spark.rpc is set to netty or to the fully-qualified class name org.apache.spark.rpc.netty.NettyRpcEnvFactory.

It uses Java’s built-in serialization (the implementation of JavaSerializerInstance).
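Java's built-in serialization can be exercised directly. The sketch below is not Spark's JavaSerializerInstance itself, just the underlying java.io machinery it builds on, round-tripping an object to bytes and back the way the RPC layer serializes messages:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class JavaSerDemo {
    // Serialize an object with Java's built-in serialization,
    // the mechanism JavaSerializerInstance relies on.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Deserialize bytes back into an object.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Object back = deserialize(serialize("hello rpc"));
        System.out.println(back);
    }
}
```

Anything sent through such an RPC environment must therefore be java.io.Serializable.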

Caution
FIXME What other choices of JavaSerializerInstance are available in Spark?

NettyRpcEnv is only started on the driver. See Client Mode.

The default port to listen to is 7077.

When NettyRpcEnv starts, an INFO message is printed out in the logs.

Tip

Set DEBUG for org.apache.spark.network.server.TransportServer logger to know when Shuffle server/NettyRpcEnv starts listening to messages.
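Assuming Spark's default log4j setup, that could be a single line in conf/log4j.properties (adjust to your own logging configuration):

```properties
log4j.logger.org.apache.spark.network.server.TransportServer=DEBUG
```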

FIXME: The message above in TransportServer has a space before :.

Creating NettyRpcEnv — create Method

Caution
FIXME

Client Mode

Refer to Client Mode (is this an executor or the driver?) for an introduction to client mode. This applies only to the Netty-based RpcEnv.

When created, a Netty-based RpcEnv starts the RPC server and registers the necessary endpoints in non-client mode, i.e. when client mode is false.

Caution
FIXME What endpoints?

It means that the required services for remote communication with NettyRpcEnv are only started on the driver (not executors).

Thread Pools

shuffle-server-ID

The Shuffle server's EventLoopGroup (NioEventLoopGroup for NIO or EpollEventLoopGroup for EPOLL) uses a daemon thread pool called shuffle-server-ID, where ID is a unique integer.

Caution
FIXME Review Netty’s NioEventLoopGroup.
Caution
FIXME Where are SO_BACKLOG, SO_RCVBUF, SO_SNDBUF channel options used?

dispatcher-event-loop-ID

NettyRpcEnv’s Dispatcher uses the daemon fixed thread pool with spark.rpc.netty.dispatcher.numThreads threads.

Thread names are formatted as dispatcher-event-loop-ID, where ID is a unique, sequentially assigned integer.

It starts the message processing loop on all of the threads.
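The mechanics can be sketched as below. This is a toy model, not Spark's Dispatcher code: a fixed daemon pool whose threads are named dispatcher-event-loop-ID, with the same message-processing loop started on every thread, draining a shared queue until a poison-pill message stops the loop:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatcherSketch {
    // Poison pill: when a loop sees it, it re-queues it (so the other
    // threads also stop) and exits.
    private static final Runnable POISON = () -> {};

    private final BlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();
    private final AtomicInteger threadId = new AtomicInteger();
    private final ExecutorService pool;

    DispatcherSketch(int numThreads) {
        pool = Executors.newFixedThreadPool(numThreads, r -> {
            Thread t = new Thread(r, "dispatcher-event-loop-" + threadId.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
        // Start the same message loop on all of the threads.
        for (int i = 0; i < numThreads; i++) {
            pool.execute(this::messageLoop);
        }
    }

    private void messageLoop() {
        try {
            while (true) {
                Runnable msg = messages.take();
                if (msg == POISON) { messages.put(POISON); return; }
                msg.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void post(Runnable msg) { messages.add(msg); }

    void stop() { messages.add(POISON); pool.shutdown(); }

    public static void main(String[] args) throws InterruptedException {
        DispatcherSketch d = new DispatcherSketch(2);
        d.post(() -> System.out.println(Thread.currentThread().getName() + " handled a message"));
        Thread.sleep(100);
        d.stop();
    }
}
```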

netty-rpc-env-timeout

NettyRpcEnv uses the daemon single-thread scheduled thread pool netty-rpc-env-timeout.

netty-rpc-connection-ID

NettyRpcEnv uses the daemon cached thread pool with up to spark.rpc.connect.threads threads.

Thread names are formatted as netty-rpc-connection-ID, where ID is a unique, sequentially assigned integer.
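The naming scheme shared by these pools (a prefix plus a sequentially assigned integer, all threads daemon) can be sketched with a custom ThreadFactory. This is a hypothetical helper, not Spark's ThreadUtils:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    public NamedDaemonThreadFactory(String prefix) { this.prefix = prefix; }

    @Override
    public Thread newThread(Runnable r) {
        // e.g. netty-rpc-connection-0, netty-rpc-connection-1, ...
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        // A bounded "cached" pool in the spirit of netty-rpc-connection:
        // threads are created on demand up to maxThreads and reclaimed
        // after 60s idle. 64 is the spark.rpc.connect.threads default.
        int maxThreads = 64;
        ExecutorService pool = new ThreadPoolExecutor(
            0, maxThreads, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new NamedDaemonThreadFactory("netty-rpc-connection"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Daemon threads matter here: they never keep the JVM alive on their own, so a driver or executor can exit without explicitly tearing these pools down.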

Settings

The Netty-based implementation uses the following properties:

  • spark.rpc.io.mode (default: NIO) – NIO or EPOLL for low-level IO. NIO is always available, while EPOLL is only available on Linux. NIO uses io.netty.channel.nio.NioEventLoopGroup while EPOLL uses io.netty.channel.epoll.EpollEventLoopGroup.

  • spark.shuffle.io.numConnectionsPerPeer always equals 1

  • spark.rpc.io.threads (default: 0; maximum: 8) – the number of threads to use for the Netty client and server thread pools.

    • spark.shuffle.io.serverThreads (default: the value of spark.rpc.io.threads)

    • spark.shuffle.io.clientThreads (default: the value of spark.rpc.io.threads)

  • spark.rpc.netty.dispatcher.numThreads (default: the number of processors available to JVM)

  • spark.rpc.connect.threads (default: 64) – used in cluster mode to communicate with a remote RPC endpoint

  • spark.port.maxRetries (default: 16 or 100 for testing when spark.testing is set) controls the maximum number of binding attempts/retries to a port before giving up.
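The interplay of the spark.rpc.io.threads default and its maximum of 8 can be sketched as follows. This is a hypothetical helper, not Spark's actual resolution code, written under the assumption that an unset value (0) falls back to the number of available processors, capped at 8, while an explicit setting wins:

```java
public class RpcThreadDefaults {
    // The documented cap on the default thread count.
    static final int MAX_DEFAULT_NETTY_THREADS = 8;

    // configured == 0 means "unset": derive the default from the number
    // of available processors, capped at 8. Otherwise honor the setting.
    static int numThreads(int configured, int availableProcessors) {
        if (configured > 0) {
            return configured;
        }
        return Math.min(availableProcessors, MAX_DEFAULT_NETTY_THREADS);
    }

    public static void main(String[] args) {
        System.out.println(numThreads(0, 16)); // default, capped at 8
        System.out.println(numThreads(4, 16)); // explicit setting wins
    }
}
```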

Endpoints

  • endpoint-verifier (RpcEndpointVerifier) – an RpcEndpoint for remote RpcEnvs to query whether an RpcEndpoint exists or not. It uses the Dispatcher, which keeps track of registered endpoints, and responds true/false to a CheckExistence message.

endpoint-verifier is used to check whether a given endpoint exists before the endpoint's reference is handed back to clients.

One use case is when an AppClient connects to standalone Masters before it registers the application it acts for.
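The protocol reduces to a lookup in the Dispatcher's endpoint registry. The toy model below is not Spark's RpcEndpointVerifier, just a sketch of how a CheckExistence query would be answered:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EndpointVerifierSketch {
    // Stand-in for the Dispatcher's registry of named endpoints.
    private final Map<String, Object> endpoints = new ConcurrentHashMap<>();

    void register(String name, Object endpoint) {
        endpoints.put(name, endpoint);
    }

    // Models the reply to CheckExistence(name): true iff an endpoint
    // is registered under that name.
    boolean checkExistence(String name) {
        return endpoints.containsKey(name);
    }

    public static void main(String[] args) {
        EndpointVerifierSketch verifier = new EndpointVerifierSketch();
        verifier.register("Master", new Object());
        System.out.println(verifier.checkExistence("Master"));
        System.out.println(verifier.checkExistence("Worker"));
    }
}
```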

Caution
FIXME Who’d like to use endpoint-verifier and how?

Message Dispatcher

A message dispatcher is responsible for routing RPC messages to the appropriate endpoint(s).

It uses the daemon fixed thread pool dispatcher-event-loop with spark.rpc.netty.dispatcher.numThreads threads for dispatching messages.
