Bucketing
Bucketing is an optimization technique that uses buckets (and bucketing columns) to determine data partitioning and avoid data shuffle.
The motivation is to optimize the performance of a join query by avoiding shuffles (aka exchanges) of the tables participating in the join. Bucketing results in fewer exchanges (and hence fewer stages).
Note: Bucketing shows the biggest benefit when pre-shuffled bucketed tables are used more than once, as bucketing itself takes time (a cost you offset by executing multiple join queries later).
Bucketing is enabled by default. Spark SQL uses spark.sql.sources.bucketing.enabled configuration property to control whether bucketing should be enabled and used for query optimization or not.
Bucketing is used exclusively in FileSourceScanExec physical operator (when it is requested for the input RDD and to determine the partitioning and ordering of the output).
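You can see which scan operators a query actually uses by collecting FileSourceScanExec operators from the executed plan and inspecting the bucketing specification of the underlying HadoopFsRelation. A minimal sketch (the helper name is just for illustration; it is not part of Spark):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical helper: print the bucketing specification (if any) of every
// FileSourceScanExec in the executed plan of a query
def bucketSpecsOf(q: DataFrame): Unit =
  q.queryExecution.executedPlan
    .collect { case scan: FileSourceScanExec => scan }
    .foreach(scan => println(scan.relation.bucketSpec))

// e.g. bucketSpecsOf(spark.table("bucketed_4_10e4"))  // a bucketed table created later on this page
```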
Example: SortMergeJoin of two FileScans

```
import org.apache.spark.sql.SaveMode
spark.range(10e4.toLong).write.mode(SaveMode.Overwrite).saveAsTable("t10e4")
spark.range(10e6.toLong).write.mode(SaveMode.Overwrite).saveAsTable("t10e6")

// Bucketing is enabled by default
// Let's check it out anyway
assert(spark.sessionState.conf.bucketingEnabled, "Bucketing disabled?!")

// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// For that, let's disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val tables = spark.catalog.listTables.where($"name" startsWith "t10e")
scala> tables.show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|t10e4| default|       null|  MANAGED|      false|
|t10e6| default|       null|  MANAGED|      false|
+-----+--------+-----------+---------+-----------+

val t4 = spark.table("t10e4")
val t6 = spark.table("t10e6")

assert(t4.count == 10e4)
assert(t6.count == 10e6)

// trigger execution of the join query
t4.join(t6, "id").foreach(_ => ())
```
The above join query is a fine example of a SortMergeJoinExec (aka SortMergeJoin) of two FileSourceScanExecs (aka Scan). The join query uses ShuffleExchangeExec physical operators (aka Exchange) to shuffle the table datasets for the SortMergeJoin.
One way to avoid the exchanges (and so optimize the join query) is to use table bucketing, which is applicable to all file-based data sources (e.g. Parquet, ORC, JSON, CSV) that are saved as a table using DataFrameWriter.saveAsTable or simply available in a catalog via SparkSession.table.
Note: Bucketing is not supported for the DataFrameWriter.save, DataFrameWriter.insertInto and DataFrameWriter.jdbc methods.
You use DataFrameWriter.bucketBy method to specify the number of buckets and the bucketing columns.
You can optionally sort the output rows in buckets using DataFrameWriter.sortBy method.
```scala
people.write
  .bucketBy(42, "name")
  .sortBy("age")
  .saveAsTable("people_bucketed")
```
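The same bucketed layout can also be declared in SQL with Spark's CREATE TABLE ... CLUSTERED BY ... INTO ... BUCKETS clause. A sketch (the table name and schema below are illustrative only):

```scala
// Assumption: illustrative table name and schema; CLUSTERED BY / SORTED BY ... INTO n BUCKETS
// is the SQL DDL counterpart of bucketBy/sortBy for data source tables
spark.sql("""
  CREATE TABLE people_bucketed_sql (name STRING, age INT)
  USING parquet
  CLUSTERED BY (name) SORTED BY (age) INTO 42 BUCKETS
""")
```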
Note: DataFrameWriter.bucketBy and DataFrameWriter.sortBy simply set respective internal properties that eventually become a bucketing specification.
Unlike bucketing in Apache Hive, Spark SQL creates bucket files per the number of buckets and partitions. In other words, the number of bucket files is the number of buckets multiplied by the number of task writers (one per partition).
```
val large = spark.range(10e6.toLong)
import org.apache.spark.sql.SaveMode
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_4_id")

scala> println(large.queryExecution.toRdd.getNumPartitions)
8

// That gives 8 (partitions/task writers) x 4 (buckets) = 32 files
// With _SUCCESS extra file and the ls -l header "total 794624" that gives 34 files
$ ls -tlr spark-warehouse/bucketed_4_id | wc -l
34
```
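If the number of bucket files matters, one way to keep it down (not shown above) is to repartition the dataset by the bucketing column into exactly as many partitions as buckets before writing, so that each task writer emits a single bucket file. A sketch, assuming a hypothetical table name:

```scala
import org.apache.spark.sql.SaveMode

// Assumption: repartitioning by the bucketing column into exactly numBuckets partitions
// uses the same hash-based partitioning as bucketing, so every task writer should
// produce a single bucket file (4 data files in total, plus _SUCCESS)
spark.range(10e6.toLong)
  .repartition(4, $"id")
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_4_id_compact")  // hypothetical table name
```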
With bucketing, the Exchanges are no longer needed (as the tables are already pre-shuffled).
```scala
// Create bucketed tables
import org.apache.spark.sql.SaveMode
spark.range(10e4.toLong)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_4_10e4")
spark.range(10e6.toLong)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_4_10e6")

val bucketed_4_10e4 = spark.table("bucketed_4_10e4")
val bucketed_4_10e6 = spark.table("bucketed_4_10e6")

// trigger execution of the join query
bucketed_4_10e4.join(bucketed_4_10e6, "id").foreach(_ => ())
```
The above join query of the bucketed tables shows no ShuffleExchangeExec physical operators (aka Exchange) as the shuffling has already been executed (before the query was run).
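You can also verify the absence of exchanges programmatically by collecting ShuffleExchangeExec operators from the executed plan; a minimal sketch:

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val q = bucketed_4_10e4.join(bucketed_4_10e6, "id")
// Collect every ShuffleExchangeExec (Exchange) in the executed plan
val exchanges = q.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }
assert(exchanges.isEmpty, "Expected no Exchange in a join of two bucketed tables")
```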
The number of partitions of a bucketed table is exactly the number of buckets.
```scala
val bucketed_4_10e4 = spark.table("bucketed_4_10e4")
val numPartitions = bucketed_4_10e4.queryExecution.toRdd.getNumPartitions
assert(numPartitions == 4)
```
Use SessionCatalog or the DESCRIBE EXTENDED SQL command to find the bucketing information.
```
val bucketed_tables = spark.catalog.listTables.where($"name" startsWith "bucketed_")

scala> bucketed_tables.show
+---------------+--------+-----------+---------+-----------+
|           name|database|description|tableType|isTemporary|
+---------------+--------+-----------+---------+-----------+
|bucketed_4_10e4| default|       null|  MANAGED|      false|
|bucketed_4_10e6| default|       null|  MANAGED|      false|
+---------------+--------+-----------+---------+-----------+

val demoTable = "bucketed_4_10e4"

// DESC EXTENDED or DESC FORMATTED would also work
val describeSQL = sql(s"DESCRIBE EXTENDED $demoTable")

scala> describeSQL.show(numRows = 21, truncate = false)
+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|id                          |bigint                                                          |null   |
|                            |                                                                |       |
|# Detailed Table Information|                                                                |       |
|Database                    |default                                                         |       |
|Table                       |bucketed_4_10e4                                                 |       |
|Owner                       |jacek                                                           |       |
|Created Time                |Tue Oct 02 10:50:50 CEST 2018                                   |       |
|Last Access                 |Thu Jan 01 01:00:00 CET 1970                                    |       |
|Created By                  |Spark 2.3.2                                                     |       |
|Type                        |MANAGED                                                         |       |
|Provider                    |parquet                                                         |       |
|Num Buckets                 |4                                                               |       |
|Bucket Columns              |[`id`]                                                          |       |
|Sort Columns                |[`id`]                                                          |       |
|Table Properties            |[transient_lastDdlTime=1538470250]                              |       |
|Statistics                  |413954 bytes                                                    |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_10e4|       |
|Serde Library               |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe              |       |
|InputFormat                 |org.apache.hadoop.mapred.SequenceFileInputFormat                |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat       |       |
|Storage Properties          |[serialization.format=1]                                        |       |
+----------------------------+----------------------------------------------------------------+-------+

import org.apache.spark.sql.catalyst.TableIdentifier
val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(demoTable))

scala> metadata.bucketSpec.foreach(println)
4 buckets, bucket columns: [id], sort columns: [id]
```
The number of buckets has to be between 0 and 100000 (exclusive) or Spark SQL throws an AnalysisException:
```
Number of buckets should be greater than 0 but less than 100000. Got `[numBuckets]`
```
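As a quick illustration, asking for zero buckets should fail when the table is saved; a sketch (with a hypothetical table name):

```scala
import scala.util.Try
import org.apache.spark.sql.AnalysisException

// The invalid bucket count is rejected when the bucketing specification is built at save time
val attempt = Try(spark.range(10).write.bucketBy(0, "id").saveAsTable("zero_buckets"))  // hypothetical table name
assert(attempt.failed.get.isInstanceOf[AnalysisException])
```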
There are, however, requirements that have to be met before the Spark Optimizer gives a no-Exchange query plan:
- The number of partitions on both sides of a join has to be exactly the same.
- Both join operators have to use the HashPartitioning partitioning scheme.
It is acceptable to use bucketing for one side of a join.
```
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// For this, let's disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val bucketedTableName = "bucketed_4_id"
val large = spark.range(10e5.toLong)
import org.apache.spark.sql.SaveMode
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable(bucketedTableName)

val bucketedTable = spark.table(bucketedTableName)

val t1 = spark
  .range(4)
  .repartition(4, $"id")  // Make sure that the number of partitions matches the other side

val q = t1.join(bucketedTable, "id")
scala> q.explain
== Physical Plan ==
*(4) Project [id#169L]
+- *(4) SortMergeJoin [id#169L], [id#167L], Inner
   :- *(2) Sort [id#169L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#169L, 4)
   :     +- *(1) Range (0, 4, step=1, splits=8)
   +- *(3) Sort [id#167L ASC NULLS FIRST], false, 0
      +- *(3) Project [id#167L]
         +- *(3) Filter isnotnull(id#167L)
            +- *(3) FileScan parquet default.bucketed_4_id[id#167L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

q.foreach(_ => ())
```
Bucket Pruning — Optimizing Filtering on Bucketed Column (Reducing Bucket Files to Scan)
As of Spark 2.4, Spark SQL supports bucket pruning to optimize filtering on a bucketed column (by reducing the number of bucket files to scan).
Bucket pruning supports the following predicate expressions on the bucketing column: EqualTo (=), EqualNullSafe (<=>), In and InSet, as well as And and Or of the above.
The FileSourceStrategy execution planning strategy is responsible for bucket pruning and applies it only to LogicalRelations over HadoopFsRelation with a bucketing specification that meets the following requirements:
- There is exactly one bucketing column
- The number of buckets is greater than 1
Example: Bucket Pruning

```
// Enable INFO logging level of FileSourceStrategy logger to see the details of the strategy
import org.apache.spark.sql.execution.datasources.FileSourceStrategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)

val q57 = q.where($"id" isin (50, 70))
scala> val sparkPlan57 = q57.queryExecution.executedPlan
18/11/17 23:18:04 INFO FileSourceStrategy: Pruning directories with:
18/11/17 23:18:04 INFO FileSourceStrategy: Pruned 2 out of 4 buckets.
18/11/17 23:18:04 INFO FileSourceStrategy: Post-Scan Filters: id#0L IN (50,70)
18/11/17 23:18:04 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint>
18/11/17 23:18:04 INFO FileSourceScanExec: Pushed Filters: In(id, [50,70])
...

scala> println(sparkPlan57.numberedTreeString)
00 *(1) Filter id#0L IN (50,70)
01 +- *(1) FileScan parquet default.bucketed_4_id[id#0L,part#1L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [In(id, [50,70])], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 4

import org.apache.spark.sql.execution.FileSourceScanExec
val scan57 = sparkPlan57.collectFirst { case exec: FileSourceScanExec => exec }.get

import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd57 = scan57.inputRDDs.head.asInstanceOf[FileScanRDD]

import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles57 = for {
  FilePartition(bucketId, files) <- rdd57.filePartitions
  f <- files
} yield s"Bucket $bucketId => $f"

scala> println(bucketFiles57.size)
24
```
Sorting
```
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// Disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val bucketedTableName = "bucketed_4_id"
val large = spark.range(10e5.toLong)
import org.apache.spark.sql.SaveMode
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable(bucketedTableName)

// Describe the table and include bucketing spec only
val descSQL = sql(s"DESC FORMATTED $bucketedTableName")
  .filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
scala> descSQL.show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   Num Buckets|        4|       |
|Bucket Columns|   [`id`]|       |
|  Sort Columns|   [`id`]|       |
+--------------+---------+-------+

val bucketedTable = spark.table(bucketedTableName)

val t1 = spark.range(4)
  .repartition(2, $"id")        // Use just 2 partitions
  .sortWithinPartitions("id")   // sort partitions

val q = t1.join(bucketedTable, "id")
// Note two exchanges and sorts
scala> q.explain
== Physical Plan ==
*(5) Project [id#205L]
+- *(5) SortMergeJoin [id#205L], [id#203L], Inner
   :- *(3) Sort [id#205L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#205L, 4)
   :     +- *(2) Sort [id#205L ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(id#205L, 2)
   :           +- *(1) Range (0, 4, step=1, splits=8)
   +- *(4) Sort [id#203L ASC NULLS FIRST], false, 0
      +- *(4) Project [id#203L]
         +- *(4) Filter isnotnull(id#203L)
            +- *(4) FileScan parquet default.bucketed_4_id[id#203L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

q.foreach(_ => ())
```
Warning: There are two exchanges and sorts, which makes the above use case almost unusable. I filed an issue at SPARK-24025 Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side.
spark.sql.sources.bucketing.enabled Spark SQL Configuration Property
Bucketing is enabled when the spark.sql.sources.bucketing.enabled configuration property is turned on (true), which is the default.
Tip: Use SQLConf.bucketingEnabled to access the current value of the spark.sql.sources.bucketing.enabled property.
```scala
// Bucketing is on by default
assert(spark.sessionState.conf.bucketingEnabled, "Bucketing disabled?!")
```
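To see the property in action, you could turn bucketing off, re-run explain on a join of the bucketed tables created earlier and watch the Exchange operators reappear; a sketch:

```scala
// Turn bucketing off for the current session
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")

// With bucketing disabled, the bucketed tables are read as regular file-based tables,
// so the join should plan Exchange (and extra Sort) operators again
val q = spark.table("bucketed_4_10e4").join(spark.table("bucketed_4_10e6"), "id")
q.explain

// ...and back on (the default)
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
```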