Cost-Based Optimization (CBO) of Logical Query Plan
Cost-Based Optimization (aka Cost-Based Query Optimization or CBO Optimizer) is an optimization technique in Spark SQL that uses table statistics to determine the most efficient query execution plan of a structured query (given the logical query plan).
Cost-based optimization is disabled by default. Spark SQL uses spark.sql.cbo.enabled configuration property to control whether the CBO should be enabled and used for query optimization or not.
Cost-Based Optimization uses logical optimization rules (e.g. CostBasedJoinReorder) to optimize the logical plan of a structured query based on statistics.
You first use ANALYZE TABLE COMPUTE STATISTICS SQL command to compute table statistics. Use DESCRIBE EXTENDED SQL command to inspect the statistics.
Logical operators have statistics support that is used for query planning.
There is also support for equi-height column histograms.
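As a minimal end-to-end sketch (assuming a table named t1 already exists in the catalog; the detailed examples further below create and describe such a table):

// Compute table-level statistics and store them in a metastore
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS")

// Inspect the statistics (see the Statistics row in the output)
spark.sql("DESC EXTENDED t1").show(numRows = 30, truncate = false)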
Table Statistics
The table statistics can be computed for tables, partitions and columns and are as follows:
- Total size (in bytes) of a table or table partitions
- Row count of a table or table partitions
- Column statistics, i.e. min, max, num_nulls, distinct_count, avg_col_len, max_col_len, histogram
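Once computed, the statistics are also available programmatically as part of the table metadata in the session catalog. A minimal sketch (assuming the t1 table from the examples below with statistics already computed):

import org.apache.spark.sql.catalyst.TableIdentifier

// CatalogStatistics (sizeInBytes, rowCount and, if computed, colStats)
val tableStats = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("t1"))
  .stats
println(tableStats)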
spark.sql.cbo.enabled Spark SQL Configuration Property
Cost-based optimization is enabled when spark.sql.cbo.enabled configuration property is turned on, i.e. true.
Note: spark.sql.cbo.enabled configuration property is turned off, i.e. false, by default.
Tip: Use SQLConf.cboEnabled to access the current value of spark.sql.cbo.enabled property.
// CBO is disabled by default
val sqlConf = spark.sessionState.conf
scala> println(sqlConf.cboEnabled)
false

// Create a new SparkSession with CBO enabled
// You could spark-submit -c spark.sql.cbo.enabled=true
val sparkCboEnabled = spark.newSession
import org.apache.spark.sql.internal.SQLConf.CBO_ENABLED
sparkCboEnabled.conf.set(CBO_ENABLED.key, true)
val isCboEnabled = sparkCboEnabled.conf.get(CBO_ENABLED.key)
println(s"Is CBO enabled? $isCboEnabled")
Note: CBO is disabled explicitly in Spark Structured Streaming.
ANALYZE TABLE COMPUTE STATISTICS SQL Command
Cost-Based Optimization uses the statistics that are computed using ANALYZE TABLE SQL command and stored in a metastore (aka external catalog).
ANALYZE TABLE tableIdentifier partitionSpec?
COMPUTE STATISTICS (NOSCAN | FOR COLUMNS identifierSeq)?
Depending on the variant, ANALYZE TABLE computes different statistics, i.e. of a table, partitions or columns (see the example right after the list below):
- ANALYZE TABLE with neither PARTITION specification nor FOR COLUMNS clause
- ANALYZE TABLE with PARTITION specification (but no FOR COLUMNS clause)
- ANALYZE TABLE with FOR COLUMNS clause (but no PARTITION specification)
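For example, for a partitioned table like the t1 table used later on this page (columns id, p1 and p2, partitioned by p1 and p2), the three variants could look as follows (a sketch):

// Table-level statistics (total size in bytes and row count)
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS")

// Partition-level statistics (for the given partition only)
spark.sql("ANALYZE TABLE t1 PARTITION (p1 = 0, p2 = 'zero') COMPUTE STATISTICS")

// Column-level statistics (min, max, num_nulls, distinct_count, avg_col_len, max_col_len)
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id, p1, p2")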
Tip: Use spark.sql.statistics.histogram.enabled configuration property to enable column (equi-height) histograms that can provide better estimation accuracy (but cause an extra table scan).
Note: ANALYZE TABLE with both PARTITION specification and FOR COLUMNS clause is incorrect. In such a case, the partition specification is simply ignored (with a WARN message in the logs) and column statistics are computed for the entire table.
When executed, the above ANALYZE TABLE variants are translated to the following logical commands (in a logical query plan), respectively:

- AnalyzeTableCommand
- AnalyzePartitionCommand
- AnalyzeColumnCommand
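You can observe this translation with the Spark SQL parser, e.g. (a sketch; the table does not even have to exist for parsing alone):

// A sketch: parse ANALYZE TABLE variants into their logical commands
val parser = spark.sessionState.sqlParser
val tablePlan = parser.parsePlan("ANALYZE TABLE t1 COMPUTE STATISTICS")
println(tablePlan.getClass.getSimpleName)   // expect AnalyzeTableCommand
val columnPlan = parser.parsePlan("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")
println(columnPlan.getClass.getSimpleName)  // expect AnalyzeColumnCommand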
DESCRIBE EXTENDED SQL Command
You can view the statistics of a table, partitions or a column (stored in a metastore) using DESCRIBE EXTENDED SQL command.
(DESC | DESCRIBE) TABLE? (EXTENDED | FORMATTED)?
tableIdentifier partitionSpec? describeColName?
Table-level statistics are in Statistics row while partition-level statistics are in Partition Statistics row.
Tip: Use DESC EXTENDED tableName for table-level statistics and DESC EXTENDED tableName PARTITION (p1, p2, …) for partition-level statistics only.
// table-level statistics are in Statistics row
scala> sql("DESC EXTENDED t1").show(numRows = 30, truncate = false)
+----------------------------+--------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+--------------------------------------------------------------+-------+
|id |int |null |
|p1 |int |null |
|p2 |string |null |
|# Partition Information | | |
|# col_name |data_type |comment|
|p1 |int |null |
|p2 |string |null |
| | | |
|# Detailed Table Information| | |
|Database |default | |
|Table |t1 | |
|Owner |jacek | |
|Created Time |Wed Dec 27 14:10:44 CET 2017 | |
|Last Access |Thu Jan 01 01:00:00 CET 1970 | |
|Created By |Spark 2.3.0 | |
|Type |MANAGED | |
|Provider |parquet | |
|Table Properties |[transient_lastDdlTime=1514453141] | |
|Statistics |714 bytes, 2 rows | |
|Location |file:/Users/jacek/dev/oss/spark/spark-warehouse/t1 | |
|Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat| |
|Storage Properties |[serialization.format=1] | |
|Partition Provider |Catalog | |
+----------------------------+--------------------------------------------------------------+-------+

scala> spark.table("t1").show
+---+---+----+
| id| p1| p2|
+---+---+----+
| 0| 0|zero|
| 1| 1| one|
+---+---+----+

// partition-level statistics are in Partition Statistics row
scala> sql("DESC EXTENDED t1 PARTITION (p1=0, p2='zero')").show(numRows = 30, truncate = false)
+--------------------------------+---------------------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+--------------------------------+---------------------------------------------------------------------------------+-------+
|id |int |null |
|p1 |int |null |
|p2 |string |null |
|# Partition Information | | |
|# col_name |data_type |comment|
|p1 |int |null |
|p2 |string |null |
| | | |
|# Detailed Partition Information| | |
|Database |default | |
|Table |t1 | |
|Partition Values |[p1=0, p2=zero] | |
|Location |file:/Users/jacek/dev/oss/spark/spark-warehouse/t1/p1=0/p2=zero | |
|Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
|Storage Properties |[path=file:/Users/jacek/dev/oss/spark/spark-warehouse/t1, serialization.format=1]| |
|Partition Parameters |{numFiles=1, transient_lastDdlTime=1514469540, totalSize=357} | |
|Partition Statistics |357 bytes, 1 rows | |
| | | |
|# Storage Information | | |
|Location |file:/Users/jacek/dev/oss/spark/spark-warehouse/t1 | |
|Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
|Storage Properties |[serialization.format=1] | |
+--------------------------------+---------------------------------------------------------------------------------+-------+
You can view the statistics of a single column using DESC EXTENDED tableName columnName, which gives a Dataset with two columns, i.e. info_name and info_value.
scala> sql("DESC EXTENDED t1 id").show +--------------+----------+ |info_name |info_value| +--------------+----------+ |col_name |id | |data_type |int | |comment |NULL | |min |0 | |max |1 | |num_nulls |0 | |distinct_count|2 | |avg_col_len |4 | |max_col_len |4 | |histogram |NULL | +--------------+----------+ scala> sql("DESC EXTENDED t1 p1").show +--------------+----------+ |info_name |info_value| +--------------+----------+ |col_name |p1 | |data_type |int | |comment |NULL | |min |0 | |max |1 | |num_nulls |0 | |distinct_count|2 | |avg_col_len |4 | |max_col_len |4 | |histogram |NULL | +--------------+----------+ scala> sql("DESC EXTENDED t1 p2").show +--------------+----------+ |info_name |info_value| +--------------+----------+ |col_name |p2 | |data_type |string | |comment |NULL | |min |NULL | |max |NULL | |num_nulls |0 | |distinct_count|2 | |avg_col_len |4 | |max_col_len |4 | |histogram |NULL | +--------------+----------+ |
Cost-Based Optimizations
The Spark Optimizer uses heuristics (rules) that are applied to a logical query plan for cost-based optimization.
Among the optimization rules are the following:
- CostBasedJoinReorder logical optimization rule for join reordering with 2 or more consecutive inner or cross joins (possibly separated by Project operators) when spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled configuration properties are both enabled (see the sketch right after this list).
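A minimal sketch of turning the rule on (table statistics should be available for the joined tables for the reordering to be effective):

// Enable CBO and cost-based join reordering on top of it
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)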
Logical Commands for Altering Table Statistics
The following are the logical commands that alter table statistics in a metastore (aka external catalog):
- AlterTableAddPartitionCommand
- AlterTableDropPartitionCommand
- AlterTableSetLocationCommand
- TruncateTableCommand
- LoadDataCommand
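These logical commands correspond to SQL statements such as ALTER TABLE ... ADD PARTITION, TRUNCATE TABLE or LOAD DATA. A minimal sketch (assuming the t1 table from the examples on this page):

// TRUNCATE TABLE is translated to TruncateTableCommand and alters the table statistics
spark.sql("TRUNCATE TABLE t1")

// Re-inspect the Statistics row afterwards
spark.sql("DESC EXTENDED t1").show(numRows = 30, truncate = false)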
EXPLAIN COST SQL Command
Caution: FIXME See LogicalPlanStats
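In the meantime, a minimal sketch (assuming the t1 table with statistics computed as in the examples on this page): EXPLAIN COST shows the optimized logical plan together with its statistics.

// EXPLAIN COST includes the statistics of the optimized logical plan
spark.sql("EXPLAIN COST SELECT * FROM t1").show(truncate = false)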
LogicalPlanStats — Statistics Estimates of Logical Operator
LogicalPlanStats adds statistics support to logical operators and is used for query planning (with or without cost-based optimization, e.g. CostBasedJoinReorder or JoinSelection, respectively).
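For example (a sketch assuming the t1 table with statistics computed and CBO enabled as earlier on this page), the statistics estimates of a structured query are available through the stats method of its optimized logical plan:

// Statistics estimates: sizeInBytes and (with CBO and ANALYZE TABLE) rowCount
val q = spark.table("t1")
val stats = q.queryExecution.optimizedPlan.stats
println(s"sizeInBytes: ${stats.sizeInBytes}, rowCount: ${stats.rowCount}")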
Equi-Height Histograms for Columns
Equi-height histograms are effective in cardinality estimation, especially for skewed data distributions, and give more accurate estimates than basic column statistics (min, max, ndv, etc.) alone. In an equi-height histogram, all bins (intervals) have the same height (frequency). The default number of bins is 254.

Spark SQL uses a two-step method to generate an equi-height histogram:

1. Use the ApproximatePercentile aggregate (percentile_approx) to get the percentiles p(0), p(1/n), p(2/n), ..., p((n-1)/n), p(1), i.e. the end points of the equi-height bin intervals.
2. Construct the range values of the bins, e.g. [p(0), p(1/n)], [p(1/n), p(2/n)], ..., [p((n-1)/n), p(1)], and use ApproxCountDistinctForIntervals to count the number of distinct values (ndv) in each bin. Each bin is of the form (lowerBound, upperBound, ndv).

Note that this method takes two table scans. In the future, other algorithms that need only one table scan may be provided.
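Step 1 can be illustrated with the public percentile_approx SQL function. A minimal sketch (assuming the t1 table from the examples below; 4 bins instead of the default 254 for brevity; the actual implementation uses the internal ApproximatePercentile and ApproxCountDistinctForIntervals aggregates):

// Step 1 (sketch): equi-height bin end points for column id of table t1
val endpoints = spark.sql(
  "SELECT percentile_approx(id, array(0.0D, 0.25D, 0.5D, 0.75D, 1.0D)) AS endpoints FROM t1")
endpoints.show(truncate = false)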
Spark SQL uses column statistics that may optionally hold the histogram of values (which is empty by default). With spark.sql.statistics.histogram.enabled configuration property turned on, ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command generates column (equi-height) histograms.
Note: spark.sql.statistics.histogram.enabled is off by default.
// Computing column statistics with histogram
// ./bin/spark-shell --conf spark.sql.statistics.histogram.enabled=true
scala> spark.sessionState.conf.histogramEnabled
res1: Boolean = true

val tableName = "t1"

// Make the example reproducible
import org.apache.spark.sql.catalyst.TableIdentifier
val tid = TableIdentifier(tableName)
val sessionCatalog = spark.sessionState.catalog
sessionCatalog.dropTable(tid, ignoreIfNotExists = true, purge = true)

// CREATE TABLE t1
Seq((0, 0, "zero"), (1, 1, "one")).
  toDF("id", "p1", "p2").
  write.
  saveAsTable(tableName)

// As we drop and create immediately we may face problems with unavailable partition files
// Invalidate cache
spark.sql(s"REFRESH TABLE $tableName")

// Use ANALYZE TABLE...FOR COLUMNS to compute column statistics
// that saves them in a metastore (aka an external catalog)
val df = spark.table(tableName)
val allCols = df.columns.mkString(",")
val analyzeTableSQL = s"ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS $allCols"
spark.sql(analyzeTableSQL)

// Column statistics with histogram should be in the external catalog (metastore)
You can inspect the column statistics using DESCRIBE EXTENDED SQL command.
// Inspecting column statistics with column histogram
// See the above example for how to compute the stats
val colName = "id"
val descExtSQL = s"DESC EXTENDED $tableName $colName"

// 254 bins by default --> num_of_bins in histogram row below
scala> sql(descExtSQL).show(truncate = false)
+--------------+-----------------------------------------------------+
|info_name |info_value |
+--------------+-----------------------------------------------------+
|col_name |id |
|data_type |int |
|comment |NULL |
|min |0 |
|max |1 |
|num_nulls |0 |
|distinct_count|2 |
|avg_col_len |4 |
|max_col_len |4 |
|histogram |height: 0.007874015748031496, num_of_bins: 254 |
|bin_0 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_1 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_2 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_3 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_4 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_5 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_6 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_7 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_8 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_9 |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
+--------------+-----------------------------------------------------+
only showing top 20 rows