Bucketing

Bucketing is an optimization technique that uses buckets (and bucketing columns) to determine data partitioning and avoid data shuffle.

The motivation is to optimize performance of a join query by avoiding shuffles (aka exchanges) of tables participating in the join. Bucketing results in fewer exchanges (and so stages).

Note
Bucketing shows the biggest benefit when pre-shuffled bucketed tables are used more than once, as bucketing itself takes time (a cost you offset by running multiple join queries later).

Bucketing is enabled by default. Spark SQL uses spark.sql.sources.bucketing.enabled configuration property to control whether bucketing should be enabled and used for query optimization or not.

Bucketing is used exclusively in FileSourceScanExec physical operator (when it is requested for the input RDD and to determine the partitioning and ordering of the output).
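
Consider a simple equi-join of two non-bucketed datasets. A minimal sketch, assuming an existing SparkSession named spark (the dataset sizes and the id join column come from spark.range and are illustrative):

```scala
// Two non-bucketed datasets joined on the id column that spark.range provides.
val t1 = spark.range(4000000)
val t2 = spark.range(4000000)
val q = t1.join(t2, "id")
q.explain()
// The physical plan shows a SortMergeJoin with an Exchange (shuffle) on both sides.
```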

The above join query is a fine example of a SortMergeJoinExec (aka SortMergeJoin) of two FileSourceScanExecs (aka Scan). The join query uses ShuffleExchangeExec physical operators (aka Exchange) to shuffle the table datasets for the SortMergeJoin.

Figure 1. SortMergeJoin of FileScans (Details for Query)

One way to avoid the exchanges (and so optimize the join query) is to use table bucketing, which is applicable to all file-based data sources (e.g. Parquet, ORC, JSON, CSV) that are saved as a table using DataFrameWriter.saveAsTable or simply available in a catalog via SparkSession.table.

You use DataFrameWriter.bucketBy method to specify the number of buckets and the bucketing columns.

You can optionally sort the output rows in buckets using DataFrameWriter.sortBy method.
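
A minimal sketch of writing a bucketed and sorted table (the table name bucketed_4_id, the bucket count, and the column are illustrative, not from the original):

```scala
import org.apache.spark.sql.SaveMode

// Bucket the output into 4 buckets on the id column
// and sort the rows within each bucket.
spark.range(1000000)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_4_id")
```

Note that bucketBy (and sortBy) only works with saveAsTable; plain DataFrameWriter.save does not support a bucketing specification.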

Note
DataFrameWriter.bucketBy and DataFrameWriter.sortBy simply set respective internal properties that eventually become a bucketing specification.

Unlike bucketing in Apache Hive, Spark SQL creates bucket files per bucket per partition. In other words, the number of bucket files is the number of buckets multiplied by the number of task writers (one per partition).

With bucketing, the Exchanges are no longer needed (as the tables are already pre-shuffled).
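
Re-running the join against bucketed tables, sketched here by reading the illustrative bucketed_4_id table on both sides:

```scala
// Both sides are bucketed identically, so their buckets line up
// and no shuffle is needed before the SortMergeJoin.
val b1 = spark.table("bucketed_4_id")
val b2 = spark.table("bucketed_4_id")
b1.join(b2, "id").explain()
```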

The above join query of the bucketed tables shows no ShuffleExchangeExec physical operators (aka Exchange) as the shuffling has already been executed (before the query was run).

Figure 2. SortMergeJoin of Bucketed Tables (Details for Query)

The number of partitions of a bucketed table is exactly the number of buckets.

Use SessionCatalog or the DESCRIBE EXTENDED SQL command to find the bucketing information.
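
For example, with the illustrative table from the earlier sketch:

```scala
// Print the table metadata, including the bucketing specification.
// Look for the Num Buckets, Bucket Columns and Sort Columns rows.
spark.sql("DESCRIBE EXTENDED bucketed_4_id").show(numRows = 50, truncate = false)
```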

The number of buckets has to be between 0 and 100000 (exclusive) or Spark SQL throws an AnalysisException.

There are however requirements that have to be met before Spark Optimizer gives a no-Exchange query plan:

  1. The number of partitions on both sides of a join has to be exactly the same.

  2. Both join operators have to use the HashPartitioning partitioning scheme.

It is acceptable to use bucketing for one side of a join.
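
A sketch of such a one-sided case, reusing the illustrative bucketed table:

```scala
// Join a bucketed table with a plain (non-bucketed) Dataset.
// Given the requirements above, only the non-bucketed side
// may need an Exchange to match the bucketed side's partitioning.
val bucketed = spark.table("bucketed_4_id")
val plain = spark.range(1000000)
bucketed.join(plain, "id").explain()
```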

Figure 3. SortMergeJoin of One Bucketed Table (Details for Query)

Bucket Pruning — Optimizing Filtering on Bucketed Column (Reducing Bucket Files to Scan)

As of Spark 2.4, Spark SQL supports bucket pruning to optimize filtering on a bucketed column (by reducing the number of bucket files to scan).

Bucket pruning supports the following predicate expressions:

  • EqualTo (=)

  • EqualNullSafe (<=>)

  • In

  • InSet

  • And and Or of the above
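
A sketch of bucket pruning in action (the table and the literal are illustrative):

```scala
// An equality predicate on the single bucketing column lets Spark
// scan only the bucket file(s) that can contain id = 42.
spark.table("bucketed_4_id").where("id = 42").explain()
// Look for SelectedBucketsCount in the FileScan node, e.g. "1 out of 4".
```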

The FileSourceStrategy execution planning strategy applies bucket pruning only to LogicalRelations over HadoopFsRelation whose bucketing specification satisfies the following:

  1. There is exactly one bucketing column

  2. The number of buckets is greater than 1

Sorting
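
The use case here joins a sorted, non-bucketed Dataset with a bucketed and sorted table. A minimal sketch (names are illustrative):

```scala
// A sorted but non-bucketed Dataset joined with the bucketed, sorted table.
val bucketed = spark.table("bucketed_4_id")
val sorted = spark.range(1000000).sort("id")
sorted.join(bucketed, "id").explain()
```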

Warning
There are two exchanges and sorts which makes the above use case almost unusable. I filed an issue at SPARK-24025 Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side.
Figure 4. SortMergeJoin of Sorted Dataset and Bucketed Table (Details for Query)

spark.sql.sources.bucketing.enabled Spark SQL Configuration Property

Bucketing is enabled when the spark.sql.sources.bucketing.enabled configuration property is turned on (true), which is the default.

Tip
Use SQLConf.bucketingEnabled to access the current value of spark.sql.sources.bucketing.enabled property.
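
From user code, a sketch using the runtime configuration of a SparkSession:

```scala
// Read and, if needed, toggle the property at runtime.
spark.conf.get("spark.sql.sources.bucketing.enabled")           // "true" by default
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")  // disable bucketing
```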
