ResolveBroadcastHints Logical Resolution Rule — Resolving UnresolvedHint Operators with BROADCAST, BROADCASTJOIN and MAPJOIN Hint Names
ResolveBroadcastHints is a logical resolution rule that the Spark Analyzer uses to resolve UnresolvedHint logical operators with BROADCAST, BROADCASTJOIN or MAPJOIN hints (case-insensitive) to ResolvedHint operators.
Technically, ResolveBroadcastHints is a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].
ResolveBroadcastHints is part of the Hints fixed-point batch of rules, which is executed before any other batch of rules.
ResolveBroadcastHints takes a SQLConf when created.
```scala
// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").join(table("t2")).hint(name = "broadcast", "t1", "table2")

scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [t1, table2]
01 +- 'Join Inner
02    :- 'UnresolvedRelation `t1`
03    +- 'UnresolvedRelation `t2`

import org.apache.spark.sql.catalyst.analysis.ResolveHints.ResolveBroadcastHints
val resolver = new ResolveBroadcastHints(spark.sessionState.conf)
val analyzedPlan = resolver(plan)

scala> println(analyzedPlan.numberedTreeString)
00 'Join Inner
01 :- 'ResolvedHint (broadcast)
02 :  +- 'UnresolvedRelation `t1`
03 +- 'UnresolvedRelation `t2`
```
Resolving UnresolvedHint with BROADCAST, BROADCASTJOIN or MAPJOIN Hint Names (Applying ResolveBroadcastHints to Logical Plan) — apply Method
```scala
apply(plan: LogicalPlan): LogicalPlan
```
Note
apply is part of the Rule Contract to apply a rule to a logical plan.
apply transforms UnresolvedHint operators whose hint name is BROADCAST, BROADCASTJOIN or MAPJOIN (case-insensitive) into ResolvedHint operators.
For an UnresolvedHint with no parameters, apply marks the entire child logical plan as eligible for broadcast. It creates a ResolvedHint with the child operator and a HintInfo with the broadcast flag on.
For an UnresolvedHint with parameters, apply treats the parameters as the names of the tables to apply the broadcast hint to.
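The classification that apply performs on a single hint can be sketched without Spark. This is a simplified model, not Spark's implementation. BroadcastHintDispatch, Decision and its cases are hypothetical names, and parameters are modelled as plain Strings only.

```scala
// Hypothetical, simplified model of how ResolveBroadcastHints.apply
// classifies one UnresolvedHint. Not Spark's actual code.
object BroadcastHintDispatch {
  // The three hint names the rule recognises (compared case-insensitively)
  val BroadcastHintNames: Set[String] = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

  sealed trait Decision
  // Any other hint name is left untouched by this rule
  case object NotABroadcastHint extends Decision
  // No parameters: the whole child plan is marked for broadcast
  case object BroadcastWholeChild extends Decision
  // Parameters present: they are treated as table names
  final case class BroadcastTables(names: Set[String]) extends Decision

  def classify(hintName: String, parameters: Seq[String]): Decision =
    if (!BroadcastHintNames.contains(hintName.toUpperCase(java.util.Locale.ROOT))) NotABroadcastHint
    else if (parameters.isEmpty) BroadcastWholeChild
    else BroadcastTables(parameters.toSet)

  def main(args: Array[String]): Unit = {
    println(classify("mapjoin", Nil))                   // BroadcastWholeChild
    println(classify("broadcast", Seq("t1", "table2"))) // BroadcastTables with t1 and table2
    println(classify("other", Seq("t1")))               // NotABroadcastHint
  }
}
```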
Note
The table names can be of type String or UnresolvedAttribute.
apply reports an AnalysisException for any parameter that is neither a String nor an UnresolvedAttribute:
```
org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was [unsupported] ([className]
```
```scala
// Use Catalyst DSL to create a logical plan
import org.apache.spark.sql.catalyst.dsl.plans._

// !!! IT WON'T WORK !!!
// 1 is not a table name or of type `UnresolvedAttribute`
val plan = table("t1").hint(name = "broadcast", 1)

scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast, [1]
01 +- 'UnresolvedRelation `t1`

// Resolve hints
import org.apache.spark.sql.catalyst.analysis.ResolveHints
val broadcastHintResolver = new ResolveHints.ResolveBroadcastHints(spark.sessionState.conf)

scala> broadcastHintResolver(plan)
org.apache.spark.sql.AnalysisException: Broadcast hint parameter should be an identifier or string but was 1 (class java.lang.Integer;
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:98)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(ResolveHints.scala:95)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:95)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints$$anonfun$apply$1.applyOrElse(ResolveHints.scala:88)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints.apply(ResolveHints.scala:88)
  ... 51 elided
```
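The parameter check behind the exception above can be sketched as follows. This is a hypothetical, self-contained model rather than Spark's code. UnresolvedAttributeStub stands in for Catalyst's UnresolvedAttribute, and IllegalArgumentException stands in for AnalysisException, so the snippet needs no Spark dependency.

```scala
// Hypothetical model of how hint parameters become table names.
// UnresolvedAttributeStub and IllegalArgumentException are stand-ins,
// not Spark classes.
object BroadcastHintParameters {
  final case class UnresolvedAttributeStub(name: String)

  def toTableNames(parameters: Seq[Any]): Set[String] =
    parameters.map {
      case tableName: String                => tableName   // plain string: used as-is
      case tableId: UnresolvedAttributeStub => tableId.name // identifier: use its name
      case unsupported =>
        // Same wording as the message quoted earlier (note the unclosed parenthesis)
        throw new IllegalArgumentException(
          "Broadcast hint parameter should be an identifier or string " +
            s"but was $unsupported (${unsupported.getClass}")
    }.toSet

  def main(args: Array[String]): Unit = {
    println(toTableNames(Seq("t1", UnresolvedAttributeStub("t2")))) // Set(t1, t2)
    try toTableNames(Seq(1))
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```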
applyBroadcastHint Internal Method
```scala
applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan
```
applyBroadcastHint…FIXME
Note
applyBroadcastHint is used exclusively when ResolveBroadcastHints is requested to execute.
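The following is a rough model of the effect applyBroadcastHint has in the first example. There, t1 is wrapped in a broadcast ResolvedHint, while table2 matches no relation and is ignored. This is a hypothetical sketch, not Spark's implementation. Plan, Relation, Join and BroadcastMarked stand in for Catalyst operators, and the case-insensitive name comparison is an assumption.

```scala
// Hypothetical, simplified model of applyBroadcastHint: walk the plan tree and
// wrap every relation whose name is in toBroadcast with a broadcast marker.
// Plan, Relation, Join and BroadcastMarked are stand-ins, not Catalyst classes.
object ApplyBroadcastHintModel {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  final case class BroadcastMarked(child: Plan) extends Plan

  def applyBroadcastHint(plan: Plan, toBroadcast: Set[String]): Plan = plan match {
    // Assumption: table names are compared case-insensitively
    case r: Relation if toBroadcast.exists(_.equalsIgnoreCase(r.name)) => BroadcastMarked(r)
    // Already marked: return the subtree unchanged instead of nesting markers
    case m: BroadcastMarked => m
    // Recurse into both sides of a join
    case Join(left, right) =>
      Join(applyBroadcastHint(left, toBroadcast), applyBroadcastHint(right, toBroadcast))
    // Relations whose names are not listed stay as they are
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val plan = Join(Relation("t1"), Relation("t2"))
    // Prints Join(BroadcastMarked(Relation(t1)),Relation(t2)); "table2" matches nothing
    println(applyBroadcastHint(plan, Set("t1", "table2")))
  }
}
```

Returning an already-marked subtree unchanged keeps the model from stacking markers when the same table is hinted twice.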