
BroadcastExchangeExec

BroadcastExchangeExec Unary Physical Operator for Broadcast Joins

BroadcastExchangeExec is an Exchange unary physical operator that collects the rows of its child relation and broadcasts them (to worker nodes).

BroadcastExchangeExec is created exclusively when the EnsureRequirements physical query plan optimization ensures BroadcastDistribution of the input data of a physical operator, which in practice is either BroadcastHashJoinExec or BroadcastNestedLoopJoinExec.
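The planning step above can be illustrated with a toy model. This is not Spark's code: Plan, Scan, BroadcastExchange, the Distribution and Partitioning types and EnsureRequirementsSketch.ensure are hypothetical, simplified stand-ins that only borrow Spark's names, and a broadcast mode is reduced to a String. The sketch shows the idea only: a child is wrapped in a broadcast exchange unless its output partitioning already satisfies the required BroadcastDistribution.

```scala
// Toy model (hypothetical, simplified) of inserting a broadcast exchange.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
final case class BroadcastDistribution(mode: String) extends Distribution

sealed trait Partitioning {
  // A broadcast requirement is met only by broadcast partitioning with the same mode.
  def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution  => true
    case BroadcastDistribution(m) => this == BroadcastPartitioning(m)
  }
}
final case class BroadcastPartitioning(mode: String) extends Partitioning
case object UnknownPartitioning extends Partitioning

sealed trait Plan { def outputPartitioning: Partitioning }
final case class Scan(table: String) extends Plan {
  val outputPartitioning: Partitioning = UnknownPartitioning
}
final case class BroadcastExchange(mode: String, child: Plan) extends Plan {
  val outputPartitioning: Partitioning = BroadcastPartitioning(mode)
}

object EnsureRequirementsSketch {
  // Wrap the child only when its partitioning does not already satisfy the requirement.
  def ensure(child: Plan, required: Distribution): Plan = required match {
    case BroadcastDistribution(mode) if !child.outputPartitioning.satisfies(required) =>
      BroadcastExchange(mode, child)
    case _ => child
  }
}
```

Applying ensure to an already-wrapped child returns it unchanged, so the exchange is inserted at most once per child.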

Table 1. BroadcastExchangeExec’s Performance Metrics

Key             Name (in web UI)
broadcastTime   time to broadcast (ms)
buildTime       time to build (ms)
collectTime     time to collect (ms)
dataSize        data size (bytes)

Figure 1. BroadcastExchangeExec in web UI (Details for Query)

BroadcastExchangeExec uses the BroadcastPartitioning partitioning scheme (with the input BroadcastMode) as its output partitioning.

Waiting Until Relation Has Been Broadcast — doExecuteBroadcast Method

doExecuteBroadcast blocks until the rows have been broadcast, i.e. until relationFuture completes.

Note
doExecuteBroadcast waits at most spark.sql.broadcastTimeout (default: 5 minutes).
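A minimal sketch of the waiting step, assuming the asynchronous work is already represented as a scala.concurrent.Future. WaitForBroadcastSketch and awaitBroadcast are hypothetical names, and the 300-second default only mirrors the 5-minute default of spark.sql.broadcastTimeout. Spark fails with an exception when the timeout elapses; to keep the sketch small, it returns a Left with a made-up message instead.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

object WaitForBroadcastSketch {
  // Block until the asynchronous "broadcast" finishes, for at most `timeout`.
  // The 300 s default mirrors the 5-minute default of spark.sql.broadcastTimeout.
  def awaitBroadcast[T](relationFuture: Future[T],
                        timeout: FiniteDuration = 300.seconds): Either[String, T] =
    try Right(Await.result(relationFuture, timeout))
    catch {
      // Hypothetical message; Spark raises its own exception here instead.
      case _: TimeoutException =>
        Left(s"broadcast did not finish within ${timeout.toSeconds} s")
    }
}
```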
Note
doExecuteBroadcast is part of the SparkPlan Contract to return the result of a structured query as a broadcast variable.

Lazily-Once-Initialized Asynchronously-Broadcast relationFuture Internal Attribute

When “materialized” (aka executed), relationFuture finds the current execution ID and sets it on the thread that runs the Future.

relationFuture requests the child physical operator to executeCollectIterator.

relationFuture records the time taken by executeCollectIterator as the collectTime metric.

Note
relationFuture accepts a relation only if it has fewer than 512 million rows and is smaller than 8 GB, and throws a SparkException otherwise.
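The two limits in the note can be expressed as a small check. This is a sketch: BroadcastLimitsSketch, its constants and its messages are hypothetical, and treating both bounds as exclusive is an assumption. Where Spark throws a SparkException, the sketch returns Some(message).

```scala
object BroadcastLimitsSketch {
  // Limits from the note: 512 million rows and 8 GB.
  val MaxRows: Long  = 512000000L
  val MaxBytes: Long = 8L << 30 // 8 GiB

  // Assumption: a relation must be strictly below both limits.
  // Returns a (hypothetical) error message when a limit is hit, None otherwise.
  def check(numRows: Long, dataSize: Long): Option[String] =
    if (numRows >= MaxRows) Some(s"too many rows to broadcast: $numRows")
    else if (dataSize >= MaxBytes) Some(s"relation too large to broadcast: ${dataSize >> 30} GB")
    else None
}
```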

relationFuture requests the input BroadcastMode to transform the internal rows into a relation, e.g. a HashedRelation or an Array[InternalRow].

relationFuture calculates the data size:

  • For a HashedRelation, relationFuture requests its estimatedSize.

  • For an Array[InternalRow], relationFuture converts the InternalRows to UnsafeRows and sums up their getSizeInBytes.
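The two cases of the size calculation can be sketched with hypothetical stand-ins: UnsafeRowLike models an UnsafeRow as its serialized bytes (so getSizeInBytes is the byte count), HashedRelationLike only exposes estimatedSize, and the conversion from InternalRow to UnsafeRow is assumed to have happened already.

```scala
object DataSizeSketch {
  // Hypothetical stand-ins, not Spark's classes.
  final case class UnsafeRowLike(bytes: Array[Byte]) {
    def getSizeInBytes: Int = bytes.length
  }
  trait HashedRelationLike { def estimatedSize: Long }

  sealed trait Relation
  final case class Hashed(rel: HashedRelationLike) extends Relation
  final case class Rows(rows: Array[UnsafeRowLike]) extends Relation

  def dataSize(relation: Relation): Long = relation match {
    // A hashed relation reports its own estimate.
    case Hashed(rel) => rel.estimatedSize
    // Rows: sum the per-row sizes as Long to avoid Int overflow.
    case Rows(rows) => rows.map(_.getSizeInBytes.toLong).sum
  }
}
```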

relationFuture records the data size as the dataSize metric.

relationFuture records the time to build the relation as the buildTime metric.

relationFuture requests the SparkContext to broadcast the relation and records the time taken as the broadcastTime metric.

In the end, relationFuture requests SQLMetrics to post a SparkListenerDriverAccumUpdates (with the execution ID and the SQL metrics) and returns the broadcast relation.
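The order of these steps, and where each metric from Table 1 is recorded, can be sketched as follows. RelationPipelineSketch and its parameters (collect, build, sizeOf, broadcast) are hypothetical placeholders for the child's executeCollectIterator, the BroadcastMode transformation, the size calculation and the SparkContext broadcast. The row-count and size checks and the posting of SparkListenerDriverAccumUpdates are left out. As described above, dataSize is recorded before buildTime, so the build window includes the size calculation.

```scala
import scala.collection.mutable

final class RelationPipelineSketch {
  // Metric keys from Table 1: times in milliseconds, dataSize in bytes.
  val metrics: mutable.LinkedHashMap[String, Long] = mutable.LinkedHashMap.empty[String, Long]

  // Run `body` and record its wall-clock time (ms) under `key`.
  private def timed[T](key: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    metrics(key) = (System.nanoTime() - start) / 1000000L
    result
  }

  // Hypothetical pipeline: collect -> build (+ size) -> broadcast.
  def run[Row, R, B](collect: () => Seq[Row],
                     build: Seq[Row] => R,
                     sizeOf: R => Long,
                     broadcast: R => B): B = {
    val rows = timed("collectTime")(collect())
    // dataSize is computed inside the build window, so buildTime includes it.
    val relation = timed("buildTime") {
      val built = build(rows)
      metrics("dataSize") = sizeOf(built)
      built
    }
    timed("broadcastTime")(broadcast(relation))
  }
}
```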

Note
Since relationFuture is initialized on the driver, posting a SparkListenerDriverAccumUpdates is the only way to make the SQL metrics accessible to other subsystems through SparkListener listeners (incl. web UI).

In case of an OutOfMemoryError, relationFuture throws a new OutOfMemoryError with a custom message.

Note
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the thread name prefix broadcast-exchange and up to 128 threads).
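A sketch of building such an ExecutionContext with only the JDK and the Scala standard library. BroadcastPoolSketch, namedFactory and newCachedPool are hypothetical names. The pool sizing follows a common "cached but bounded" pattern (core size equal to the maximum, an unbounded queue, core-thread timeout enabled), which is an assumption here rather than a copy of Spark's own helper.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object BroadcastPoolSketch {
  // Daemon threads named "<prefix>-<n>", easy to spot in thread dumps.
  private def namedFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // Up to maxThreads threads; extra tasks queue up; idle threads exit after keepAliveSeconds.
  def newCachedPool(prefix: String, maxThreads: Int, keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), namedFactory(prefix))
    pool.allowCoreThreadTimeOut(true) // lets the pool shrink to zero when idle
    pool
  }

  val pool: ThreadPoolExecutor = newCachedPool("broadcast-exchange", 128)
  val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(pool)
}
```

With core and maximum size equal and allowCoreThreadTimeOut(true), tasks beyond 128 wait in the queue instead of being rejected, while idle threads still go away after the keep-alive period.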
Note
relationFuture is used when BroadcastExchangeExec is requested to prepare for execution (which triggers the asynchronous execution of the child operator and the broadcast of its result) and to execute broadcast (which waits until the broadcast has finished).

Broadcasting Relation (Rows) Asynchronously — doPrepare Method

Note
doPrepare is part of the SparkPlan Contract to prepare a physical operator for execution.

doPrepare simply “materializes” the internal lazily-once-initialized relationFuture, which starts the asynchronous broadcast.
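The lazily-once-initialized pattern behind doPrepare and doExecuteBroadcast can be sketched with a Scala lazy val holding a Future. BroadcastExchangeSketch, prepare, executeBroadcast and runs are hypothetical names, and computeRelation stands in for collecting, building and broadcasting the relation.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final class BroadcastExchangeSketch(computeRelation: () => Seq[Int])(implicit ec: ExecutionContext) {
  // Counts how many times the asynchronous body actually ran.
  val runs = new AtomicInteger(0)

  // Evaluated on first access only; later accesses return the same Future.
  lazy val relationFuture: Future[Seq[Int]] = Future {
    runs.incrementAndGet()
    computeRelation()
  }

  // Like doPrepare: force the lazy val so the work starts, without waiting.
  def prepare(): Unit = {
    val _ = relationFuture
  }

  // Like doExecuteBroadcast: block until the result is ready or the timeout elapses.
  def executeBroadcast(timeout: FiniteDuration = 300.seconds): Seq[Int] =
    Await.result(relationFuture, timeout)
}
```

Calling prepare() more than once still starts the work only once, and executeBroadcast() waits on that same Future.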

Creating BroadcastExchangeExec Instance

BroadcastExchangeExec takes the following when created:

  • BroadcastMode

  • Child physical operator (SparkPlan)
