HadoopFsRelation — Relation for File-Based Data Source
HadoopFsRelation is a BaseRelation and FileRelation.
HadoopFsRelation is created when:

- HiveMetastoreCatalog is requested to convertToLogicalRelation (when the RelationConversions logical evaluation rule is requested to convert a HiveTableRelation to a LogicalRelation for the parquet as well as native and hive ORC storage formats)

- DataSource is requested to create a BaseRelation (for a non-streaming file-based data source)
The optional BucketSpec is defined exclusively for a non-streaming file-based data source and is used for the following:

- Output partitioning scheme and output data ordering of the corresponding FileSourceScanExec physical operator

- DataSourceAnalysis post-hoc logical resolution rule (when executed on an InsertIntoTable logical operator over a LogicalRelation with a HadoopFsRelation relation)
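To see the first use in action, the following sketch inspects the output partitioning of the FileSourceScanExec physical operator of a bucketed table scan. It assumes the bucketed_4_id table from the demo below has already been created and that spark.sql.sources.bucketing.enabled is at its default of true; the exact expression ID in the output varies per session.

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

val q = spark.table("bucketed_4_id")

// Find the file scan inside the executed plan
// (it may be wrapped in WholeStageCodegenExec)
val scan = q.queryExecution.executedPlan.collectFirst {
  case s: FileSourceScanExec => s
}.get

// With a 4-bucket spec on id, the scan reports a hash-based output partitioning
scala> println(scan.outputPartitioning)
hashpartitioning(id#52L, 4)
```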
CAUTION: Demo the different cases when `HadoopFsRelation` is created

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Example 1: spark.table for DataSource tables (provider != hive)
import org.apache.spark.sql.catalyst.TableIdentifier
val t1ID = TableIdentifier(tableName = "t1")
spark.sessionState.catalog.dropTable(name = t1ID, ignoreIfNotExists = true, purge = true)
spark.range(5).write.saveAsTable("t1")

val metadata = spark.sessionState.catalog.getTableMetadata(t1ID)
scala> println(metadata.provider.get)
parquet

assert(metadata.provider.get != "hive")

val q = spark.table("t1")
// Avoid dealing with UnresolvedRelations and SubqueryAliases
// Hence going straight for optimizedPlan
val plan1 = q.queryExecution.optimizedPlan

scala> println(plan1.numberedTreeString)
00 Relation[id#7L] parquet

val LogicalRelation(rel1, _, _, _) = plan1.asInstanceOf[LogicalRelation]
val hadoopFsRel = rel1.asInstanceOf[HadoopFsRelation]

// Example 2: spark.read with format as a `FileFormat`
val q = spark.read.text("README.md")
val plan2 = q.queryExecution.logical

scala> println(plan2.numberedTreeString)
00 Relation[value#2] text

val LogicalRelation(relation, _, _, _) = plan2.asInstanceOf[LogicalRelation]
val hadoopFsRel = relation.asInstanceOf[HadoopFsRelation]

// Example 3: Bucketing specified
val tableName = "bucketed_4_id"
spark
  .range(100000000)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)

val q = spark.table(tableName)
// Avoid dealing with UnresolvedRelations and SubqueryAliases
// Hence going straight for optimizedPlan
val plan3 = q.queryExecution.optimizedPlan

scala> println(plan3.numberedTreeString)
00 Relation[id#52L] parquet

val LogicalRelation(rel3, _, _, _) = plan3.asInstanceOf[LogicalRelation]
val hadoopFsRel = rel3.asInstanceOf[HadoopFsRelation]

val bucketSpec = hadoopFsRel.bucketSpec.get

// Exercise 3: spark.table for Hive tables (provider == hive)
```
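For the remaining exercise, a possible sketch, assuming a Hive-enabled SparkSession (spark.sql.catalogImplementation set to hive) with spark.sql.hive.convertMetastoreParquet at its default of true; the hive_parquet_t table name is made up for illustration:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

val hiveTable = "hive_parquet_t"
spark.sql(s"DROP TABLE IF EXISTS $hiveTable")
spark.sql(s"CREATE TABLE $hiveTable (id BIGINT) STORED AS PARQUET")

val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(hiveTable))
assert(metadata.provider.get == "hive")

// RelationConversions should have converted the HiveTableRelation
// to a LogicalRelation over a HadoopFsRelation
val plan = spark.table(hiveTable).queryExecution.optimizedPlan
val LogicalRelation(rel, _, _, _) = plan.asInstanceOf[LogicalRelation]
val hadoopFsRel = rel.asInstanceOf[HadoopFsRelation]
```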
Creating HadoopFsRelation Instance
HadoopFsRelation takes the following when created:

- FileIndex (for the location of the data)

- Partition schema

- Data schema

- Optional bucketing specification

- FileFormat

- Options

- SparkSession
HadoopFsRelation initializes the internal registries and counters.
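For illustration, a minimal sketch of creating a HadoopFsRelation directly (against the Spark 2.x constructor; the /tmp/parquet-data path and the single-column schema are made-up values):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A FileIndex over a made-up path (a missing directory is logged
// as a warning and treated as empty)
val location = new InMemoryFileIndex(
  spark, Seq(new Path("/tmp/parquet-data")), Map.empty[String, String], None)

val relation = HadoopFsRelation(
  location,
  partitionSchema = StructType(Nil),
  dataSchema = StructType(StructField("id", LongType) :: Nil),
  bucketSpec = None,
  fileFormat = new ParquetFileFormat,
  options = Map.empty)(spark)

// With no partition columns, the relation's schema is the data schema
scala> println(relation.schema)
StructType(StructField(id,LongType,true))
```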