SessionCatalog — Session-Scoped Catalog of Relational Entities
SessionCatalog is the catalog (registry) of relational entities, i.e. databases, tables, views, partitions, and functions (in a SparkSession).
SessionCatalog uses the ExternalCatalog for the metadata of permanent entities (e.g. databases and permanent tables, views, partitions and functions).
Note: SessionCatalog is a layer over ExternalCatalog in a SparkSession, which allows for different metastores (e.g. in-memory or Hive) to be used.
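The layering can be pictured with a toy model in plain Scala, with no Spark dependency. All names below (ToyExternalCatalog, ToyInMemoryCatalog, ToySessionCatalog) are hypothetical, and the model is far simpler than Spark's classes. It assumes, as in Spark, that one metastore is shared by all sessions while temporary views stay local to the session that created them.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an ExternalCatalog, i.e. a pluggable metastore.
trait ToyExternalCatalog {
  def databaseExists(db: String): Boolean
  def createDatabase(db: String): Unit
  def createTable(db: String, table: String): Unit
  def tableExists(db: String, table: String): Boolean
}

// One possible metastore: everything is kept in memory.
final class ToyInMemoryCatalog extends ToyExternalCatalog {
  private val tables = mutable.Map.empty[String, mutable.Set[String]] // db -> table names

  def databaseExists(db: String): Boolean = tables.contains(db)
  def createDatabase(db: String): Unit = { tables.getOrElseUpdate(db, mutable.Set.empty[String]); () }
  def createTable(db: String, table: String): Unit = {
    require(databaseExists(db), s"Database '$db' not found")
    tables(db) += table
  }
  def tableExists(db: String, table: String): Boolean = tables.get(db).exists(_.contains(table))
}

// Session-scoped layer: keeps temporary views itself, delegates permanent tables to the metastore.
final class ToySessionCatalog(external: ToyExternalCatalog) {
  private val tempViews = mutable.Map.empty[String, String] // view name -> plan (as text)
  var currentDb: String = "default"

  def createTempView(name: String, plan: String): Unit = tempViews(name) = plan
  def isTempView(name: String): Boolean = tempViews.contains(name)
  def createTable(table: String, db: Option[String] = None): Unit =
    external.createTable(db.getOrElse(currentDb), table)
  def tableExists(table: String, db: Option[String] = None): Boolean =
    external.tableExists(db.getOrElse(currentDb), table)
}

// Two sessions sharing one metastore.
val metastore = new ToyInMemoryCatalog
metastore.createDatabase("default")
val session1 = new ToySessionCatalog(metastore)
val session2 = new ToySessionCatalog(metastore)
session1.createTable("t1")
session1.createTempView("v1", "Range(0, 1)")
```

In this model, session2 sees table t1 but not the temporary view v1. Swapping ToyInMemoryCatalog for another ToyExternalCatalog implementation would leave ToySessionCatalog unchanged, which is the point of the layering.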
SessionCatalog is available through SessionState (of a SparkSession).
```scala
scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog
```
SessionCatalog is created when BaseSessionStateBuilder is requested for the SessionCatalog (when SessionState is requested for it).
Notable usages of SessionCatalog include creating an Analyzer or a SparkOptimizer.
| Name | Description |
|---|---|
|  | Used when…FIXME |
|  | A cache of fully-qualified table names to table relation plans (i.e. logical plans). Used when… |
|  | Registry of temporary views (i.e. non-global temporary tables) |
requireTableExists Internal Method
```scala
requireTableExists(name: TableIdentifier): Unit
```
requireTableExists…FIXME
Note: requireTableExists is used when…FIXME
databaseExists Method
```scala
databaseExists(db: String): Boolean
```
databaseExists…FIXME
Note: databaseExists is used when…FIXME
listTables Method
```scala
listTables(db: String): Seq[TableIdentifier] // uses "*" as the pattern
listTables(db: String, pattern: String): Seq[TableIdentifier]
```
listTables…FIXME
Checking Whether Table Is Temporary View — isTemporaryTable Method
```scala
isTemporaryTable(name: TableIdentifier): Boolean
```
isTemporaryTable…FIXME
Note: isTemporaryTable is used when…FIXME
alterPartitions Method
```scala
alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit
```
alterPartitions…FIXME
Note: alterPartitions is used when…FIXME
listPartitions Method
```scala
listPartitions(
  tableName: TableIdentifier,
  partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
```
listPartitions…FIXME
Note: listPartitions is used when…FIXME
alterTable Method
```scala
alterTable(tableDefinition: CatalogTable): Unit
```
alterTable…FIXME
Note: alterTable is used when the AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AlterTableChangeColumnCommand, AlterTableSerDePropertiesCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand and AlterViewAsCommand (for permanent views) logical commands are executed.
Altering Table Statistics in Metastore (and Invalidating Internal Cache) — alterTableStats Method
```scala
alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit
```
alterTableStats requests the ExternalCatalog to alter the statistics of the table (given by identifier) and then invalidates the table's entry in the table relation cache.
alterTableStats reports a NoSuchDatabaseException if the database does not exist.
alterTableStats reports a NoSuchTableException if the table does not exist.
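The two steps can be shown in a toy model in plain Scala with no Spark dependency. First the metastore is updated, then the cached relation is invalidated. Stats, ToyStatsCatalog and the two exception classes are hypothetical stand-ins that only borrow the names used above. Keying the cache by (database, table) is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical exceptions named after the ones in the text (not Spark's classes).
final class NoSuchDatabaseException(db: String) extends Exception(s"Database '$db' not found")
final class NoSuchTableException(db: String, table: String)
  extends Exception(s"Table or view '$table' not found in database '$db'")

// Simplified statistics (Spark's CatalogStatistics carries more).
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

final class ToyStatsCatalog {
  // "Metastore": database -> (table -> optional statistics)
  val metastore = mutable.Map("default" -> mutable.Map.empty[String, Option[Stats]])
  // Session-side cache of resolved relations, keyed by (database, table).
  val relationCache = mutable.Map.empty[(String, String), String]

  def alterTableStats(db: String, table: String, newStats: Option[Stats]): Unit = {
    val tables = metastore.getOrElse(db, throw new NoSuchDatabaseException(db))
    if (!tables.contains(table)) throw new NoSuchTableException(db, table)
    tables(table) = newStats          // 1. persist the new statistics
    relationCache.remove((db, table)) // 2. drop the now-stale cached relation
  }
}

val catalog = new ToyStatsCatalog
catalog.metastore("default")("t1") = None
catalog.relationCache(("default", "t1")) = "Relation[id#0L] parquet"
catalog.alterTableStats("default", "t1", Some(Stats(BigInt(1024), Some(BigInt(10)))))
```

Invalidating the cache entry matters because a cached relation would otherwise keep the old statistics until it is evicted.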
tableExists Method
```scala
tableExists(name: TableIdentifier): Boolean
```
tableExists…FIXME
Note: tableExists is used when…FIXME
functionExists Method
```scala
functionExists(name: FunctionIdentifier): Boolean
```
functionExists…FIXME
listFunctions Method
```scala
listFunctions(db: String): Seq[(FunctionIdentifier, String)]
listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)]
```
listFunctions…FIXME
Note: listFunctions is used when…FIXME
Invalidating Table Relation Cache (aka Refreshing Table) — refreshTable Method
```scala
refreshTable(name: TableIdentifier): Unit
```
refreshTable…FIXME
Note: refreshTable is used when…FIXME
loadFunctionResources Method
```scala
loadFunctionResources(resources: Seq[FunctionResource]): Unit
```
loadFunctionResources…FIXME
Note: loadFunctionResources is used when…FIXME
Altering (Updating) Temporary View (Logical Plan) — alterTempViewDefinition Method
```scala
alterTempViewDefinition(name: TableIdentifier, viewDefinition: LogicalPlan): Boolean
```
alterTempViewDefinition alters a temporary view. It updates a local temporary view when no database is specified and the view has already been registered. It updates a global temporary view when the specified database is the database for global temporary views.
Note: “Temporary table” and “temporary view” are synonyms.
alterTempViewDefinition returns true when the view exists and has been updated, and false otherwise.
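The branching and the Boolean result can be sketched in plain Scala. TableId, ToyTempViewCatalog and the string plans are hypothetical stand-ins. The sketch assumes global_temp (Spark's default) as the name of the database for global temporary views.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: plans are strings; TableId mimics TableIdentifier.
final case class TableId(table: String, database: Option[String] = None)

// globalTempDb assumes Spark's default name of the global temporary view database.
final class ToyTempViewCatalog(val globalTempDb: String = "global_temp") {
  val localViews  = mutable.Map.empty[String, String] // local temporary views
  val globalViews = mutable.Map.empty[String, String] // global temporary views

  def alterTempViewDefinition(name: TableId, viewDefinition: String): Boolean =
    name.database match {
      case None if localViews.contains(name.table) =>
        localViews(name.table) = viewDefinition; true
      case Some(db) if db == globalTempDb && globalViews.contains(name.table) =>
        globalViews(name.table) = viewDefinition; true
      case _ => false // unknown view, or a database that is not for global temp views
    }
}

val views = new ToyTempViewCatalog()
views.localViews("v1") = "Range(0, 1)"
views.globalViews("gv1") = "Range(0, 1)"
```

A false result means nothing was changed, so the caller can fall back to treating the name as a permanent view.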
Note: alterTempViewDefinition is used exclusively when the AlterViewAsCommand logical command is executed.
Creating (Registering) Or Replacing Local Temporary View — createTempView Method
```scala
createTempView(
  name: String,
  tableDefinition: LogicalPlan,
  overrideIfExists: Boolean): Unit
```
createTempView…FIXME
Note: createTempView is used when…FIXME
Creating (Registering) Or Replacing Global Temporary View — createGlobalTempView Method
```scala
createGlobalTempView(
  name: String,
  viewDefinition: LogicalPlan,
  overrideIfExists: Boolean): Unit
```
createGlobalTempView simply requests the GlobalTempViewManager to create a global temporary view.
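Because createGlobalTempView only delegates, the interesting part is the manager. Below is a hypothetical plain-Scala model of such a registry, not Spark's GlobalTempViewManager. It assumes that creating an existing view fails unless overrideIfExists is enabled. It also models the removal that dropGlobalTempView delegates to.

```scala
import scala.collection.mutable

// Hypothetical exception; Spark reports its own exception type in this case.
final class ViewAlreadyExistsException(name: String)
  extends Exception(s"Temporary view '$name' already exists")

// Simplified model of a global temporary view manager: a synchronized registry
// of view name -> view definition (plans are plain strings here).
final class ToyGlobalTempViewManager(val database: String) {
  private val viewDefinitions = mutable.Map.empty[String, String]

  def create(name: String, viewDefinition: String, overrideIfExists: Boolean): Unit = synchronized {
    if (!overrideIfExists && viewDefinitions.contains(name)) throw new ViewAlreadyExistsException(name)
    viewDefinitions(name) = viewDefinition
  }
  def get(name: String): Option[String] = synchronized(viewDefinitions.get(name))
  def remove(name: String): Boolean = synchronized(viewDefinitions.remove(name).isDefined)
}

// The session-level methods add no logic of their own: they simply delegate.
final class ToyGlobalViewCatalog(manager: ToyGlobalTempViewManager) {
  def createGlobalTempView(name: String, viewDefinition: String, overrideIfExists: Boolean): Unit =
    manager.create(name, viewDefinition, overrideIfExists)
  def dropGlobalTempView(name: String): Boolean = manager.remove(name)
}

val manager = new ToyGlobalTempViewManager("global_temp")
val catalog = new ToyGlobalViewCatalog(manager)
catalog.createGlobalTempView("gv1", "Range(0, 1)", overrideIfExists = false)
catalog.createGlobalTempView("gv1", "Range(0, 2)", overrideIfExists = true)
```

The manager is synchronized because, unlike local temporary views, global temporary views are visible to every session of the application.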
createTable Method
```scala
createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
```
createTable…FIXME
Note: createTable is used when…FIXME
Creating SessionCatalog Instance
SessionCatalog takes the following when created:
- Hadoop’s Configuration
SessionCatalog initializes the internal registries and counters.
Finding Function by Name (Using FunctionRegistry) — lookupFunction Method
```scala
lookupFunction(
  name: FunctionIdentifier,
  children: Seq[Expression]): Expression
```
lookupFunction finds a function by name.
For a function with no database defined that exists in FunctionRegistry, lookupFunction requests FunctionRegistry to find the function by its unqualified name (i.e. with no database).
Otherwise, lookupFunction qualifies the name with the current database (unless the name already specifies a database). If FunctionRegistry has a function under that fully-qualified name, lookupFunction requests FunctionRegistry to find it.
In all other cases, lookupFunction requests ExternalCatalog for the (permanent) function and loads its resources. It then registers the function in FunctionRegistry under the fully-qualified name and looks the function up again.
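The three-step resolution order can be sketched in plain Scala. FuncId, CatalogFunc, ToyFunctionCatalog, the class name com.example.MyUpper and the resource path are all hypothetical. Resource loading is reduced to recording the resource names.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for FunctionIdentifier and CatalogFunction. A "function"
// is a builder from argument expressions (plain strings here) to a result expression.
final case class FuncId(funcName: String, database: Option[String] = None)
final case class CatalogFunc(className: String, resources: Seq[String])

final class ToyFunctionCatalog(
    externalFunctions: Map[(String, String), CatalogFunc], // (db, name) -> permanent function
    currentDb: String = "default") {

  // Registry of built-in, temporary and already-loaded permanent functions.
  val registry = mutable.Map.empty[FuncId, Seq[String] => String]
  val loadedResources = mutable.ListBuffer.empty[String]

  def lookupFunction(name: FuncId, children: Seq[String]): String = {
    // 1. Unqualified name already registered (built-in or temporary function).
    if (name.database.isEmpty && registry.contains(name)) return registry(name)(children)

    // 2. Qualify the name with the current database and try the registry again.
    val db = name.database.getOrElse(currentDb)
    val qualified = name.copy(database = Some(db))
    if (registry.contains(qualified)) return registry(qualified)(children)

    // 3. Ask the external catalog, load the function's resources, register it
    //    under the qualified name and look it up again.
    val func = externalFunctions.getOrElse((db, name.funcName),
      throw new NoSuchElementException(s"Undefined function: ${name.funcName}"))
    loadedResources ++= func.resources
    registry(qualified) = args => s"${func.className}(${args.mkString(", ")})"
    registry(qualified)(children)
  }
}

val functions = new ToyFunctionCatalog(
  Map(("default", "my_upper") -> CatalogFunc("com.example.MyUpper", Seq("/tmp/my_upper.jar"))))
functions.registry(FuncId("upper")) = args => args.map(_.toUpperCase).mkString
```

After the first lookup of my_upper, later lookups are served by step 2, so the function's resources are loaded only once.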
Finding Relation (Table or View) in Catalogs — lookupRelation Method
```scala
lookupRelation(name: TableIdentifier): LogicalPlan
```
lookupRelation finds the name table in the catalogs (i.e. GlobalTempViewManager, ExternalCatalog or the registry of temporary views) and gives a SubqueryAlias. The logical plan inside the SubqueryAlias depends on where the table was found and on its type.
```scala
scala> :type spark.sessionState.catalog
org.apache.spark.sql.catalyst.catalog.SessionCatalog

import spark.sessionState.{catalog => c}
import org.apache.spark.sql.catalyst.TableIdentifier

// Global temp view
val db = spark.sharedState.globalTempViewManager.database
// Make the example reproducible (and so "replace")
spark.range(1).createOrReplaceGlobalTempView("gv1")
val gv1 = TableIdentifier(table = "gv1", database = Some(db))
val plan = c.lookupRelation(gv1)
scala> println(plan.numberedTreeString)
00 SubqueryAlias gv1
01 +- Range (0, 1, step=1, splits=Some(8))

val metastore = spark.sharedState.externalCatalog

// Regular table
val db = spark.catalog.currentDatabase
metastore.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
sql("CREATE TABLE t1 (id LONG) USING parquet")
val t1 = TableIdentifier(table = "t1", database = Some(db))
val plan = c.lookupRelation(t1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias t1
01 +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe

// Regular view (not temporary view!)
// Make the example reproducible
metastore.dropTable(db, table = "v1", ignoreIfNotExists = true, purge = true)
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
val v1 = TableIdentifier(table = "v1", database = Some(db))
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".long)
val storage = CatalogStorageFormat(
  locationUri = None,
  inputFormat = None,
  outputFormat = None,
  serde = None,
  compressed = false,
  properties = Map())
val tableDef = CatalogTable(
  identifier = v1,
  tableType = CatalogTableType.VIEW,
  storage,
  schema,
  viewText = Some("SELECT 1") /** Required or RuntimeException reported */)
metastore.createTable(tableDef, ignoreIfExists = false)
val plan = c.lookupRelation(v1)
scala> println(plan.numberedTreeString)
00 'SubqueryAlias v1
01 +- View (`default`.`v1`, [id#77L])
02    +- 'Project [unresolvedalias(1, None)]
03       +- OneRowRelation

// Temporary view
spark.range(1).createOrReplaceTempView("v2")
val v2 = TableIdentifier(table = "v2", database = None)
val plan = c.lookupRelation(v2)
scala> println(plan.numberedTreeString)
00 SubqueryAlias v2
01 +- Range (0, 1, step=1, splits=Some(8))
```
Internally, lookupRelation looks up the name table using:
- GlobalTempViewManager when the database name of the table matches the name of GlobalTempViewManager
  - Gives SubqueryAlias or reports a NoSuchTableException
- ExternalCatalog when the database name of the table is specified explicitly or the registry of temporary views does not contain the table
  - Gives SubqueryAlias with View when the table is a (permanent) view
  - Gives SubqueryAlias with UnresolvedCatalogRelation otherwise
- The registry of temporary views
  - Gives SubqueryAlias with the logical plan of the table as registered in the registry of temporary views
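Here is a plain-Scala sketch of that resolution order. TableId, MetaTable and ToyRelationCatalog are hypothetical, and plans are rendered as strings. The sketch assumes that an unqualified name resolves against the current database. One consequence of the order shows up with t1: an unqualified name finds a local temporary view before a permanent table with the same name.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of lookupRelation's resolution order.
// "SubqueryAlias t <- child" stands for a SubqueryAlias over a child plan.
final case class TableId(table: String, database: Option[String] = None)
final case class MetaTable(tableType: String, viewText: Option[String] = None) // "VIEW" or "MANAGED"
final class NoSuchTableException(db: String, table: String)
  extends Exception(s"Table or view '$table' not found in database '$db'")

final class ToyRelationCatalog(globalTempDb: String = "global_temp", var currentDb: String = "default") {
  val globalViews = mutable.Map.empty[String, String]              // global temporary views
  val metastore   = mutable.Map.empty[(String, String), MetaTable] // permanent tables and views
  val tempViews   = mutable.Map.empty[String, String]              // local temporary views

  def lookupRelation(name: TableId): String = {
    val db = name.database.getOrElse(currentDb)
    val table = name.table
    if (db == globalTempDb) {
      // 1. Global temporary view (or NoSuchTableException)
      globalViews.get(table).map(plan => s"SubqueryAlias $table <- $plan")
        .getOrElse(throw new NoSuchTableException(db, table))
    } else if (name.database.isDefined || !tempViews.contains(table)) {
      // 2. Metastore: permanent view or table
      val meta = metastore.getOrElse((db, table), throw new NoSuchTableException(db, table))
      if (meta.tableType == "VIEW") s"SubqueryAlias $table <- View($db.$table, ${meta.viewText.getOrElse("")})"
      else s"SubqueryAlias $table <- UnresolvedCatalogRelation($db.$table)"
    } else {
      // 3. Local temporary view
      s"SubqueryAlias $table <- ${tempViews(table)}"
    }
  }
}

val relations = new ToyRelationCatalog()
relations.globalViews("gv1") = "Range(0, 1)"
relations.metastore(("default", "t1")) = MetaTable("MANAGED")
relations.metastore(("default", "v1")) = MetaTable("VIEW", Some("SELECT 1"))
relations.tempViews("v2") = "Range(0, 1)"
relations.tempViews("t1") = "Range(0, 2)" // local temp view with the same name as table t1
```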
Note: lookupRelation uses the current database (which is default unless changed) when the name table does not specify the database explicitly.
Retrieving Table Metadata from External Catalog (Metastore) — getTableMetadata Method
```scala
getTableMetadata(name: TableIdentifier): CatalogTable
```
getTableMetadata simply requests the external catalog (metastore) for the table metadata.
Retrieving Table Metadata — getTempViewOrPermanentTableMetadata Method
```scala
getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable
```
Internally, getTempViewOrPermanentTableMetadata branches off per database.
When no database name is specified, getTempViewOrPermanentTableMetadata looks up a local temporary view. If the view is found, it creates a CatalogTable with VIEW table type and an undefined storage. Otherwise, it retrieves the table metadata from the external catalog.
When the database name is the one of GlobalTempViewManager, getTempViewOrPermanentTableMetadata requests GlobalTempViewManager for the global view definition. It then creates a CatalogTable with the name of GlobalTempViewManager in the table identifier, VIEW table type and an undefined storage. If there is no such global temporary view, it reports a NoSuchTableException.
For any other database name, getTempViewOrPermanentTableMetadata simply retrieves the table metadata from the external catalog.
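The three branches can be modelled in plain Scala as follows. TableId, TableMeta, NoSuchTableException and ToyMetadataCatalog are hypothetical. The model keeps only the fields mentioned above: identifier, table type and storage, with None standing for an undefined storage.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: TableId mimics TableIdentifier; TableMeta keeps only the
// CatalogTable fields discussed in the text (storage = None means "undefined").
final case class TableId(table: String, database: Option[String] = None)
final case class TableMeta(identifier: TableId, tableType: String, storage: Option[String])
final class NoSuchTableException(db: String, table: String)
  extends Exception(s"Table or view '$table' not found in database '$db'")

final class ToyMetadataCatalog(globalTempDb: String = "global_temp", var currentDb: String = "default") {
  val tempViews   = mutable.Set.empty[String]
  val globalViews = mutable.Set.empty[String]
  val metastore   = mutable.Map.empty[TableId, TableMeta] // keyed by database-qualified id

  // Stand-in for getTableMetadata: always asks the "external catalog".
  def getTableMetadata(name: TableId): TableMeta = {
    val db = name.database.getOrElse(currentDb)
    metastore.getOrElse(TableId(name.table, Some(db)), throw new NoSuchTableException(db, name.table))
  }

  def getTempViewOrPermanentTableMetadata(name: TableId): TableMeta = name.database match {
    case None if tempViews.contains(name.table) =>
      TableMeta(TableId(name.table), "VIEW", storage = None) // local temporary view
    case None =>
      getTableMetadata(name) // not a temp view: ask the metastore
    case Some(db) if db == globalTempDb =>
      if (globalViews.contains(name.table)) TableMeta(TableId(name.table, Some(globalTempDb)), "VIEW", storage = None)
      else throw new NoSuchTableException(db, name.table)
    case Some(_) =>
      getTableMetadata(name) // any other database
  }
}

val meta = new ToyMetadataCatalog()
meta.tempViews += "v1"
meta.globalViews += "gv1"
meta.metastore(TableId("t1", Some("default"))) =
  TableMeta(TableId("t1", Some("default")), "MANAGED", Some("parquet"))
```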
Reporting NoSuchDatabaseException When Specified Database Does Not Exist — requireDbExists Internal Method
```scala
requireDbExists(db: String): Unit
```
requireDbExists reports a NoSuchDatabaseException if the specified database does not exist. Otherwise, requireDbExists does nothing.
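requireDbExists is a guard clause. Below is a minimal plain-Scala sketch. ToyDbCatalog is hypothetical, and the exception class only borrows the name used above.

```scala
// Hypothetical exception named after the one in the text (not Spark's class).
final class NoSuchDatabaseException(db: String) extends Exception(s"Database '$db' not found")

final class ToyDbCatalog(databases: Set[String]) {
  def databaseExists(db: String): Boolean = databases.contains(db)

  // Guard: throws when the database is missing, otherwise does nothing.
  def requireDbExists(db: String): Unit =
    if (!databaseExists(db)) throw new NoSuchDatabaseException(db)
}

val dbs = new ToyDbCatalog(Set("default"))
```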
reset Method
```scala
reset(): Unit
```
reset…FIXME
Note: reset is used exclusively in the Spark SQL internal tests.
Dropping Global Temporary View — dropGlobalTempView Method
```scala
dropGlobalTempView(name: String): Boolean
```
dropGlobalTempView simply requests the GlobalTempViewManager to remove the name global temporary view.
Note: dropGlobalTempView is used when…FIXME
Dropping Table — dropTable Method
```scala
dropTable(
  name: TableIdentifier,
  ignoreIfNotExists: Boolean,
  purge: Boolean): Unit
```
dropTable…FIXME
Getting Global Temporary View (Definition) — getGlobalTempView Method
```scala
getGlobalTempView(name: String): Option[LogicalPlan]
```
getGlobalTempView…FIXME
Note: getGlobalTempView is used when…FIXME
registerFunction Method
```scala
registerFunction(
  funcDefinition: CatalogFunction,
  overrideIfExists: Boolean,
  functionBuilder: Option[FunctionBuilder] = None): Unit
```
registerFunction…FIXME