LiveListenerBus
LiveListenerBus is used to announce application-wide events in a Spark application. It asynchronously passes listener events to registered Spark listeners.
LiveListenerBus is a single-JVM SparkListenerBus that uses listenerThread to poll events. Emitters are supposed to use the post method to post SparkListenerEvent events.
Note: The event queue is a java.util.concurrent.LinkedBlockingQueue with a capacity of 10000 SparkListenerEvent events.
LiveListenerBus takes a SparkConf when created.
LiveListenerBus is created and started when SparkContext is initialized.
Starting LiveListenerBus: start method
start(sc: SparkContext): Unit
start starts processing events.
Internally, it saves the input SparkContext for later use and starts listenerThread. It makes sure that this happens only when LiveListenerBus has not been started before (i.e. the started flag is disabled).
If, however, LiveListenerBus has already been started, an IllegalStateException is thrown:
[name] already started!
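The start-once guard described above can be sketched with an atomic flag. This is a minimal illustration rather than Spark's code: the class name StartOnceBus and the isStarted helper are hypothetical, the SparkContext parameter is left out, and the thread body is a placeholder.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a bus that may be started only once.
class StartOnceBus(name: String) {
  private val started = new AtomicBoolean(false)

  // Daemon thread standing in for listenerThread; the body is a placeholder.
  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = ()
  }

  def start(): Unit = {
    // compareAndSet flips the flag atomically, so only the first caller succeeds.
    if (started.compareAndSet(false, true)) {
      listenerThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  // Hypothetical helper, only here so the state can be inspected.
  def isStarted: Boolean = started.get()
}
```

Calling start() a second time on the same instance throws the IllegalStateException, while the first call leaves the flag enabled.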
Posting SparkListenerEvent Events: post method
post(event: SparkListenerEvent): Unit
post puts the input event onto the internal eventQueue queue and releases the internal eventLock semaphore. If the event could not be placed on the queue (which can happen since the queue is capped at 10000 events), the onDropEvent method is called.
Publishing an event is only possible while the stopped flag has not been enabled.
Caution: FIXME Who’s enabling the stopped flag and when/why?
If LiveListenerBus has been stopped, the following ERROR appears in the logs:
ERROR [name] has already stopped! Dropping event [event]
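The posting path can be sketched with a bounded queue and a semaphore. This is a simplified illustration under assumptions, not Spark's implementation: events are plain strings, and PostSketch, markStopped, queued, permits and droppedCount are hypothetical names added so the behaviour can be observed.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical sketch; events are plain strings to keep it self-contained.
class PostSketch(name: String, capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)
  private val dropped = new AtomicInteger(0)

  def post(event: String): Unit = {
    if (stopped.get()) {
      // A stopped bus refuses new events and only logs the drop.
      System.err.println(s"ERROR $name has already stopped! Dropping event $event")
    } else if (eventQueue.offer(event)) {
      // offer never blocks: it returns false when the queue is full.
      eventLock.release() // one permit per queued event
    } else {
      onDropEvent(event)
    }
  }

  // Placeholder callback: counts drops instead of logging them.
  def onDropEvent(event: String): Unit = { dropped.incrementAndGet(); () }

  // Hypothetical helpers for inspecting the sketch.
  def markStopped(): Unit = stopped.set(true)
  def queued: Int = eventQueue.size()
  def permits: Int = eventLock.availablePermits()
  def droppedCount: Int = dropped.get()
}
```

Using offer rather than put keeps the emitter from blocking when listeners fall behind; the cost is that events beyond the capacity are lost.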
Event Dropped Callback: onDropEvent method
onDropEvent(event: SparkListenerEvent): Unit
onDropEvent is called when no further events can be added to the internal eventQueue queue (while posting a SparkListenerEvent event).
It prints the following ERROR message to the logs, and does so only once:
ERROR Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
Note: onDropEvent uses the internal logDroppedEvent atomic variable to track whether the message has already been logged.
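The log-once behaviour can be sketched with an atomic boolean. The class name DropLogger and the loggedCount counter are hypothetical, the message is shortened, and System.err stands in for the real logger.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical sketch of a callback that reports a dropped event only once.
class DropLogger {
  private val logDroppedEvent = new AtomicBoolean(false)
  private val messagesLogged = new AtomicInteger(0)

  def onDropEvent(event: String): Unit = {
    // Only the first caller to flip the flag gets to log.
    if (logDroppedEvent.compareAndSet(false, true)) {
      messagesLogged.incrementAndGet()
      System.err.println("ERROR Dropping event because no remaining room in event queue.")
    }
  }

  // Hypothetical helper for inspecting how many messages were emitted.
  def loggedCount: Int = messagesLogged.get()
}
```

However many events are dropped, the counter in this sketch stays at 1 after the first drop.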
Stopping LiveListenerBus: stop method
stop(): Unit
stop releases the internal eventLock semaphore and waits until listenerThread dies. That can only happen once all queued events have been handled (and polling eventQueue gives nothing).
stop checks that the started flag is enabled (i.e. true) and throws an IllegalStateException otherwise:
Attempted to stop [name] that has not yet started!
stop also enables the stopped flag.
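A minimal sketch of the stop sequence follows. It is an illustration under assumptions: StopSketch, isStopped and isListenerAlive are hypothetical names, the thread body is a placeholder that simply waits for one permit, and the order in which the stopped flag is set relative to releasing the semaphore is a guess, since the text above does not state it.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the stop sequence.
class StopSketch(name: String) {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val eventLock = new Semaphore(0)

  // Placeholder thread: blocks until stop() releases a permit, then exits.
  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = eventLock.acquire()
  }

  def start(): Unit = if (started.compareAndSet(false, true)) listenerThread.start()

  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    // Assumed order: mark as stopped first, then wake the thread and wait for it.
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()
      listenerThread.join()
    }
  }

  // Hypothetical helpers for inspecting the sketch.
  def isStopped: Boolean = stopped.get()
  def isListenerAlive: Boolean = listenerThread.isAlive
}
```

Guarding the release with compareAndSet makes a repeated stop() call a no-op in this sketch.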
listenerThread for Event Polling
LiveListenerBus uses the SparkListenerBus single daemon thread that ensures that events are polled from the event queue only after the listener bus was started, and only one event at a time.
Caution: FIXME There is some logic around no events in the queue.
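One plausible reading of the polling loop, based only on the description of post and stop on this page, is sketched below. It is an assumption rather than Spark's actual code: each successful post releases one permit, stop releases one extra permit, and a permit with an empty queue is taken as the signal to exit. PollingSketch and processedEvents are hypothetical names, and events are plain strings.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue, Semaphore}

// Hypothetical sketch of a single daemon thread draining a bounded queue.
class PollingSketch(capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0)
  private val processed = new CopyOnWriteArrayList[String]()

  private val listenerThread = new Thread("PollingSketch") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()           // wait for a post or a stop
        val event = eventQueue.poll() // null when the queue is empty
        if (event == null) running = false // a permit without an event means stop
        else processed.add(event)     // handle exactly one event per permit
      }
    }
  }

  def start(): Unit = listenerThread.start()

  def post(event: String): Unit = if (eventQueue.offer(event)) eventLock.release()

  def stop(): Unit = {
    eventLock.release()   // extra permit: the thread will find the queue empty
    listenerThread.join() // returns only after queued events were handled
  }

  // Hypothetical helper returning the events handled so far, in order.
  def processedEvents: List[String] = processed.toArray(new Array[String](0)).toList
}
```

Because the extra permit is consumed only after every queued event has been polled, stop() in this sketch returns once the queue is drained.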
Settings
Spark Property | Default Value | Description |
---|---|---|
| | (empty) | The comma-separated list of fully-qualified class names of Spark listeners that should be registered (when |
Registering SparkListenerInterface with Application Status Queue: addToStatusQueue Method
addToStatusQueue(listener: SparkListenerInterface): Unit
addToStatusQueue simply adds the SparkListenerInterface to the application status (appStatus) queue.
Note: addToStatusQueue is used when…FIXME