PrunedFilteredScan Contract — Relations with Column Pruning and Filter Pushdown
PrunedFilteredScan is the contract of BaseRelations with support for column pruning (i.e. eliminating unneeded columns) and filter pushdown (i.e. filtering using selected predicates only).
```scala
package org.apache.spark.sql.sources

trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
```
Property | Description
---|---
`buildScan` | Builds a distributed data scan with column pruning and filter pushdown. In other words, `buildScan` creates an `RDD[Row]` that carries only the required columns, with as many of the given filters as possible evaluated at the data source. Used exclusively when the `DataSourceStrategy` execution planning strategy plans a scan over a `LogicalRelation` with a `PrunedFilteredScan` relation (as the demo at the end of this page shows).
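To make the two `buildScan` parameters concrete, here is a minimal sketch of a relation that honors both column pruning and filter pushdown. It is not from Spark's codebase; the relation name `InMemoryScan`, the in-memory `data`, and the column names are made up for illustration:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation over a fixed in-memory dataset (illustration only)
case class InMemoryScan(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  private val data = Seq(
    Map("id" -> "1", "name" -> "one"),
    Map("id" -> "2", "name" -> "two"))

  override def schema: StructType = StructType(
    StructField("id", StringType) :: StructField("name", StringType) :: Nil)

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Filter pushdown: evaluate the predicates Spark handed over at the source
    val kept = data.filter { record =>
      filters.forall {
        case EqualTo(attribute, value) => record(attribute) == value
        case _ => true // unhandled filters are left for Spark to re-evaluate
      }
    }
    // Column pruning: emit only the requested columns, in the requested order
    val rows = kept.map(record => Row.fromSeq(requiredColumns.toSeq.map(record)))
    sqlContext.sparkContext.parallelize(rows)
  }
}
```

Note that ignoring a pushed filter affects performance, not correctness: by default, `BaseRelation.unhandledFilters` reports all filters as unhandled, so Spark SQL re-applies the predicates on top of the rows the relation returns.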
Note: PrunedFilteredScan is a "lighter" and stable version of the CatalystScan Contract.
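For comparison, CatalystScan's `buildScan` works on Catalyst internals directly (`Attribute` and `Expression` from `org.apache.spark.sql.catalyst.expressions`), which is why it is the less stable of the two contracts. The trait looks roughly like this (quoted from memory of the same `org.apache.spark.sql.sources` package; check your Spark version for the exact shape):

```scala
package org.apache.spark.sql.sources

// CatalystScan trades stability for expressiveness: filters arrive as raw
// Catalyst Expressions rather than the stable sources.Filter hierarchy.
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
```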
Note: JDBCRelation is the one and only known implementation of the PrunedFilteredScan Contract in Spark SQL.
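To observe filter pushdown through JDBCRelation from the user side, you can inspect `PushedFilters` in the physical plan of a JDBC-backed query. A sketch (the URL and table name are placeholders; the exact plan rendering varies across Spark versions):

```scala
// Hypothetical JDBC source; url and dbtable are placeholders
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "people")
  .load()

// Column pruning: only `name` is required.
// Filter pushdown: the predicate on `id` should show up under
// PushedFilters in the physical plan.
people.where("id = 1").select("name").explain()
```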
The following demo shows PrunedFilteredScan in action: planning a scan over a custom BaseRelation with PrunedFilteredScan triggers `buildScan` with the required columns and the filters (if any) to push down.

```scala
// Use :paste to define MyBaseRelation case class
// BEGIN
import org.apache.spark.sql.sources.PrunedFilteredScan
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.Filter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
case class MyBaseRelation(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(StructField("a", StringType) :: Nil)
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    println(s">>> [buildScan] requiredColumns = ${requiredColumns.mkString(",")}")
    println(s">>> [buildScan] filters = ${filters.mkString(",")}")
    import sqlContext.implicits._
    (0 to 4).toDF.rdd
  }
}
// END
val scan = MyBaseRelation(spark.sqlContext)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
val plan: LogicalPlan = LogicalRelation(scan)

scala> println(plan.numberedTreeString)
00 Relation[a#1] MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67)

import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

// Executing the strategy (i.e. physical planning) triggers buildScan
val sparkPlan = strategy(plan).head
// >>> [buildScan] requiredColumns = a
// >>> [buildScan] filters =

scala> println(sparkPlan.numberedTreeString)
00 Scan MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67) [a#8] PushedFilters: [], ReadSchema: struct<a:string>
```
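As a follow-up, the same `buildScan` can be triggered through the public DataFrame API instead of applying DataSourceStrategy by hand. A sketch assuming the `MyBaseRelation` definition above (the exact filters printed depend on the Spark version):

```scala
// Wrap the relation in a DataFrame and plan a query over it
val df = spark.baseRelationToDataFrame(MyBaseRelation(spark.sqlContext))

// Physical planning (e.g. via explain) invokes buildScan with the pruned
// columns and the translated filters, e.g. IsNotNull(a) and EqualTo(a,hello)
df.where("a = 'hello'").select("a").explain()
```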