ResolveBroadcastHints Logical Resolution Rule — Resolving UnresolvedHint Operators with BROADCAST, BROADCASTJOIN and MAPJOIN Hint Names
ResolveBroadcastHints is a logical resolution rule that the Spark Analyzer uses to resolve UnresolvedHint logical operators with BROADCAST, BROADCASTJOIN or MAPJOIN hints (case-insensitive) to ResolvedHint operators.
Technically, ResolveBroadcastHints is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
ResolveBroadcastHints is part of the Hints fixed-point batch of rules (which is executed before any other batch).
ResolveBroadcastHints takes a SQLConf when created.
    // Use Catalyst DSL to create a logical plan
    import org.apache.spark.sql.catalyst.dsl.plans._
    val plan = table("t1").join(table("t2")).hint(name = "broadcast", "t1", "table2")
    scala> println(plan.numberedTreeString)
    00 'UnresolvedHint broadcast, [t1, table2]
    01 +- 'Join Inner
    02    :- 'UnresolvedRelation `t1`
    03    +- 'UnresolvedRelation `t2`

    import org.apache.spark.sql.catalyst.analysis.ResolveHints.ResolveBroadcastHints
    val resolver = new ResolveBroadcastHints(spark.sessionState.conf)
    val analyzedPlan = resolver(plan)
    scala> println(analyzedPlan.numberedTreeString)
    00 'Join Inner
    01 :- 'ResolvedHint (broadcast)
    02 :  +- 'UnresolvedRelation `t1`
    03 +- 'UnresolvedRelation `t2`
Resolving UnresolvedHint with BROADCAST, BROADCASTJOIN or MAPJOIN Hint Names (Applying ResolveBroadcastHints to Logical Plan) — apply Method
    apply(plan: LogicalPlan): LogicalPlan
Note: apply is part of the Rule Contract to apply a rule to a logical plan.
apply transforms UnresolvedHint operators into ResolvedHint operators when the hint name is BROADCAST, BROADCASTJOIN or MAPJOIN (case-insensitive).
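The case-insensitive name check can be pictured with a small, self-contained sketch. The object HintNames and the function isBroadcastHint are hypothetical names used only for illustration; this is not Spark's code, just one plausible way to normalize a hint name before comparing it against the three recognized names.

```scala
import java.util.Locale

// Hypothetical helper, not part of Spark: models the case-insensitive
// recognition of the three broadcast hint names.
object HintNames {
  private val broadcastHintNames: Set[String] =
    Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  // Normalize to upper case with a fixed locale so the check does not
  // depend on the JVM's default locale.
  def isBroadcastHint(name: String): Boolean =
    broadcastHintNames.contains(name.toUpperCase(Locale.ROOT))
}
```

With this sketch, "broadcast", "Broadcast" and "MAPJOIN" are all recognized, while any other hint name is left for other rules to handle.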
For UnresolvedHints with no parameters, apply marks the entire child logical plan as eligible for broadcast, i.e. creates a ResolvedHint with the child operator and a HintInfo with the broadcast flag on.
For UnresolvedHints with parameters defined, apply treats the parameters as the names of the tables to apply the broadcast hint to.
Note: The table names can be of String or UnresolvedAttribute types.
apply reports an AnalysisException for any parameter that is not of String or UnresolvedAttribute type.
    org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was [unsupported] ([className]
    // Use Catalyst DSL to create a logical plan
    import org.apache.spark.sql.catalyst.dsl.plans._

    // !!! IT WON'T WORK !!!
    // 1 is not a table name or of type `UnresolvedAttribute`
    val plan = table("t1").hint(name = "broadcast", 1)
    scala> println(plan.numberedTreeString)
    00 'UnresolvedHint broadcast, [1]
    01 +- 'UnresolvedRelation `t1`

    // Resolve hints
    import org.apache.spark.sql.catalyst.analysis.ResolveHints
    val broadcastHintResolver = new ResolveHints.ResolveBroadcastHints(spark.sessionState.conf)
    scala> broadcastHintResolver(plan)
    org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was 1 (class java.lang.Integer;
      at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:98)
      at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:95)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:95)
      at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:88)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
      at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints.apply(ResolveHints.scala:88)
      ... 51 elided
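The parameter handling described in this section can be summarized with a toy model that runs without Spark. Everything in it is a hypothetical stand-in: Plan, Relation, Join, UnresolvedHint, ResolvedHint and Attr only mimic the shape of the Catalyst classes, IllegalArgumentException stands in for AnalysisException, and markTables is an assumed simplification (exact name match on leaf relations) of how the named tables get wrapped. The sketch also assumes the hint name has already been recognized as a broadcast hint.

```scala
// Toy model only: these types are NOT Spark classes.
object BroadcastHintModel {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  final case class UnresolvedHint(params: Seq[Any], child: Plan) extends Plan
  final case class ResolvedHint(child: Plan) extends Plan

  // Stand-in for an UnresolvedAttribute used as a table identifier.
  final case class Attr(name: String)

  // Hypothetical helper: wraps every relation whose name is in toBroadcast.
  def markTables(plan: Plan, toBroadcast: Set[String]): Plan = plan match {
    case r @ Relation(n) if toBroadcast.contains(n) => ResolvedHint(r)
    case Join(l, r) => Join(markTables(l, toBroadcast), markTables(r, toBroadcast))
    case other => other
  }

  def resolve(plan: Plan): Plan = plan match {
    // No parameters: the entire child plan is marked for broadcast.
    case UnresolvedHint(params, child) if params.isEmpty =>
      ResolvedHint(child)
    // Parameters: each one must name a table (String or Attr).
    case UnresolvedHint(params, child) =>
      val tableNames = params.map {
        case s: String => s
        case Attr(n)   => n
        case other =>
          throw new IllegalArgumentException(
            s"unsupported broadcast hint parameter: $other (${other.getClass})")
      }.toSet
      markTables(child, tableNames)
    case Join(l, r) => Join(resolve(l), resolve(r))
    case other => other
  }
}
```

Applied to a model of the first example, UnresolvedHint(Seq("t1", "table2"), Join(Relation("t1"), Relation("t2"))) resolves to Join(ResolvedHint(Relation("t1")), Relation("t2")): the name table2 matches no relation, so only t1 is wrapped and the hint node itself disappears.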
applyBroadcastHint Internal Method
    applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan
applyBroadcastHint …FIXME
Note: applyBroadcastHint is used exclusively when ResolveBroadcastHints is requested to execute.