BroadcastExchangeExec Unary Physical Operator for Broadcast Joins
BroadcastExchangeExec is an Exchange unary physical operator that collects the rows of its child relation and broadcasts them to worker nodes.
BroadcastExchangeExec is created exclusively when the EnsureRequirements physical query plan optimization ensures BroadcastDistribution of the input data of a physical operator (which in practice is either a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec operator).
```scala
val t1 = spark.range(5)
val t2 = spark.range(5)
val q = t1.join(t2).where(t1("id") === t2("id"))

scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [id#19L], [id#22L], Inner, BuildRight
:- *Range (0, 5, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 5, step=1, splits=Some(8))
```
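Key | Name (in web UI) | Description |
---|---|---|
broadcastTime | time to broadcast (ms) | Time to broadcast the relation using SparkContext.broadcast |
buildTime | time to build (ms) | Time to build the relation from the collected rows |
collectTime | time to collect (ms) | Time to collect the rows of the child operator |
dataSize | data size (bytes) | Size of the relation to broadcast |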
BroadcastExchangeExec uses the BroadcastPartitioning partitioning scheme (with the input BroadcastMode) as its output partitioning.
Waiting Until Relation Has Been Broadcast: doExecuteBroadcast Method
```scala
def doExecuteBroadcast[T](): broadcast.Broadcast[T]
```
doExecuteBroadcast waits until the rows have been broadcast, i.e. until relationFuture completes.
Note: doExecuteBroadcast waits at most spark.sql.broadcastTimeout (default: 300 seconds, i.e. 5 minutes).
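The waiting part can be sketched with plain scala.concurrent, assuming the asynchronous work is already running as a Future. This is not Spark's code: BroadcastWait and awaitWithTimeout are hypothetical names, and the sketch returns None on timeout rather than propagating an exception.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration.FiniteDuration

object BroadcastWait {
  // Blocks for at most `timeout` waiting for `f` (hypothetical helper).
  // Returns Some(value) when the future completes in time, None on timeout.
  // A failed future rethrows its exception from Await.result.
  def awaitWithTimeout[T](f: Future[T], timeout: FiniteDuration): Option[T] =
    try Some(Await.result(f, timeout))
    catch { case _: TimeoutException => None }
}
```

Passing `300.seconds` (from `scala.concurrent.duration._`) would mirror the default of spark.sql.broadcastTimeout.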
Note: doExecuteBroadcast is part of the SparkPlan Contract to return the result of a structured query as a broadcast variable.
Lazily-Once-Initialized Asynchronously-Broadcast relationFuture Internal Attribute
```scala
relationFuture: Future[broadcast.Broadcast[Any]]
```
When "materialized" (i.e. executed), relationFuture finds the current execution ID and sets it on the Future's thread.
relationFuture requests the child physical operator to executeCollectIterator and records the time it takes in the collectTime metric.
Note: relationFuture accepts a relation with fewer than 512 million rows and less than 8GB in size, and throws a SparkException if either condition is violated.
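A minimal sketch of the two guards, assuming both limits are exclusive (a relation with exactly 512 million rows or exactly 8GB is rejected). BroadcastLimits and its methods are hypothetical names, and the sketch throws IllegalArgumentException instead of SparkException so that it runs without Spark on the classpath.

```scala
object BroadcastLimits {
  // Hypothetical constants for the limits described above.
  val MaxRows: Long = 512000000L // 512 million rows
  val MaxBytes: Long = 8L << 30  // 8GB (8 * 2^30 bytes)

  // Rejects a relation with 512 million or more rows.
  def checkRows(numRows: Long): Unit =
    if (numRows >= MaxRows)
      throw new IllegalArgumentException(s"Too many rows to broadcast: $numRows")

  // Rejects a relation of 8GB or more.
  def checkSize(dataSize: Long): Unit =
    if (dataSize >= MaxBytes)
      throw new IllegalArgumentException(s"Relation too large to broadcast: $dataSize bytes")
}
```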
relationFuture requests the input BroadcastMode to transform the internal rows into a relation, e.g. a HashedRelation or an Array[InternalRow].
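To make the role of BroadcastMode concrete, here is a simplified, hypothetical model in plain Scala. Rows are `Vector[Any]` instead of InternalRow, and IdentityMode and HashedMode only play the roles of Spark's IdentityBroadcastMode and HashedRelationBroadcastMode; none of these types is Spark's API.

```scala
object BroadcastModeSketch {
  // A row is modeled as a vector of column values (stand-in for InternalRow).
  type Row = Vector[Any]

  // Hypothetical relations that a mode can build from the collected rows.
  sealed trait Relation
  final case class RowArray(rows: Array[Row]) extends Relation
  final case class KeyedRelation(byKey: Map[Any, Seq[Row]]) extends Relation

  sealed trait Mode {
    def transform(rows: Array[Row]): Relation
  }

  // Plays the role of IdentityBroadcastMode: keeps the rows as they are.
  case object IdentityMode extends Mode {
    def transform(rows: Array[Row]): Relation = RowArray(rows)
  }

  // Plays the role of HashedRelationBroadcastMode: groups the rows by the
  // join key so the other join side can look matches up by key.
  final case class HashedMode(keyColumn: Int) extends Mode {
    def transform(rows: Array[Row]): Relation =
      KeyedRelation(rows.toSeq.groupBy(row => row(keyColumn)))
  }
}
```

A keyed relation lets the join probe the broadcast side by key instead of scanning every row, which is why the BroadcastExchange in the physical plan above uses HashedRelationBroadcastMode under BroadcastHashJoin.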
relationFuture calculates the data size:
- For a HashedRelation, relationFuture requests it for its estimatedSize.
- For an Array[InternalRow], relationFuture treats (casts) the InternalRows as UnsafeRows, requests each for getSizeInBytes and sums them all up.
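The size calculation is a match on the relation type. The sketch below uses hypothetical stand-ins: SizeEstimating plays the role of HashedRelation (with its estimatedSize) and BinaryRow plays the role of UnsafeRow (with getSizeInBytes).

```scala
object DataSizeSketch {
  // Stand-in for HashedRelation: knows its own estimated size in bytes.
  trait SizeEstimating { def estimatedSize: Long }

  // Stand-in for UnsafeRow: a row backed by a byte buffer.
  final case class BinaryRow(bytes: Array[Byte]) {
    def getSizeInBytes: Int = bytes.length
  }

  // Mirrors the two cases described above.
  def dataSize(relation: Any): Long = relation match {
    case hashed: SizeEstimating => hashed.estimatedSize
    case rows: Array[BinaryRow] => rows.map(_.getSizeInBytes.toLong).sum
    case other => throw new IllegalArgumentException(s"Unsupported relation: $other")
  }
}
```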
relationFuture records the data size as the dataSize metric and the time to build the relation as the buildTime metric.
relationFuture then requests the SparkContext to broadcast the relation and records the time it takes in the broadcastTime metric.
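Each of the time metrics is a wall-clock measurement around one step, converted from nanoseconds to milliseconds. A sketch of that pattern, assuming a hypothetical in-memory metrics map keyed by the metric names from the table above (Spark itself uses SQLMetric accumulators, not a map):

```scala
import scala.collection.mutable

object BroadcastMetricsSketch {
  // Hypothetical metric registry: metric name -> accumulated milliseconds.
  val metrics: mutable.Map[String, Long] =
    mutable.Map.empty[String, Long].withDefaultValue(0L)

  // Runs `body`, adds its wall-clock duration (ns -> ms) to `metric`,
  // and returns the body's result.
  def timed[T](metric: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    metrics(metric) += (System.nanoTime() - start) / 1000000
    result
  }
}
```

Wrapping collect, build and broadcast in separate `timed` calls gives the three separate metrics shown in the web UI.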
In the end, relationFuture requests SQLMetrics to post a SparkListenerDriverAccumUpdates (with the execution ID and the SQL metrics) and returns the broadcast relation.
Note: Since relationFuture is initialized on the driver, posting a SparkListenerDriverAccumUpdates is the only way to make the SQL metrics available to other subsystems through SparkListener listeners (incl. web UI).
In case of an OutOfMemoryError, relationFuture throws another OutOfMemoryError with the following message:
```
Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
```
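The OutOfMemoryError handling amounts to catching the error and rethrowing one with an actionable message. In this sketch, OomRewrap and withFriendlyOom are hypothetical names, the message is shortened, and chaining the original error as the cause is a choice of the sketch.

```scala
object OomRewrap {
  // Hypothetical wrapper: turns an OutOfMemoryError raised by `body` into a
  // new OutOfMemoryError with an actionable message, chaining the original.
  def withFriendlyOom[T](body: => T): T =
    try body
    catch {
      case oe: OutOfMemoryError =>
        val friendly = new OutOfMemoryError(
          "Not enough memory to build and broadcast the table to all worker nodes.")
        friendly.initCause(oe)
        throw friendly
    }
}
```

An explicit `try`/`catch` is needed here: OutOfMemoryError is a fatal error, so `scala.util.Try` and `NonFatal` deliberately do not catch it.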
Note: relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the thread name prefix broadcast-exchange and up to 128 threads).
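An ExecutionContext like the one described can be built from the JDK alone. This is a sketch under the assumption that "cached" means idle threads time out (after 60 seconds here); BroadcastPool and its methods are hypothetical names, not Spark's internal helper.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object BroadcastPool {
  // Daemon threads named "<prefix>-<n>" so they are easy to spot in thread dumps.
  private def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // Up to maxThreads threads; idle threads (core ones included) die after 60 seconds.
  def newCachedPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), namedDaemonFactory(prefix))
    pool.allowCoreThreadTimeOut(true)
    pool
  }

  def executionContext(
      prefix: String = "broadcast-exchange",
      maxThreads: Int = 128): ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(newCachedPool(prefix, maxThreads))
}
```

Setting the core and maximum pool sizes to the same value matters: with an unbounded queue, a ThreadPoolExecutor never grows past its core size, so core = max plus `allowCoreThreadTimeOut(true)` lets the pool grow up to `maxThreads` and still release idle threads.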
Note: relationFuture is used when BroadcastExchangeExec is requested to prepare for execution (which triggers the asynchronous execution of the child operator and the broadcasting of the result) and to execute broadcast (which waits until the broadcasting has finished).
Broadcasting Relation (Rows) Asynchronously: doPrepare Method
```scala
def doPrepare(): Unit
```
Note: doPrepare is part of the SparkPlan Contract to prepare a physical operator for execution.
doPrepare simply "materializes" the internal lazily-once-initialized asynchronous broadcast, i.e. relationFuture.
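The lazy-once behaviour that doPrepare relies on can be modelled with a `lazy val` holding a Future: the first access starts the work, later accesses reuse the same Future, and a separate call blocks on it. AsyncBroadcastSketch is a hypothetical class, not BroadcastExchangeExec itself.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

// Hypothetical stand-in for the relationFuture life cycle.
class AsyncBroadcastSketch(compute: () => Seq[Int])(implicit ec: ExecutionContext) {
  // Counts how many times the asynchronous work was started.
  val starts = new AtomicInteger(0)

  // Initialized at most once, on first access.
  private lazy val relationFuture: Future[Seq[Int]] = Future {
    starts.incrementAndGet()
    compute()
  }

  // Like doPrepare: touch the lazy val to start the work without waiting.
  def prepare(): Unit = {
    relationFuture
    ()
  }

  // Like doExecuteBroadcast: block until the result is ready or the timeout expires.
  def executeBroadcast(timeout: FiniteDuration): Seq[Int] =
    Await.result(relationFuture, timeout)
}
```

Calling `prepare()` any number of times, then `executeBroadcast(...)`, runs `compute` exactly once.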
Creating BroadcastExchangeExec Instance
BroadcastExchangeExec takes the following when created:
- BroadcastMode
- Child physical operator (SparkPlan)