JoinEstimation
JoinEstimation is a utility that computes statistics estimates and query hints of a Join logical operator.
JoinEstimation is created exclusively when BasicStatsPlanVisitor is requested to estimate statistics of a Join logical operator.
Note: BasicStatsPlanVisitor is used only when cost-based optimization is enabled.
JoinEstimation takes a Join logical operator when created and immediately takes the estimated statistics and query hints of the left and right sides of the Join logical operator.
```scala
// JoinEstimation requires row count stats for join statistics estimates
// With cost-based optimization off, only size in bytes is available
// That would give no join estimates whatsoever (except size in bytes)
// Make sure to start spark-shell with `--conf spark.sql.cbo.enabled=true`
scala> println(spark.sessionState.conf.cboEnabled)
true

// Build a query with join operator
// From the available data sources, tables seem the best...so far
val r1 = spark.range(5)
scala> println(r1.queryExecution.analyzed.stats.simpleString)
sizeInBytes=40.0 B, hints=none

// Make the demo reproducible
val db = spark.catalog.currentDatabase
spark.sharedState.externalCatalog.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
spark.sharedState.externalCatalog.dropTable(db, table = "t2", ignoreIfNotExists = true, purge = true)

// FIXME What relations give row count stats?

// Register tables
spark.range(5).write.saveAsTable("t1")
spark.range(10).write.saveAsTable("t2")

// Refresh internal registries
sql("REFRESH TABLE t1")
sql("REFRESH TABLE t2")

// Calculate row count stats
val tables = Seq("t1", "t2")
tables.map(t => s"ANALYZE TABLE $t COMPUTE STATISTICS").foreach(sql)

val t1 = spark.table("t1")
val t2 = spark.table("t2")

// analyzed plan is just before withCachedData and optimizedPlan plans
// where CostBasedJoinReorder kicks in and optimizes a query using statistics
val t1plan = t1.queryExecution.analyzed
scala> println(t1plan.numberedTreeString)
00 SubqueryAlias t1
01 +- Relation[id#45L] parquet

// Show the stats of every node in the analyzed query plan
val p0 = t1plan.p(0)
scala> println(s"Statistics of ${p0.simpleString}: ${p0.stats.simpleString}")
Statistics of SubqueryAlias t1: sizeInBytes=80.0 B, hints=none

val p1 = t1plan.p(1)
scala> println(s"Statistics of ${p1.simpleString}: ${p1.stats.simpleString}")
Statistics of Relation[id#45L] parquet: sizeInBytes=80.0 B, rowCount=5, hints=none

val t2plan = t2.queryExecution.analyzed

// let's get rid of the SubqueryAlias operator
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
val t1NoAliasesPlan = EliminateSubqueryAliases(t1plan)
val t2NoAliasesPlan = EliminateSubqueryAliases(t2plan)

// Using Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
val plan = t1NoAliasesPlan.join(
  otherPlan = t2NoAliasesPlan,
  joinType = Inner,
  condition = Some($"id".expr))
scala> println(plan.numberedTreeString)
00 'Join Inner, 'id
01 :- Relation[id#45L] parquet
02 +- Relation[id#57L] parquet

// Take Join operator off the logical plan
// JoinEstimation works with Joins only
import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan.collect { case j: Join => j }.head

// Make sure that row count stats are defined per join side
scala> join.left.stats.rowCount.isDefined
res1: Boolean = true

scala> join.right.stats.rowCount.isDefined
res2: Boolean = true

// Make the example reproducible
// Computing stats is a once-only process and the estimates are cached
join.invalidateStatsCache

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation
// The join condition ('id) has no equi-join keys,
// so the estimate is that of a cartesian product (5 x 10 rows)
val stats = JoinEstimation(join).estimate
scala> :type stats
Option[org.apache.spark.sql.catalyst.plans.logical.Statistics]

// Stats have to be available so Option.get should just work
scala> println(stats.get.simpleString)
sizeInBytes=1200.0 B, rowCount=50, hints=none
```
JoinEstimation can estimate statistics and query hints of a Join logical operator with the following join types:
- Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi and LeftAnti
For the other join types (e.g. ExistenceJoin), JoinEstimation prints out a DEBUG message to the logs and returns None (to “announce” that no statistics could be computed).
```scala
// Demo: Unsupported join type, i.e. ExistenceJoin

// Some parts were copied from the earlier demo
// FIXME Make it self-contained

// Using Catalyst DSL
// Don't even know if such existence join could ever be possible in Spark SQL
// For demo purposes it's OK, isn't it?

// 'id.long needs the Catalyst DSL expressions (not spark-shell's symbolToColumn)
// The same HACK as in the next demo disables symbolToColumn
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack
import org.apache.spark.sql.catalyst.dsl.expressions._

import org.apache.spark.sql.catalyst.plans.ExistenceJoin
val left = t1NoAliasesPlan
val right = t2NoAliasesPlan
val plan = left.join(right, joinType = ExistenceJoin(exists = 'id.long))

// Take Join operator off the logical plan
// JoinEstimation works with Joins only
import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan.collect { case j: Join => j }.head

// Enable DEBUG logging level
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation").setLevel(Level.DEBUG)

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation
scala> val stats = JoinEstimation(join).estimate
18/06/13 10:29:37 DEBUG JoinEstimation: [CBO] Unsupported join type: ExistenceJoin(id#35L)
stats: Option[org.apache.spark.sql.catalyst.plans.logical.Statistics] = None
```
```scala
// FIXME Describe the purpose of the demo

// Using Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table(ref = "t1")

// HACK: Disable symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

import org.apache.spark.sql.catalyst.dsl.expressions._
val id = 'id.long

val t2 = table("t2")
import org.apache.spark.sql.catalyst.plans.LeftSemi
val plan = t1.join(t2, joinType = LeftSemi, condition = Some(id))
scala> println(plan.numberedTreeString)
00 'Join LeftSemi, id#2: bigint
01 :- 'UnresolvedRelation `t1`
02 +- 'UnresolvedRelation `t2`

import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan match { case j: Join => j }

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation

// FIXME java.lang.UnsupportedOperationException
val stats = JoinEstimation(join).estimate
```
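Tip: Enable DEBUG logging level for org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation=DEBUG
Refer to Logging.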
estimateInnerOuterJoin Internal Method
```scala
estimateInnerOuterJoin(): Option[Statistics]
```
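estimateInnerOuterJoin simply returns None (i.e. nothing) when either side of the Join logical operator has no row count statistic.
Otherwise, estimateInnerOuterJoin destructures the Join logical operator into the join type and the left and right equi-join keys, and uses computeCardinalityAndStats to estimate the join cardinality. When the join condition has no equi-join keys, estimateInnerOuterJoin estimates the output like a cartesian product (the row count of the left side times the row count of the right side).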
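The following is a minimal sketch, not Spark's code, of how an inner-join row count estimate can be turned into an estimate for the other supported join types. It assumes (from a reading of the Spark 2.x sources, so treat it as an assumption) that outer joins never estimate fewer rows than the side(s) they preserve, and that a join without equi-join keys falls back to a cartesian product. JoinKind, its case objects and OuterJoinRows are hypothetical names.

```scala
// Hypothetical stand-ins for Catalyst's inner and outer join types
sealed trait JoinKind
case object InnerKind extends JoinKind
case object CrossKind extends JoinKind
case object LeftOuterKind extends JoinKind
case object RightOuterKind extends JoinKind
case object FullOuterKind extends JoinKind

object OuterJoinRows {
  /**
   * leftRows / rightRows: row count statistics of the join sides (None if missing).
   * innerRows: inner-join cardinality estimated from the equi-join keys,
   * or None when the join condition has no equi-join keys.
   */
  def estimate(
      kind: JoinKind,
      leftRows: Option[BigInt],
      rightRows: Option[BigInt],
      innerRows: Option[BigInt]): Option[BigInt] =
    for {
      l <- leftRows  // no row count on the left: no estimate at all
      r <- rightRows // no row count on the right: no estimate at all
    } yield innerRows match {
      // No equi-join keys: estimate like a cartesian product
      case None => l * r
      case Some(inner) =>
        kind match {
          case LeftOuterKind  => l.max(inner)                        // keep every left row
          case RightOuterKind => r.max(inner)                        // keep every right row
          case FullOuterKind  => l.max(inner) + r.max(inner) - inner // LOJ + ROJ - IJ
          case InnerKind | CrossKind => inner
        }
    }
}
```

With the 5-row t1 and 10-row t2 tables from the demo above and a join condition that is not an equi-join, the sketch gives 50 rows, in line with rowCount=50 in the demo output.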
Note: estimateInnerOuterJoin is used exclusively when JoinEstimation is requested to estimate statistics and query hints of a Join logical operator for Inner, Cross, LeftOuter, RightOuter and FullOuter joins.
computeByNdv Internal Method
```scala
computeByNdv(
  leftKey: AttributeReference,
  rightKey: AttributeReference,
  newMin: Option[Any],
  newMax: Option[Any]): (BigInt, ColumnStat)
```
computeByNdv …FIXME
Note: computeByNdv is used exclusively when JoinEstimation is requested for computeCardinalityAndStats (and the histograms of both column attributes used in a join are not available).
computeCardinalityAndStats Internal Method
```scala
computeCardinalityAndStats(
  keyPairs: Seq[(AttributeReference, AttributeReference)]): (BigInt, AttributeMap[ColumnStat])
```
computeCardinalityAndStats …FIXME
Note: computeCardinalityAndStats is used exclusively when JoinEstimation is requested for estimateInnerOuterJoin.
Computing Join Cardinality Using Equi-Height Histograms — computeByHistogram Internal Method
```scala
computeByHistogram(
  leftKey: AttributeReference,
  rightKey: AttributeReference,
  leftHistogram: Histogram,
  rightHistogram: Histogram,
  newMin: Option[Any],
  newMax: Option[Any]): (BigInt, ColumnStat)
```
computeByHistogram …FIXME
Note: computeByHistogram is used exclusively when JoinEstimation is requested for computeCardinalityAndStats (and the histograms of both column attributes used in a join are available).
Estimating Statistics for Left Semi and Left Anti Joins — estimateLeftSemiAntiJoin Internal Method
```scala
estimateLeftSemiAntiJoin(): Option[Statistics]
```
estimateLeftSemiAntiJoin estimates statistics of the Join logical operator only when the estimated row count statistic of the left side is available. Otherwise, estimateLeftSemiAntiJoin simply returns None (i.e. no statistics estimated).
Note: The row count statistic of a table is available only after the ANALYZE TABLE COMPUTE STATISTICS SQL command.
If available, estimateLeftSemiAntiJoin takes the estimated row count statistic of the left side of the Join operator.
Note: Use the ANALYZE TABLE COMPUTE STATISTICS SQL command on the table(s) of the left logical plan to compute row count statistics.
Note: Use the ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command on the table(s) of the left logical plan to compute column statistics, including equi-height histograms (when the spark.sql.statistics.histogram.enabled configuration property is enabled), for more accurate estimations.
In the end, estimateLeftSemiAntiJoin creates a new Statistics with the following estimates:
- Total size (in bytes) is the output size computed from the output schema of the join, the row count statistic (aka output rows) and the column statistics
- Row count is exactly the row count of the left side
- Column statistics (incl. column histograms) are exactly the column statistics of the left side
Note: estimateLeftSemiAntiJoin is used exclusively when JoinEstimation is requested to estimate statistics and query hints for LeftSemi and LeftAnti joins.
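Below is a sketch of the two estimates estimateLeftSemiAntiJoin produces. It assumes the total size is the row count multiplied by an estimated row width of 8 bytes of per-row overhead plus each output column's default size (8 bytes for a bigint). That width formula is an assumption based on a reading of Spark's EstimationUtils.getOutputSize, which also uses the average column length when column statistics provide one; the sketch ignores that case. OutputColumn, OutputSize and SemiAntiJoinStats are hypothetical names.

```scala
// Hypothetical description of an output column: its name and default size in bytes
final case class OutputColumn(name: String, defaultSize: Int)

object OutputSize {
  // Assumed formula: 8 bytes of per-row overhead plus the default size of every column
  def estimate(columns: Seq[OutputColumn], rowCount: BigInt): BigInt = {
    val sizePerRow = 8 + columns.map(_.defaultSize).sum
    // Never report 0 bytes, even for an empty result
    if (rowCount > 0) rowCount * sizePerRow else BigInt(1)
  }
}

object SemiAntiJoinStats {
  /**
   * Returns (sizeInBytes, rowCount) for a left semi or left anti join, or None when
   * the left side has no row count statistic. Only the left side's columns are output.
   */
  def estimate(
      leftRowCount: Option[BigInt],
      leftColumns: Seq[OutputColumn]): Option[(BigInt, BigInt)] =
    leftRowCount.map(rows => (OutputSize.estimate(leftColumns, rows), rows))
}
```

The same arithmetic matches sizeInBytes=1200.0 B in the first demo: 50 rows times 24 bytes per row (8 bytes of overhead plus two 8-byte bigint columns).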
Estimating Statistics and Query Hints of Join Logical Operator — estimate Method
```scala
estimate: Option[Statistics]
```
estimate branches off per the join type of the Join logical operator:
- For Inner, Cross, LeftOuter, RightOuter and FullOuter join types, estimate uses estimateInnerOuterJoin
- For LeftSemi and LeftAnti join types, estimate uses estimateLeftSemiAntiJoin
- For other join types, estimate prints out the following DEBUG message to the logs and returns None (to “announce” that no statistics could be computed).
```
[CBO] Unsupported join type: [joinType]
```
Note: estimate is used exclusively when BasicStatsPlanVisitor is requested to estimate statistics and query hints of a Join logical operator.
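A minimal sketch of the routing that estimate performs on the join type. The join types are modelled as a hypothetical sealed trait (JType) rather than Catalyst's JoinType, and the two estimators are passed in as functions, so the sketch shows only the dispatch and the fallback of logging a DEBUG-style message and returning None. JType, its cases and EstimateDispatch are hypothetical names.

```scala
// Hypothetical stand-ins for Catalyst's JoinType objects
sealed trait JType
case object Inner extends JType
case object Cross extends JType
case object LeftOuter extends JType
case object RightOuter extends JType
case object FullOuter extends JType
case object LeftSemi extends JType
case object LeftAnti extends JType
final case class ExistenceJoin(exists: String) extends JType

object EstimateDispatch {
  def estimate[S](
      joinType: JType,
      innerOuter: () => Option[S],
      semiAnti: () => Option[S],
      logDebug: String => Unit): Option[S] =
    joinType match {
      case Inner | Cross | LeftOuter | RightOuter | FullOuter => innerOuter()
      case LeftSemi | LeftAnti => semiAnti()
      case other =>
        // Unsupported join type: log it and give up on estimating
        logDebug(s"[CBO] Unsupported join type: $other")
        None
    }
}
```

Passing the estimators as functions keeps them unevaluated until the join type selects one, much like the pattern match in estimate calls only the matching estimation method.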