## PrunedFilteredScan Contract — Relations with Column Pruning and Filter Pushdown

`PrunedFilteredScan` is the contract of BaseRelations with support for column pruning (i.e. eliminating unneeded columns) and filter pushdown (i.e. filtering using selected predicates only).
```scala
package org.apache.spark.sql.sources

trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
```
Property | Description
---|---
`buildScan` | Builds a distributed data scan with column pruning and filter pushdown. In other words, `buildScan` creates an `RDD[Row]` that scans over the relation's data with only the required columns and with the given filter predicates pushed down. Used exclusively when `DataSourceStrategy` execution planning strategy is executed (on a `LogicalRelation` with a `PrunedFilteredScan` relation).
Note: `PrunedFilteredScan` is a “lighter” and stable version of the CatalystScan Contract.
Note: `JDBCRelation` is the one and only known implementation of the PrunedFilteredScan Contract in Spark SQL.
```scala
// Use :paste to define MyBaseRelation case class
// BEGIN
import org.apache.spark.sql.sources.PrunedFilteredScan
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.Filter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

case class MyBaseRelation(sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(StructField("a", StringType) :: Nil)
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    println(s">>> [buildScan] requiredColumns = ${requiredColumns.mkString(",")}")
    println(s">>> [buildScan] filters = ${filters.mkString(",")}")
    import sqlContext.implicits._
    (0 to 4).toDF.rdd
  }
}
// END

val scan = MyBaseRelation(spark.sqlContext)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
val plan: LogicalPlan = LogicalRelation(scan)

scala> println(plan.numberedTreeString)
00 Relation[a#1] MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67)

import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

val sparkPlan = strategy(plan).head
// >>> [buildScan] requiredColumns = a
// >>> [buildScan] filters =

scala> println(sparkPlan.numberedTreeString)
00 Scan MyBaseRelation(org.apache.spark.sql.SQLContext@4a57ad67) [a#8] PushedFilters: [], ReadSchema: struct<a:string>
```
## InsertableRelation Contract — Non-File-Based Relations with Inserting or Overwriting Data Support

`InsertableRelation` is the contract of non-file-based BaseRelations that support inserting or overwriting data.
```scala
package org.apache.spark.sql.sources

trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```
Property | Description
---|---
`insert` | Inserts or overwrites data (given as a `DataFrame`) in a relation. Used exclusively when `InsertIntoDataSourceCommand` logical command is executed.
Note: `JDBCRelation` is the one and only known direct implementation of the InsertableRelation Contract in Spark SQL.
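For illustration only, the following is a minimal sketch of a relation that mixes `BaseRelation` with `InsertableRelation`. The class name and the println-based "write" are hypothetical; a real implementation would write the rows to its external storage and honor the overwrite flag.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation that "writes" rows by printing them out.
// A real implementation would write to its external storage and
// honor the overwrite flag (truncate vs. append).
case class MyInsertableRelation(sqlContext: SQLContext)
  extends BaseRelation with InsertableRelation {

  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    println(s">>> [insert] overwrite = $overwrite")
    data.collect().foreach(row => println(s">>> [insert] $row"))
  }
}
```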
## CatalystScan Contract
## HadoopFsRelation — Relation for File-Based Data Source

`HadoopFsRelation` is a BaseRelation and FileRelation.
`HadoopFsRelation` is created when:

- `HiveMetastoreCatalog` is requested to convertToLogicalRelation (when `RelationConversions` logical evaluation rule is requested to convert a `HiveTableRelation` to a `LogicalRelation` for `parquet` or `native` and `hive` ORC storage formats)

- `DataSource` is requested to create a BaseRelation (for a non-streaming file-based data source)
The optional BucketSpec is defined exclusively for a non-streaming file-based data source and used for the following:

- Output partitioning scheme and output data ordering of the corresponding FileSourceScanExec physical operator

- DataSourceAnalysis post-hoc logical resolution rule (when executed on an InsertIntoTable logical operator over a LogicalRelation with a `HadoopFsRelation` relation)
Caution: Demo the different cases when `HadoopFsRelation` is created.

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Example 1: spark.table for DataSource tables (provider != hive)
import org.apache.spark.sql.catalyst.TableIdentifier
val t1ID = TableIdentifier(tableName = "t1")
spark.sessionState.catalog.dropTable(name = t1ID, ignoreIfNotExists = true, purge = true)
spark.range(5).write.saveAsTable("t1")

val metadata = spark.sessionState.catalog.getTableMetadata(t1ID)
scala> println(metadata.provider.get)
parquet

assert(metadata.provider.get != "hive")

val q = spark.table("t1")
// Avoid dealing with UnresolvedRelations and SubqueryAliases
// Hence going straight for optimizedPlan
val plan1 = q.queryExecution.optimizedPlan

scala> println(plan1.numberedTreeString)
00 Relation[id#7L] parquet

val LogicalRelation(rel1, _, _, _) = plan1.asInstanceOf[LogicalRelation]
val hadoopFsRel = rel1.asInstanceOf[HadoopFsRelation]

// Example 2: spark.read with format as a `FileFormat`
val q = spark.read.text("README.md")
val plan2 = q.queryExecution.logical

scala> println(plan2.numberedTreeString)
00 Relation[value#2] text

val LogicalRelation(relation, _, _, _) = plan2.asInstanceOf[LogicalRelation]
val hadoopFsRel = relation.asInstanceOf[HadoopFsRelation]

// Example 3: Bucketing specified
val tableName = "bucketed_4_id"
spark
  .range(100000000)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)

val q = spark.table(tableName)
// Avoid dealing with UnresolvedRelations and SubqueryAliases
// Hence going straight for optimizedPlan
val plan3 = q.queryExecution.optimizedPlan

scala> println(plan3.numberedTreeString)
00 Relation[id#52L] parquet

val LogicalRelation(rel3, _, _, _) = plan3.asInstanceOf[LogicalRelation]
val hadoopFsRel = rel3.asInstanceOf[HadoopFsRelation]
val bucketSpec = hadoopFsRel.bucketSpec.get

// Exercise: spark.table for Hive tables (provider == hive)
```
### Creating HadoopFsRelation Instance

`HadoopFsRelation` takes the following when created:

- Partition schema

- Data schema

- Optional bucketing specification

`HadoopFsRelation` initializes the internal registries and counters.
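As a follow-up to the demo above, the properties a `HadoopFsRelation` is created with are available as plain accessors on the relation. The sketch below assumes the `hadoopFsRel` value from Example 3; the inline comments describe what to expect rather than verified output.

```scala
// Assumes the hadoopFsRel value from Example 3 above (bucketed parquet table)
println(hadoopFsRel.dataSchema)       // schema of the data files, e.g. a single id: bigint column
println(hadoopFsRel.partitionSchema)  // empty StructType for a non-partitioned table
println(hadoopFsRel.bucketSpec)       // Some(BucketSpec) with 4 buckets on the id column
println(hadoopFsRel.fileFormat)       // the FileFormat, here a ParquetFileFormat
println(hadoopFsRel.schema)           // data schema and partition schema combined
```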
## BaseRelation — Collection of Tuples with Schema

Note: “Data source”, “relation” and “table” are often used as synonyms.
```scala
package org.apache.spark.sql.sources

abstract class BaseRelation {
  // only required properties (vals and methods) that have no implementation
  // the others follow
  def schema: StructType
  def sqlContext: SQLContext
}
```
Method | Description
---|---
`schema` | `StructType` that describes the schema of tuples
`sqlContext` | The `SQLContext` the relation lives in
`BaseRelation` is “created” when `DataSource` is requested to resolve a relation.

`BaseRelation` is transformed into a `DataFrame` when `SparkSession` is requested to create a DataFrame.

`BaseRelation` uses the needConversion flag to control type conversion of objects inside Rows to Catalyst types, e.g. `java.lang.String` to `UTF8String`.

Note: It is recommended that custom data sources (outside Spark SQL) leave the needConversion flag enabled, i.e. `true`.

`BaseRelation` can optionally give an estimated size (in bytes).
BaseRelation | Description
---|---
HadoopFsRelation | Relation for a file-based data source
JDBCRelation | Relation for a JDBC data source
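For illustration only, a minimal (hypothetical) `BaseRelation` just provides the two required properties; `needConversion`, `sizeInBytes` and `unhandledFilters` keep their defaults.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation that only fulfills the required part of the contract;
// needConversion, sizeInBytes and unhandledFilters keep their default values.
case class MinimalRelation(sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(StructField("name", StringType) :: Nil)
}
```

On its own such a relation cannot produce rows; it is typically mixed with one of the scan contracts (e.g. PrunedFilteredScan above) to take part in query execution.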
### Should JVM Objects Inside Rows Be Converted to Catalyst Types? — needConversion Method

```scala
needConversion: Boolean
```

The needConversion flag is enabled (`true`) by default.
Note: It is recommended to leave needConversion enabled for data sources outside Spark SQL.

Note: needConversion is used exclusively when DataSourceStrategy execution planning strategy is executed (and does the RDD conversion from `RDD[Row]` to `RDD[InternalRow]`).
### Finding Unhandled Filter Predicates — unhandledFilters Method

```scala
unhandledFilters(filters: Array[Filter]): Array[Filter]
```

unhandledFilters returns the Filter predicates that the data source does not support (handle) natively.
Note: unhandledFilters returns the input filters by default as it is considered safe to double-evaluate filters regardless of whether they could be supported or not.

Note: unhandledFilters is used exclusively when DataSourceStrategy execution planning strategy is requested to selectFilters.
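As a hypothetical example (not a Spark SQL class), a relation that evaluates equality predicates natively could report every other predicate as unhandled:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation that can evaluate EqualTo predicates itself,
// so only the remaining predicates are left for Spark to re-evaluate.
case class EqualityPushdownRelation(sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(StructField("name", StringType) :: Nil)

  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo(_, _) => true  // handled natively by this data source
      case _             => false // anything else is left unhandled
    }
}
```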
### Requesting Estimated Size — sizeInBytes Method

```scala
sizeInBytes: Long
```

sizeInBytes is the estimated size of a relation in bytes (used in query planning).
Note: sizeInBytes defaults to the spark.sql.defaultSizeInBytes internal property (i.e. infinite).

Note: sizeInBytes is used exclusively when LogicalRelation is requested to computeStats (and the statistics are not available in the CatalogTable).
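A data source that knows how big its data is can override the default to give the optimizer a better estimate (for example, to make a small relation eligible for broadcast joins). The class name and the 10 MB figure below are made up for illustration.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical relation that reports a fixed 10 MB estimate instead of
// the spark.sql.defaultSizeInBytes default.
case class SizedRelation(sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
  override def sizeInBytes: Long = 10L * 1024 * 1024
}
```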
## SchemaRelationProvider Contract — Relation Providers With Mandatory User-Defined Schema

`SchemaRelationProvider` is the contract of BaseRelation providers that require a user-defined schema while creating a relation.
The requirement of specifying a user-defined schema is enforced when `DataSource` is requested for a BaseRelation for a given data source format. If the schema is not specified, `DataSource` throws an `AnalysisException`:

```
A schema needs to be specified when using [className].
```
```scala
package org.apache.spark.sql.sources

trait SchemaRelationProvider {
  def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    schema: StructType): BaseRelation
}
```
Method | Description
---|---
`createRelation` | Creates a BaseRelation for the user-defined schema. Used exclusively when `DataSource` is requested for a BaseRelation for a given data source format.
Note: There are no known direct implementations of the SchemaRelationProvider Contract in Spark SQL.
Tip: Use RelationProvider for data source providers with schema inference.

Tip: Use both SchemaRelationProvider and RelationProvider if a data source should support both schema inference and user-defined schemas.
## RelationProvider Contract — Relation Providers With Schema Inference

`RelationProvider` is the contract of BaseRelation providers that create a relation with schema inference.

Note: Schema inference is also called schema discovery.
The requirement of not specifying a user-defined schema (or, if one is specified, using a schema that matches the relation's) is enforced when `DataSource` is requested for a BaseRelation for a given data source format. If a user-defined schema is specified and does not match the relation's schema, `DataSource` throws an `AnalysisException`:

```
[className] does not allow user-specified schemas.
```
```scala
package org.apache.spark.sql.sources

trait RelationProvider {
  def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation
}
```
Method | Description
---|---
`createRelation` | Creates a BaseRelation for loading data from an external data source. Used exclusively when `DataSource` is requested for a BaseRelation for a given data source format.
RelationProvider | Description
---|---
JdbcRelationProvider | Data source provider for JDBC data source
KafkaSourceProvider | Data source provider for Kafka data source
Tip: Use SchemaRelationProvider for relation providers that require a user-defined schema.
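Following the tips above about combining the two contracts, a hypothetical provider that supports both schema inference and user-defined schemas could implement both and funnel them into a single relation. Everything below (the class names and the "inferred" single-column schema) is illustrative rather than a Spark SQL API.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation that simply carries whatever schema it was given
case class MyRelation(sqlContext: SQLContext, schema: StructType) extends BaseRelation

// Hypothetical provider supporting both schema inference (RelationProvider)
// and user-defined schemas (SchemaRelationProvider)
class MyDataSourceProvider extends RelationProvider with SchemaRelationProvider {

  // Schema inference: pretend we "discover" a single-column schema
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters,
      StructType(StructField("value", StringType) :: Nil))

  // User-defined schema: use the schema as given
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation =
    MyRelation(sqlContext, schema)
}
```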
## DataSourceRegister Contract — Registering Data Source Format

`DataSourceRegister` is a contract to register a DataSource provider under a shortName alias (so it can be looked up by the alias rather than its fully-qualified class name).
```scala
package org.apache.spark.sql.sources

trait DataSourceRegister {
  def shortName(): String
}
```
### Data Source Format Discovery — Registering Data Source By Short Name (Alias)

Caution: FIXME Describe how Java’s ServiceLoader works to find all DataSourceRegister provider classes on the CLASSPATH.

Any `DataSourceRegister` has to register itself in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file to…FIXME
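To make the registration concrete, a hypothetical provider could expose a short name as sketched below; the `myformat` alias and the class name are made up for illustration.

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical provider registered under the "myformat" alias.
// Its fully-qualified class name would also be listed (one per line) in a
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file
// on the classpath so that Java's ServiceLoader can discover it.
class MyFormatProvider extends DataSourceRegister {
  override def shortName(): String = "myformat"
}
```

In practice the same class would also implement one of the provider contracts above (e.g. RelationProvider), so that the alias can be used as `spark.read.format("myformat")`.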
## CreatableRelationProvider Contract — Data Sources That Write Rows Per Save Mode

`CreatableRelationProvider` is the contract for data source providers that want to support writing the rows of a structured query (a DataFrame) per save mode.
```scala
package org.apache.spark.sql.sources

trait CreatableRelationProvider {
  def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation
}
```
Method | Description
---|---
`createRelation` | Writes the rows of a structured query (a `DataFrame`) to an external data source and creates a BaseRelation for the saved data. The save mode specifies what happens when the destination already exists and can be one of the following: `Append`, `ErrorIfExists`, `Ignore`, `Overwrite`.
`CreatableRelationProvider` is used when:

- `DataSource` is requested to write the result of a structured query to a data source per save mode (after `DataFrameWriter` is requested to save)

- `DataSource` is requested to write the result of a structured query to a data source per save mode followed by reading rows back (after `DataFrameWriter` is requested to save to a non-Hive table or for Create Table As Select (CTAS) SQL statements)
CreatableRelationProvider | Description
---|---
JdbcRelationProvider | Data source provider for JDBC data source
KafkaSourceProvider | Data source provider for Kafka data source
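For illustration only, a hypothetical `CreatableRelationProvider` could interpret the save mode before writing as sketched below; the class names and the println-based "write" are made up.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical relation describing the data that has just been written
case class WrittenRelation(sqlContext: SQLContext, schema: StructType) extends BaseRelation

// Hypothetical provider that interprets the save mode before "writing"
// (println stands in for a real write to external storage)
class MyWritableProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val destinationExists = false // a real provider would check its storage
    mode match {
      case SaveMode.ErrorIfExists if destinationExists =>
        throw new IllegalStateException("Destination already exists")
      case SaveMode.Ignore if destinationExists =>
        () // leave the existing data untouched
      case _ =>
        // Append, Overwrite, or the destination does not exist yet:
        // a real provider would write the rows out here
        println(s">>> [createRelation] writing ${data.count} rows (mode = $mode)")
    }
    WrittenRelation(sqlContext, data.schema)
  }
}
```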