FileSourceStrategy Execution Planning Strategy for LogicalRelations with HadoopFsRelation
FileSourceStrategy is an execution planning strategy that plans scans over collections of files (possibly partitioned or bucketed).

FileSourceStrategy is part of the predefined strategies of the Spark Planner.
import org.apache.spark.sql.execution.datasources.FileSourceStrategy

// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)

// Create a bucketed data source table
val tableName = "bucketed_4_id"
spark
  .range(100)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)

val q = spark.table(tableName)
val plan = q.queryExecution.optimizedPlan

val executionPlan = FileSourceStrategy(plan).head

scala> println(executionPlan.numberedTreeString)
00 FileScan parquet default.bucketed_4_id[id#140L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/apps/spark-2.3.0-bin-hadoop2.7/spark-warehouse/bucketed_4..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = executionPlan.collectFirst { case fsse: FileSourceScanExec => fsse }.get

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
FileSourceStrategy supports Bucket Pruning for LogicalRelations over HadoopFsRelation with a bucketing specification that satisfies the following:
- There is exactly one bucketing column
- The number of buckets is greater than 1
// Using the table created above
// There is exactly one bucketing column, i.e. id
// The number of buckets is greater than 1, i.e. 4
val tableName = "bucketed_4_id"
val q = spark.table(tableName).where($"id" isin (50, 90))
val qe = q.queryExecution
val plan = qe.optimizedPlan

scala> println(plan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- Relation[id#7L] parquet

import org.apache.spark.sql.execution.datasources.FileSourceStrategy

// Enable INFO logging level to see the details of the strategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)

scala> val executionPlan = FileSourceStrategy(plan).head
18/11/18 17:56:53 INFO FileSourceStrategy: Pruning directories with:
18/11/18 17:56:53 INFO FileSourceStrategy: Pruned 2 out of 4 buckets.
18/11/18 17:56:53 INFO FileSourceStrategy: Post-Scan Filters: id#7L IN (50,90)
18/11/18 17:56:53 INFO FileSourceStrategy: Output Data Schema: struct<id: bigint>
18/11/18 17:56:53 INFO FileSourceScanExec: Pushed Filters: In(id, [50,90])
executionPlan: org.apache.spark.sql.execution.SparkPlan = ...

scala> println(executionPlan.numberedTreeString)
00 Filter id#7L IN (50,90)
01 +- FileScan parquet default.bucketed_4_id[id#7L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [In(id, [50,90])], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 4
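As a side note, you can approximate which bucket a given value lands in yourself. Bucketing assigns bucket ids with a Murmur3 hash modulo the number of buckets, and the same hash is exposed through the hash standard function, so pmod(hash(col), numBuckets) should reproduce the bucket id. The snippet below is only a sketch of that idea (it reuses the values from the example above; treat the formula as an illustration, not a stable contract):

import org.apache.spark.sql.functions.{hash, lit, pmod}

// Estimate which bucket ids 50 and 90 fall into for a table bucketed by id into 4 buckets
val withBucket = spark
  .range(100)
  .withColumn("bucket", pmod(hash($"id"), lit(4)))

withBucket.where($"id" isin (50, 90)).show()
// Only the buckets that 50 and 90 hash to need to be scanned, which is why the
// example above reports "Pruned 2 out of 4 buckets" and SelectedBucketsCount: 2 out of 4.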
Tip: Enable INFO logging level for the org.apache.spark.sql.execution.datasources.FileSourceStrategy logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileSourceStrategy=INFO

Refer to Logging.
collectProjectsAndFilters Method
collectProjectsAndFilters(plan: LogicalPlan): (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) |
collectProjectsAndFilters is a pattern used to destructure a LogicalPlan that can be a Project or a Filter. Any other LogicalPlan gives an all-empty response.
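The following is a simplified, conceptual re-implementation of that destructuring (it is not the actual method, and it omits the alias-substitution map that the real return type carries). It shows how Project and Filter operators on top of a leaf operator could be collected:

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Walks down a chain of Project and Filter operators, collecting the topmost
// projection, all filter conditions, and the leaf operator underneath.
def collectProjectsAndFiltersSketch(
    plan: LogicalPlan): (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan) =
  plan match {
    case Project(projectList, child) =>
      val (_, filters, leaf) = collectProjectsAndFiltersSketch(child)
      (Some(projectList), filters, leaf)
    case Filter(condition, child) =>
      val (projects, filters, leaf) = collectProjectsAndFiltersSketch(child)
      (projects, condition +: filters, leaf)
    case other =>
      // Any other operator gives the "all-empty" response
      (None, Nil, other)
  }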
Applying FileSourceStrategy Strategy to Logical Plan (Executing FileSourceStrategy) — apply Method
apply(plan: LogicalPlan): Seq[SparkPlan] |
Note: apply is part of the GenericStrategy Contract to generate a collection of SparkPlans for a given logical plan.
apply uses the PhysicalOperation Scala extractor object to destructure a logical query plan into a tuple of projection and filter expressions together with a leaf logical operator.
apply only works with logical plans that are actually a LogicalRelation with a HadoopFsRelation (possibly as a child of Project and Filter logical operators).
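A quick way to see that destructuring in action from spark-shell is to apply the extractor to an optimized plan yourself. The sketch below reuses the bucketed_4_id table from the earlier examples and assumes a Spark 2.3-style LogicalRelation with four constructor arguments (the arity differs across Spark versions):

import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

val plan = spark
  .table("bucketed_4_id")
  .where($"id" > 10)
  .queryExecution
  .optimizedPlan

plan match {
  case PhysicalOperation(projects, filters, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
    println(s"projects: $projects")
    println(s"filters:  $filters")
    println(s"relation: $fsRelation")
  case _ =>
    println("Not a scan over files (no LogicalRelation with a HadoopFsRelation)")
}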
apply computes the partitionKeyFilters expression set, i.e. the filter expressions whose references are a subset of the partition columns (the partitionSchema) of the HadoopFsRelation.
apply prints out the following INFO message to the logs:
Pruning directories with: [partitionKeyFilters] |
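The bucketed example earlier showed this message with no filters because the table was not partitioned. To see non-empty partitionKeyFilters, you can plan a query over a partitioned table. The sketch below uses a made-up table name for the demo:

// Create a partitioned data source table (hypothetical name for this demo)
spark
  .range(100)
  .withColumn("part", $"id" % 4)
  .write
  .partitionBy("part")
  .mode("overwrite")
  .saveAsTable("partitioned_4_part")

val q = spark.table("partitioned_4_part").where($"part" === 1)
val plan = q.queryExecution.optimizedPlan

import org.apache.spark.sql.execution.datasources.FileSourceStrategy
val executionPlan = FileSourceStrategy(plan).head
// With INFO logging enabled for FileSourceStrategy, the "Pruning directories with:"
// message should now list the filters on the part partition column, and the FileScan
// should show them under PartitionFilters.
println(executionPlan.numberedTreeString)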
apply computes the afterScanFilters predicate expressions that should be evaluated after the scan.
apply prints out the following INFO message to the logs:
Post-Scan Filters: [afterScanFilters] |
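Conceptually (this is a simplified sketch with made-up names, not the actual code), the filters split by whether they reference only partition columns; what cannot be handled by partition pruning has to be evaluated after the scan:

import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Filters that reference only partition columns drive partition pruning;
// everything else (including filters that touch both data and partition
// columns) becomes an after-scan filter.
def splitFiltersSketch(
    filters: Seq[Expression],
    partitionColumns: AttributeSet): (Seq[Expression], Seq[Expression]) =
  filters.partition { f =>
    f.references.nonEmpty && f.references.subsetOf(partitionColumns)
  }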
apply computes the readDataColumns attributes, i.e. the required attributes except the partition columns.
apply prints out the following INFO message to the logs:
Output Data Schema: [outputSchema] |
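A simplified sketch of that computation (the names are assumptions, and the real code also normalizes and de-duplicates attributes) could look like this:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, NamedExpression}

// All attributes needed by the projections and the after-scan filters,
// minus the partition columns (those come from directory values, not file data).
def readDataColumnsSketch(
    projects: Seq[NamedExpression],
    afterScanFilters: Seq[Expression],
    output: Seq[Attribute],
    partitionColumns: AttributeSet): Seq[Attribute] = {
  val required = AttributeSet(afterScanFilters.flatMap(_.references) ++ projects)
  output.filter(required.contains).filterNot(partitionColumns.contains)
}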
apply creates a FileSourceScanExec physical operator.
If there are any afterScanFilter predicate expressions, apply creates a FilterExec physical operator with them and the FileSourceScanExec operator.
If the output of the FilterExec physical operator is different from the projects expressions, apply creates a ProjectExec physical operator with them on top of the FilterExec (or the FileSourceScanExec) operator.
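Putting the last three steps together, the shape of the resulting physical plan can be sketched as follows (simplified; the names are assumptions, not the actual code):

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}

// Optional FilterExec on top of the scan, optional ProjectExec on top of that.
def assemblePlanSketch(
    scan: SparkPlan,                      // the FileSourceScanExec
    afterScanFilter: Option[Expression],  // after-scan predicates reduced into one condition
    projects: Seq[NamedExpression]): SparkPlan = {
  val withFilter = afterScanFilter.map(FilterExec(_, scan)).getOrElse(scan)
  if (projects == withFilter.output) withFilter
  else ProjectExec(projects, withFilter)
}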