BaseSessionStateBuilder — Generic Builder of SessionState
BaseSessionStateBuilder is the contract of builder objects that coordinate construction of a new SessionState.
| BaseSessionStateBuilder | Description |
|---|---|
| SessionStateBuilder | The default builder (for the `in-memory` catalog implementation) |
| HiveSessionStateBuilder | The builder used when Hive support is enabled (the `hive` catalog implementation) |
BaseSessionStateBuilder is created when SparkSession is requested for a SessionState.
```text
scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState
org.apache.spark.sql.internal.SessionState
```
BaseSessionStateBuilder requires that implementations define newBuilder method that SparkSession uses (indirectly) when requested for the SessionState (per spark.sql.catalogImplementation internal configuration property).
```scala
newBuilder: (SparkSession, Option[SessionState]) => BaseSessionStateBuilder
```
Note: BaseSessionStateBuilder and the spark.sql.catalogImplementation configuration property allow for Hive and non-Hive Spark deployments.
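The following is a minimal sketch (the `MySessionStateBuilder` name is made up; it mirrors the non-Hive SessionStateBuilder) of how a concrete builder can satisfy the newBuilder contract. Since BaseSessionStateBuilder is `private[sql]`, such a builder has to live under the org.apache.spark.sql package.

```scala
package org.apache.spark.sql.internal

import org.apache.spark.sql.SparkSession

// Hypothetical builder (not part of Spark) that fulfils the newBuilder contract
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // A function that creates a new builder of the same type, given a SparkSession
  // and an optional SessionState to clone from
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```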
BaseSessionStateBuilder holds properties that (together with newBuilder) are used to create a SessionState.
| Name | Description |
|---|---|
| analyzer | Logical query plan Analyzer |
| catalog | SessionCatalog. Used to create the Analyzer, the Optimizer and the SessionState itself |
| customOperatorOptimizationRules | Custom operator optimization rules to add to the base Operator Optimization batch (when requested for the custom rules) |
| optimizer | SparkOptimizer (that is downcast to the base Optimizer) that is created with the SessionCatalog and the ExperimentalMethods |
Note: BaseSessionStateBuilder is an experimental and unstable API.
Creating Function to Build SessionState — createClone Method
```scala
createClone: (SparkSession, SessionState) => SessionState
```
createClone gives a function that takes a SparkSession and a SessionState and creates a new SessionState by executing newBuilder followed by build.
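A simplified sketch (not copied verbatim from the Spark sources) of how createClone composes the two steps:

```scala
// Simplified sketch: the returned function builds a brand-new SessionState
// seeded with the SessionState to clone.
protected def createClone: (SparkSession, SessionState) => SessionState = {
  val createBuilder = newBuilder
  (session: SparkSession, state: SessionState) =>
    createBuilder(session, Option(state)).build()
}
```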
Note: createClone is used exclusively when BaseSessionStateBuilder is requested for a SessionState.
Creating SessionState Instance — build Method
```scala
build(): SessionState
```
build creates a SessionState using the properties of this builder (e.g. the SessionCatalog, the Analyzer and the Optimizer, together with the createQueryExecution and createClone functions).
Creating BaseSessionStateBuilder Instance
BaseSessionStateBuilder takes the following when created:

- SparkSession
- Optional SessionState
Getting Function to Create QueryExecution For LogicalPlan — createQueryExecution Method
```scala
createQueryExecution: LogicalPlan => QueryExecution
```
createQueryExecution simply returns a function that takes a LogicalPlan and creates a QueryExecution with the SparkSession and the logical plan.
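A simplified sketch of createQueryExecution (`session` is the SparkSession the builder was created with):

```scala
// Simplified sketch: the returned function closes over the builder's SparkSession
protected def createQueryExecution: LogicalPlan => QueryExecution =
  (plan: LogicalPlan) => new QueryExecution(session, plan)
```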
Note: createQueryExecution is used exclusively when BaseSessionStateBuilder is requested to create a SessionState instance.
SessionState — State Separation Layer Between SparkSessions
SessionState is the state separation layer between Spark SQL sessions, including SQL configuration, tables, functions, UDFs, SQL parser, and everything else that depends on a SQLConf.
SessionState is available as the sessionState property of a SparkSession.
```text
scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState
org.apache.spark.sql.internal.SessionState
```
SessionState is created when SparkSession is requested to instantiateSessionState (when requested for the SessionState per spark.sql.catalogImplementation configuration property).
Note: When requested for the SessionState, SparkSession uses the spark.sql.catalogImplementation configuration property to choose between the two available session state builders: SessionStateBuilder and HiveSessionStateBuilder.
| Name | Type | Description |
|---|---|---|
| analyzer | Analyzer | Initialized lazily (i.e. only when requested the first time) using the analyzerBuilder factory function. Used when…FIXME |
| catalog | SessionCatalog | Metastore of tables and databases. Used when…FIXME |
| streamingQueryManager | StreamingQueryManager | Used to manage streaming queries in Spark Structured Streaming |
| udfRegistration | UDFRegistration | Interface to register user-defined functions. Used when…FIXME |
Note: SessionState is a private[sql] class and, given the package org.apache.spark.sql.internal, SessionState should be considered internal.
Creating SessionState Instance
SessionState takes the following when created (the builder functions make the expensive components lazily evaluated, as the sketch after this list illustrates):

- `catalogBuilder` function to create a SessionCatalog (i.e. `() ⇒ SessionCatalog`)
- `analyzerBuilder` function to create an Analyzer (i.e. `() ⇒ Analyzer`)
- `optimizerBuilder` function to create an Optimizer (i.e. `() ⇒ Optimizer`)
- `resourceLoaderBuilder` function to create a `SessionResourceLoader` (i.e. `() ⇒ SessionResourceLoader`)
- `createQueryExecution` function to create a QueryExecution given a LogicalPlan (i.e. `LogicalPlan ⇒ QueryExecution`)
- `createClone` function to clone the `SessionState` given a SparkSession (i.e. `(SparkSession, SessionState) ⇒ SessionState`)
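The following self-contained sketch (made-up names, plain Strings instead of the real Spark types) illustrates the lazy-builder pattern: components are handed over as `() => T` functions and only materialized on first access.

```scala
// A stand-in for SessionState: components are passed as () => T builder functions
// and materialized only on first access (lazy val).
class LazySessionState(
    catalogBuilder: () => String,     // stands in for () => SessionCatalog
    analyzerBuilder: () => String) {  // stands in for () => Analyzer
  lazy val catalog: String = catalogBuilder()
  lazy val analyzer: String = analyzerBuilder()
}

val state = new LazySessionState(
  catalogBuilder = () => { println("building catalog"); "SessionCatalog" },
  analyzerBuilder = () => { println("building analyzer"); "Analyzer" })

// Nothing has been built yet; the builders run on first access only.
println(state.catalog)  // prints "building catalog" and then "SessionCatalog"
```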
apply Factory Methods
Caution: FIXME
```scala
apply(sparkSession: SparkSession): SessionState  // (1)
apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState
```

1. Passes `sparkSession` to the other `apply` with a new `SQLConf`
Note: apply is used when SparkSession is requested for SessionState.
createAnalyzer Internal Method
```scala
createAnalyzer(
  sparkSession: SparkSession,
  catalog: SessionCatalog,
  sqlConf: SQLConf): Analyzer
```
createAnalyzer creates a logical query plan Analyzer with rules specific to a non-Hive SessionState.
| Method | Rules | Description |
|---|---|---|
| extendedResolutionRules | FindDataSourceTable | Replaces InsertIntoTable (with…FIXME) |
| | ResolveSQLOnFile | |
| postHocResolutionRules | PreprocessTableInsertion | |
| extendedCheckRules | PreWriteCheck | |
| | HiveOnlyCheck | |
“Executing” Logical Plan (Creating QueryExecution For LogicalPlan) — executePlan Method
```scala
executePlan(plan: LogicalPlan): QueryExecution
```
executePlan executes the createQueryExecution function on the input logical plan (which creates a QueryExecution with the current SparkSession and the input logical plan).
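A usage sketch (run in spark-shell with the active `spark` session):

```scala
// Turn a logical plan back into a QueryExecution through the SessionState
val logicalPlan = spark.range(5).queryExecution.logical
val qe = spark.sessionState.executePlan(logicalPlan)
println(qe.executedPlan.numberedTreeString)
```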
Creating New Hadoop Configuration — newHadoopConf Method
```scala
newHadoopConf(): Configuration
newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration
```
newHadoopConf returns a Hadoop Configuration (with the SparkContext.hadoopConfiguration and all the configuration properties of the SQLConf).
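A small sketch (run in spark-shell; the property name is made up) showing that session-scoped SQL properties end up in the returned Hadoop configuration:

```scala
// Any SQLConf entry of the session is copied into the returned Hadoop Configuration
spark.conf.set("spark.sql.some.hypothetical.setting", "42")
val hadoopConf = spark.sessionState.newHadoopConf()
assert(hadoopConf.get("spark.sql.some.hypothetical.setting") == "42")
```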
Note: newHadoopConf is used by ScriptTransformation, ParquetRelation, StateStoreRDD, SessionState itself, and a few other places.
Creating New Hadoop Configuration With Extra Options — newHadoopConfWithOptions Method
```scala
newHadoopConfWithOptions(options: Map[String, String]): Configuration
```
newHadoopConfWithOptions creates a new Hadoop Configuration with the input options set (except path and paths options that are skipped).
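A small sketch (run in spark-shell; the option names are made up) of the behaviour described above:

```scala
// Extra options are copied into the Hadoop Configuration; "path" and "paths" are skipped
val conf = spark.sessionState.newHadoopConfWithOptions(
  Map("some.custom.option" -> "on", "path" -> "/tmp/skipped"))
assert(conf.get("some.custom.option") == "on")
assert(conf.get("path") == null)  // assuming "path" is not set elsewhere in the Hadoop conf
```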
HiveMetastoreCatalog — Legacy SessionCatalog for Converting Hive Metastore Relations to Data Source Relations
HiveMetastoreCatalog is a legacy session-scoped catalog of relational entities that HiveSessionCatalog uses exclusively for converting Hive metastore relations to data source relations (when RelationConversions logical evaluation rule is executed).
HiveMetastoreCatalog is created exclusively when HiveSessionStateBuilder is requested for SessionCatalog (and creates a HiveSessionCatalog).
HiveMetastoreCatalog takes a SparkSession when created.
Converting HiveTableRelation to LogicalRelation — convertToLogicalRelation Method
```scala
convertToLogicalRelation(
  relation: HiveTableRelation,
  options: Map[String, String],
  fileFormatClass: Class[_ <: FileFormat],
  fileType: String): LogicalRelation
```
convertToLogicalRelation…FIXME
Note: convertToLogicalRelation is used exclusively when RelationConversions logical evaluation rule is requested to convert a HiveTableRelation to a LogicalRelation for parquet, native and hive ORC storage formats.
inferIfNeeded Internal Method
```scala
inferIfNeeded(
  relation: HiveTableRelation,
  options: Map[String, String],
  fileFormat: FileFormat,
  fileIndexOpt: Option[FileIndex] = None): CatalogTable
```
inferIfNeeded…FIXME
Note: inferIfNeeded is used exclusively when HiveMetastoreCatalog is requested to convertToLogicalRelation.
HiveSessionCatalog — Hive-Specific Catalog of Relational Entities
HiveSessionCatalog is a session-scoped catalog of relational entities that is used when SparkSession was created with Hive support enabled.
HiveSessionCatalog is available as catalog property of SessionState when SparkSession was created with Hive support enabled (that in the end sets spark.sql.catalogImplementation internal configuration property to hive).
```scala
import org.apache.spark.sql.internal.StaticSQLConf
val catalogType = spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key)
scala> println(catalogType)
hive

// You could also use the property key by name
scala> spark.conf.get("spark.sql.catalogImplementation")
res1: String = hive

// Since Hive is enabled HiveSessionCatalog is the implementation
scala> spark.sessionState.catalog
res2: org.apache.spark.sql.catalyst.catalog.SessionCatalog = org.apache.spark.sql.hive.HiveSessionCatalog@1ae3d0a8
```
HiveSessionCatalog is created exclusively when HiveSessionStateBuilder is requested for the SessionCatalog.
HiveSessionCatalog uses the legacy HiveMetastoreCatalog (which is another session-scoped catalog of relational entities) exclusively to allow RelationConversions logical evaluation rule to convert Hive metastore relations to data source relations when executed.
Creating HiveSessionCatalog Instance
HiveSessionCatalog takes the following when created:
- Legacy HiveMetastoreCatalog
- Hadoop Configuration
BucketSpec — Bucketing Specification of Table
BucketSpec is the bucketing specification of a table, i.e. the metadata of the bucketing of a table.
BucketSpec includes the following:

- Number of buckets (numBuckets)
- Bucket column names (bucketColumnNames)
- Sort column names (sortColumnNames)
The number of buckets has to be between 0 and 100000 exclusive (or an AnalysisException is thrown).
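A quick demonstration (spark-shell) of that validation:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// A non-positive number of buckets is rejected with an AnalysisException
try {
  BucketSpec(numBuckets = 0, bucketColumnNames = Seq("col1"), sortColumnNames = Seq.empty)
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```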
BucketSpec is created when:
- `DataFrameWriter` is requested to saveAsTable (and does getBucketSpec)
- `HiveExternalCatalog` is requested to getBucketSpecFromTableProperties and tableMetaToTableProps
- `HiveClientImpl` is requested to retrieve a table metadata
- `SparkSqlAstBuilder` is requested to visitBucketSpec (for a `CREATE TABLE` SQL statement with `CLUSTERED BY` and `INTO n BUCKETS` and an optional `SORTED BY` clause)
BucketSpec uses the following text representation (i.e. toString):
```text
[numBuckets] buckets, bucket columns: [[bucketColumnNames]], sort columns: [[sortColumnNames]]
```
```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
val bucketSpec = BucketSpec(
  numBuckets = 8,
  bucketColumnNames = Seq("col1"),
  sortColumnNames = Seq("col2"))

scala> println(bucketSpec)
8 buckets, bucket columns: [col1], sort columns: [col2]
```
Converting Bucketing Specification to LinkedHashMap — toLinkedHashMap Method
```scala
toLinkedHashMap: mutable.LinkedHashMap[String, String]
```
toLinkedHashMap converts the bucketing specification to a collection of pairs (LinkedHashMap[String, String]) with the following fields and their values:
- Num Buckets with the numBuckets
- Bucket Columns with the bucketColumnNames
- Sort Columns with the sortColumnNames
toLinkedHashMap quotes the column names.
```text
scala> println(bucketSpec.toLinkedHashMap)
Map(Num Buckets -> 8, Bucket Columns -> [`col1`], Sort Columns -> [`col2`])
```
CatalogTablePartition — Partition Specification of Table
CatalogTablePartition is the partition specification of a table, i.e. the metadata of the partitions of a table.
CatalogTablePartition is created when:
- `HiveClientImpl` is requested to retrieve a table partition metadata
- `AlterTableAddPartitionCommand` and `AlterTableRecoverPartitionsCommand` logical commands are executed
CatalogTablePartition can hold the table statistics that…FIXME
The readable text representation of a CatalogTablePartition (aka simpleString) is…FIXME
Note: simpleString is used exclusively when ShowTablesCommand is executed (with a partition specification).
CatalogTablePartition uses the following text representation (i.e. toString)…FIXME
Creating CatalogTablePartition Instance
CatalogTablePartition takes the following when created:
- Partition specification (spec)
- CatalogStorageFormat
- Parameters (default: empty)
- Table statistics (default: `None`)
Converting Partition Specification to LinkedHashMap — toLinkedHashMap Method
```scala
toLinkedHashMap: mutable.LinkedHashMap[String, String]
```
toLinkedHashMap converts the partition specification to a collection of pairs (LinkedHashMap[String, String]) with the following fields and their values:
- Partition Values with the spec
- Storage specification (of the given CatalogStorageFormat)
- Partition Parameters with the parameters (if not empty)
- Partition Statistics with the CatalogStatistics (if available)
location Method
```scala
location: URI
```
location simply returns the location URI of the CatalogStorageFormat or throws an AnalysisException:
```text
Partition [[specString]] did not specify locationUri
```
Note: location is used when…FIXME
CatalogStorageFormat — Storage Specification of Table or Partition
CatalogStorageFormat is the storage specification of a partition or a table, i.e. the metadata that includes the following:

- Location URI
- Input format
- Output format
- SerDe
- `compressed` flag
- Properties
CatalogStorageFormat is created when:
- `HiveClientImpl` is requested for metadata of a table or table partition
- `SparkSqlAstBuilder` is requested to parse Hive-specific CREATE TABLE or INSERT OVERWRITE DIRECTORY SQL statements
CatalogStorageFormat uses the following text representation (i.e. toString)…FIXME
Converting Storage Specification to LinkedHashMap — toLinkedHashMap Method
```scala
toLinkedHashMap: mutable.LinkedHashMap[String, String]
```
toLinkedHashMap…FIXME
CatalogTable — Table Specification (Native Table Metadata)
CatalogTable is the table specification, i.e. the metadata of a table that is stored in a session-scoped catalog of relational entities (i.e. SessionCatalog).
```scala
scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog

// Using high-level user-friendly catalog interface
scala> spark.catalog.listTables.filter($"name" === "t1").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t1| default|       null|  MANAGED|      false|
+----+--------+-----------+---------+-----------+

// Using low-level internal SessionCatalog interface to access CatalogTables
val t1Tid = spark.sessionState.sqlParser.parseTableIdentifier("t1")
val t1Metadata = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(t1Tid)

scala> :type t1Metadata
org.apache.spark.sql.catalyst.catalog.CatalogTable
```
CatalogTable is created when:
- `SessionCatalog` is requested for a table metadata
- `HiveClientImpl` is requested for looking up a table in a metastore
- `DataFrameWriter` is requested to create a table
- `InsertIntoHiveDirCommand` logical command is executed
- `SparkSqlAstBuilder` does visitCreateTable and visitCreateHiveTable
- `CreateTableLikeCommand` logical command is executed
- `CreateViewCommand` logical command is executed (and prepareTable)
- `CatalogImpl` is requested to createTable
The readable text representation of a CatalogTable (aka simpleString) is…FIXME
Note: simpleString is used exclusively when ShowTablesCommand logical command is executed (with a partition specification).
CatalogTable uses the following text representation (i.e. toString)…FIXME
CatalogTable is created with the optional bucketing specification that is used for the following:
- `CatalogImpl` is requested to list the columns of a table
- `FindDataSourceTable` logical evaluation rule is requested to readDataSourceTable (when executed for data source tables)
- `CreateTableLikeCommand` logical command is executed
- `DescribeTableCommand` logical command is requested to describe detailed partition and storage information (when executed)
- `ShowCreateTableCommand` logical command is executed
- `CreateDataSourceTableCommand` and `CreateDataSourceTableAsSelectCommand` logical commands are executed
- `CatalogTable` is requested to convert itself to LinkedHashMap
- `HiveExternalCatalog` is requested to doCreateTable, tableMetaToTableProps, doAlterTable, restoreHiveSerdeTable and restoreDataSourceTable
- `HiveClientImpl` is requested to retrieve a table metadata if available and toHiveTable
- `InsertIntoHiveTable` logical command is requested to processInsert (when executed)
- `DataFrameWriter` is requested to create a table (via saveAsTable)
- `SparkSqlAstBuilder` is requested to visitCreateTable and visitCreateHiveTable
Table Statistics for Query Planning (Auto Broadcast Joins and Cost-Based Optimization)
You manage table metadata using the catalog interface (aka metastore). Among the management tasks is getting the statistics of a table (which are used for cost-based query optimization).
```text
scala> t1Metadata.stats.foreach(println)
CatalogStatistics(714,Some(2),Map(p1 -> ColumnStat(2,Some(0),Some(1),0,4,4,None), id -> ColumnStat(2,Some(0),Some(1),0,4,4,None)))

scala> t1Metadata.stats.map(_.simpleString).foreach(println)
714 bytes, 2 rows
```
Note: The CatalogStatistics are optional when CatalogTable is created.
Caution: FIXME When are the stats specified? What if they are not?
Unless CatalogStatistics are available in a table metadata (in a catalog) for a non-streaming file data source table, DataSource creates a HadoopFsRelation with the table size specified by spark.sql.defaultSizeInBytes internal property (default: Long.MaxValue) for query planning of joins (and possibly to auto broadcast the table).
Internally, Spark alters table statistics using ExternalCatalog.doAlterTableStats.
Unless CatalogStatistics are available in a table metadata (in a catalog) for HiveTableRelation (and hive provider), DetermineTableStats logical resolution rule can compute the table size using HDFS (if spark.sql.statistics.fallBackToHdfs property is turned on) or assume spark.sql.defaultSizeInBytes (that effectively disables table broadcasting).
When requested to look up a table in a metastore, HiveClientImpl reads table or partition statistics directly from a Hive metastore.
You can use the AnalyzeColumnCommand, AnalyzePartitionCommand and AnalyzeTableCommand commands to record statistics in a catalog.
The table statistics can be automatically updated (after executing commands like AlterTableAddPartitionCommand) when spark.sql.statistics.size.autoUpdate.enabled property is turned on.
You can use DESCRIBE SQL command to show the histogram of a column if stored in a catalog.
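A sketch (run in spark-shell) of recording and inspecting the statistics from SQL, assuming the t1 table with an `id` column from the earlier examples:

```scala
sql("ANALYZE TABLE t1 COMPUTE STATISTICS")                 // table-level statistics (AnalyzeTableCommand)
sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id")  // column-level statistics (AnalyzeColumnCommand)
sql("DESCRIBE EXTENDED t1 id").show(truncate = false)      // column statistics (and histogram if collected)
```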
dataSchema Method
```scala
dataSchema: StructType
```
dataSchema…FIXME
Note: dataSchema is used when…FIXME
partitionSchema Method
```scala
partitionSchema: StructType
```
partitionSchema…FIXME
Note: partitionSchema is used when…FIXME
Converting Table Specification to LinkedHashMap — toLinkedHashMap Method
```scala
toLinkedHashMap: mutable.LinkedHashMap[String, String]
```
toLinkedHashMap converts the table specification to a collection of pairs (LinkedHashMap[String, String]) with the following fields and their values:
- Database with the database of the TableIdentifier
- Table with the table of the TableIdentifier
- Owner with the owner (if defined)
- Created Time with the createTime
- Created By with `Spark` and the createVersion
- Type with the name of the CatalogTableType
- Provider with the provider (if defined)
- Bucket specification (of the BucketSpec if defined)
- Comment with the comment (if defined)
- View Text, View Default Database and View Query Output Columns for VIEW table type
- Table Properties with the tableProperties (if not empty)
- Statistics with the CatalogStatistics (if defined)
- Storage specification (of the CatalogStorageFormat if defined)
- Partition Provider with Catalog if the tracksPartitionsInCatalog flag is on
- Partition Columns with the partitionColumns (if not empty)
- Schema with the schema (if not empty)
Creating CatalogTable Instance
CatalogTable takes the following when created:
- Optional Bucketing specification (default: `None`)
- Optional table statistics
database Method
```scala
database: String
```
database simply returns the database (of the TableIdentifier) or throws an AnalysisException:
```text
table [identifier] did not specify database
```
Note: database is used when…FIXME
SessionCatalog — Session-Scoped Catalog of Relational Entities
SessionCatalog is the catalog (registry) of relational entities, i.e. databases, tables, views, partitions, and functions (in a SparkSession).
SessionCatalog uses the ExternalCatalog for the metadata of permanent entities (i.e. tables).
Note: SessionCatalog is a layer over ExternalCatalog in a SparkSession which allows for different metastores (i.e. in-memory or hive) to be used.
SessionCatalog is available through SessionState (of a SparkSession).
```text
scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog
```
SessionCatalog is created when BaseSessionStateBuilder is requested for the SessionCatalog (when SessionState is requested for it).
Amongst the notable usages of SessionCatalog is to create an Analyzer or a SparkOptimizer.
| Name | Description |
|---|---|
| tableRelationCache | A cache of fully-qualified table names to table relation plans (i.e. `LogicalPlan`). Used when…FIXME |
| tempViews | Registry of temporary views (i.e. non-global temporary tables) |
requireTableExists Internal Method
```scala
requireTableExists(name: TableIdentifier): Unit
```
requireTableExists…FIXME
Note: requireTableExists is used when…FIXME
databaseExists Method
```scala
databaseExists(db: String): Boolean
```
databaseExists…FIXME
Note: databaseExists is used when…FIXME
listTables Method
```scala
listTables(db: String): Seq[TableIdentifier]  // (1)
listTables(db: String, pattern: String): Seq[TableIdentifier]
```

1. Uses `"*"` as the pattern
listTables…FIXME
Checking Whether Table Is Temporary View — isTemporaryTable Method
```scala
isTemporaryTable(name: TableIdentifier): Boolean
```
isTemporaryTable…FIXME
Note: isTemporaryTable is used when…FIXME
alterPartitions Method
```scala
alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit
```
alterPartitions…FIXME
Note: alterPartitions is used when…FIXME
listPartitions Method
```scala
listPartitions(
  tableName: TableIdentifier,
  partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
```
listPartitions…FIXME
Note: listPartitions is used when…FIXME
alterTable Method
```scala
alterTable(tableDefinition: CatalogTable): Unit
```
alterTable…FIXME
Note: alterTable is used when AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AlterTableChangeColumnCommand, AlterTableSerDePropertiesCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, and AlterViewAsCommand (for permanent views) logical commands are executed.
Altering Table Statistics in Metastore (and Invalidating Internal Cache) — alterTableStats Method
```scala
alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit
```
alterTableStats requests ExternalCatalog to alter the statistics of the table (per identifier) followed by invalidating the table relation cache.
alterTableStats reports a NoSuchDatabaseException if the database does not exist.
alterTableStats reports a NoSuchTableException if the table does not exist.
tableExists Method
```scala
tableExists(name: TableIdentifier): Boolean
```
tableExists…FIXME
Note: tableExists is used when…FIXME
functionExists Method
```scala
functionExists(name: FunctionIdentifier): Boolean
```
functionExists…FIXME
listFunctions Method
```scala
listFunctions(db: String): Seq[(FunctionIdentifier, String)]
listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)]
```
listFunctions…FIXME
Note: listFunctions is used when…FIXME
Invalidating Table Relation Cache (aka Refreshing Table) — refreshTable Method
```scala
refreshTable(name: TableIdentifier): Unit
```
refreshTable…FIXME
Note: refreshTable is used when…FIXME
loadFunctionResources Method
```scala
loadFunctionResources(resources: Seq[FunctionResource]): Unit
```
loadFunctionResources…FIXME
Note: loadFunctionResources is used when…FIXME
Altering (Updating) Temporary View (Logical Plan) — alterTempViewDefinition Method
```scala
alterTempViewDefinition(name: TableIdentifier, viewDefinition: LogicalPlan): Boolean
```
alterTempViewDefinition alters the temporary view by updating an in-memory temporary table (when a database is not specified and the table has already been registered) or a global temporary table (when a database is specified and it is for global temporary tables).
Note: “Temporary table” and “temporary view” are synonyms.
alterTempViewDefinition returns true when an update could be executed and finished successfully.
Note: alterTempViewDefinition is used exclusively when AlterViewAsCommand logical command is executed.
Creating (Registering) Or Replacing Local Temporary View — createTempView Method
```scala
createTempView(
  name: String,
  tableDefinition: LogicalPlan,
  overrideIfExists: Boolean): Unit
```
createTempView…FIXME
Note: createTempView is used when…FIXME
Creating (Registering) Or Replacing Global Temporary View — createGlobalTempView Method
```scala
createGlobalTempView(
  name: String,
  viewDefinition: LogicalPlan,
  overrideIfExists: Boolean): Unit
```
createGlobalTempView simply requests the GlobalTempViewManager to create a global temporary view.
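A usage sketch (spark-shell): Dataset.createOrReplaceGlobalTempView ends up here, and the view is registered in the GlobalTempViewManager's database (global_temp unless overridden).

```scala
spark.range(3).createOrReplaceGlobalTempView("gv")
spark.sql("SELECT * FROM global_temp.gv").show()
println(spark.sharedState.globalTempViewManager.database)  // global_temp by default
```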
createTable Method
```scala
createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
```
createTable…FIXME
Note: createTable is used when…FIXME
Creating SessionCatalog Instance
SessionCatalog takes the following when created:
- Hadoop’s Configuration
SessionCatalog initializes the internal registries and counters.
Finding Function by Name (Using FunctionRegistry) — lookupFunction Method
```scala
lookupFunction(
  name: FunctionIdentifier,
  children: Seq[Expression]): Expression
```
lookupFunction finds a function by name.
For a function with no database defined that exists in FunctionRegistry, lookupFunction requests FunctionRegistry to find the function (by its unqualified name, i.e. with no database).
If the function has a database defined or does not exist in FunctionRegistry under its unqualified name, lookupFunction checks whether the function is registered in FunctionRegistry under its fully-qualified name (i.e. with a database).
For other cases, lookupFunction requests ExternalCatalog to find the function and loads its resources. It then creates a corresponding temporary function and looks up the function again.
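A small demonstration (spark-shell; the function name is made up) of the first resolution step, where an unqualified name is found in the session's FunctionRegistry:

```scala
// Register a temporary function (goes to the FunctionRegistry) and look it up directly
spark.udf.register("my_upper", (s: String) => s.toUpperCase)

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.functions.lit

val expr = spark.sessionState.catalog.lookupFunction(
  FunctionIdentifier("my_upper"), children = Seq(lit("spark").expr))
println(expr)

// The same resolution happens for SQL queries
spark.sql("SELECT my_upper('spark')").show()
```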
Finding Relation (Table or View) in Catalogs — lookupRelation Method
```scala
lookupRelation(name: TableIdentifier): LogicalPlan
```
lookupRelation finds the name table in the catalogs (i.e. GlobalTempViewManager, ExternalCatalog or registry of temporary views) and gives a SubqueryAlias per table type.
```scala
scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog

import spark.sessionState.{catalog => c}
import org.apache.spark.sql.catalyst.TableIdentifier

// Global temp view
val db = spark.sharedState.globalTempViewManager.database
// Make the example reproducible (and so "replace")
spark.range(1).createOrReplaceGlobalTempView("gv1")
val gv1 = TableIdentifier(table = "gv1", database = Some(db))
val plan = c.lookupRelation(gv1)
scala> println(plan.numberedTreeString)
00 SubqueryAlias gv1
01 +- Range (0, 1, step=1, splits=Some(8))

val metastore = spark.sharedState.externalCatalog

// Regular table
val db = spark.catalog.currentDatabase
metastore.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
sql("CREATE TABLE t1 (id LONG) USING parquet")
val t1 = TableIdentifier(table = "t1", database = Some(db))
val plan = c.lookupRelation(t1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias t1
01 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

// Regular view (not temporary view!)
// Make the example reproducible
metastore.dropTable(db, table = "v1", ignoreIfNotExists = true, purge = true)
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
val v1 = TableIdentifier(table = "v1", database = Some(db))
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".long)
val storage = CatalogStorageFormat(locationUri = None, inputFormat = None, outputFormat = None, serde = None, compressed = false, properties = Map())
val tableDef = CatalogTable(
  identifier = v1,
  tableType = CatalogTableType.VIEW,
  storage,
  schema,
  viewText = Some("SELECT 1") /** Required or RuntimeException reported */)
metastore.createTable(tableDef, ignoreIfExists = false)
val plan = c.lookupRelation(v1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias v1
01 +- View (`default`.`v1`, [id#77L])
02    +- 'Project [unresolvedalias(1, None)]
03       +- OneRowRelation

// Temporary view
spark.range(1).createOrReplaceTempView("v2")
val v2 = TableIdentifier(table = "v2", database = None)
val plan = c.lookupRelation(v2)
scala> println(plan.numberedTreeString)
00 SubqueryAlias v2
01 +- Range (0, 1, step=1, splits=Some(8))
```
Internally, lookupRelation looks up the name table using:

- GlobalTempViewManager when the database name of the table matches the name of GlobalTempViewManager
  - Gives `SubqueryAlias` or reports a `NoSuchTableException`
- ExternalCatalog when the database name of the table is specified explicitly or the registry of temporary views does not contain the table
  - Gives `SubqueryAlias` with `View` when the table is a view (aka temporary table)
  - Gives `SubqueryAlias` with `UnresolvedCatalogRelation` otherwise
- The registry of temporary views
  - Gives `SubqueryAlias` with the logical plan per the table as registered in the registry of temporary views
Note: lookupRelation considers default to be the name of the database if the name table does not specify the database explicitly.
Retrieving Table Metadata from External Catalog (Metastore) — getTableMetadata Method
```scala
getTableMetadata(name: TableIdentifier): CatalogTable
```
getTableMetadata simply requests external catalog (metastore) for the table metadata.
Retrieving Table Metadata — getTempViewOrPermanentTableMetadata Method
```scala
getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable
```
Internally, getTempViewOrPermanentTableMetadata branches off per database.
When a database name is not specified, getTempViewOrPermanentTableMetadata finds a local temporary view and creates a CatalogTable (with VIEW table type and an undefined storage) or retrieves the table metadata from an external catalog.
When the database name is that of the GlobalTempViewManager, getTempViewOrPermanentTableMetadata requests GlobalTempViewManager for the global view definition and creates a CatalogTable (with the name of GlobalTempViewManager in the table identifier, VIEW table type and an undefined storage) or reports a NoSuchTableException.
For any other database name, getTempViewOrPermanentTableMetadata simply retrieves the table metadata from an external catalog.
Reporting NoSuchDatabaseException When Specified Database Does Not Exist — requireDbExists Internal Method
```scala
requireDbExists(db: String): Unit
```
requireDbExists reports a NoSuchDatabaseException if the specified database does not exist. Otherwise, requireDbExists does nothing.
reset Method
```scala
reset(): Unit
```
reset…FIXME
Note: reset is used exclusively in the Spark SQL internal tests.
Dropping Global Temporary View — dropGlobalTempView Method
```scala
dropGlobalTempView(name: String): Boolean
```
dropGlobalTempView simply requests the GlobalTempViewManager to remove the name global temporary view.
Note: dropGlobalTempView is used when…FIXME
Dropping Table — dropTable Method
```scala
dropTable(
  name: TableIdentifier,
  ignoreIfNotExists: Boolean,
  purge: Boolean): Unit
```
dropTable…FIXME
Getting Global Temporary View (Definition) — getGlobalTempView Method
```scala
getGlobalTempView(name: String): Option[LogicalPlan]
```
getGlobalTempView…FIXME
Note: getGlobalTempView is used when…FIXME
registerFunction Method
```scala
registerFunction(
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  functionBuilder: Option[FunctionBuilder] = None): Unit
```
registerFunction…FIXME