
GlobalTempViewManager — Management Interface of Global Temporary Views


GlobalTempViewManager is the interface to manage global temporary views (that SessionCatalog uses when requested to create, alter or drop global temporary views).

Strictly speaking, GlobalTempViewManager simply manages the names of the global temporary views registered (and the corresponding logical plans) and has no interaction with other services in Spark SQL.

GlobalTempViewManager is available as globalTempViewManager property of a SharedState.
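The following is a quick user-level sketch (assuming a running SparkSession available as spark; the view name is arbitrary) of how global temporary views end up in GlobalTempViewManager through the public Dataset and SparkSession APIs.

// Creating a global temporary view goes through SessionCatalog and, in turn,
// registers the view's logical plan in GlobalTempViewManager.
val df = spark.range(5)
df.createOrReplaceGlobalTempView("ids")

// Global temporary views live in the global temp database
// (spark.sql.globalTempDatabase, global_temp by default).
spark.sql("SELECT * FROM global_temp.ids").show()

// Dropping the view de-registers it from GlobalTempViewManager.
spark.catalog.dropGlobalTempView("ids")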

Figure 1. GlobalTempViewManager and SparkSession

Table 1. GlobalTempViewManager API
Method Description

clear

create

get

listViewNames

remove

rename

update

GlobalTempViewManager is created exclusively when SharedState is requested for one (for the very first time only as it is cached).

GlobalTempViewManager takes the name of the database when created.

Figure 2. Creating GlobalTempViewManager
Table 2. GlobalTempViewManager’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

viewDefinitions

Registry of global temporary view definitions as logical plans per view name.

clear Method

clear simply removes all the entries in the viewDefinitions internal registry.

Note
clear is used when SessionCatalog is requested to reset (that happens to be exclusively in the Spark SQL internal tests).

Creating (Registering) Global Temporary View (Definition) — create Method

create simply registers (adds) the input LogicalPlan under the input name.

create throws an AnalysisException when the input overrideIfExists flag is off and the viewDefinitions internal registry contains the input name.

Note
create is used when SessionCatalog is requested to createGlobalTempView (when CreateViewCommand and CreateTempViewUsing logical commands are executed).

Retrieving Global View Definition Per Name — get Method

get simply returns the LogicalPlan that was registered under the name, if defined.

Note
get is used when SessionCatalog is requested to getGlobalTempView, getTempViewOrPermanentTableMetadata, lookupRelation, isTemporaryTable or refreshTable.

Listing Global Temporary Views For Pattern — listViewNames Method

listViewNames simply gives a list of the global temporary views with names matching the input pattern.

Note
listViewNames is used exclusively when SessionCatalog is requested to listTables

Removing (De-Registering) Global Temporary View — remove Method

remove simply tries to remove the name from the viewDefinitions internal registry and returns true when removed or false otherwise.

Note
remove is used when SessionCatalog is requested to drop a global temporary view or table.

rename Method

rename…​FIXME

Note
rename is used when…​FIXME

update Method

update…​FIXME

Note
update is used exclusively when SessionCatalog is requested to alter a global temporary view.

FunctionRegistry — Contract for Function Registries (Catalogs)


FunctionRegistry is the contract of function registries (catalogs) of native and user-defined functions.

Table 1. FunctionRegistry Contract
Property Description

clear

Used exclusively when SessionCatalog is requested to reset

dropFunction

Used when…​FIXME

listFunction

Used when…​FIXME

lookupFunction

Used when:

lookupFunctionBuilder

Used when…​FIXME

registerFunction

Used when:

Note
The one and only FunctionRegistry available in Spark SQL is SimpleFunctionRegistry.

FunctionRegistry is available through functionRegistry property of a SessionState (that is available as sessionState property of a SparkSession).

Note
You can register a new user-defined function using UDFRegistration.
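For example (an illustrative sketch, assuming a running SparkSession available as spark), a function registered through UDFRegistration ends up in the session-scoped FunctionRegistry and becomes resolvable in SQL statements.

// spark.udf is the UDFRegistration that eventually registers the function
// in the session's FunctionRegistry.
spark.udf.register("plusOne", (n: Int) => n + 1)

spark.sql("SELECT plusOne(41)").show()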
Table 2. FunctionRegistry’s Attributes
Name Description

builtin

SimpleFunctionRegistry with the built-in functions registered.

FunctionRegistry manages a registry of function expressions, mapping Catalyst expressions to the corresponding built-in/native SQL functions (that can be used in SQL statements); see the example after the table below.

Table 3. (Subset of) FunctionRegistry’s Catalyst Expression to SQL Function Mapping
Catalyst Expression | SQL Function
CumeDist | cume_dist
IfNull | ifnull
Left | left
MonotonicallyIncreasingID | monotonically_increasing_id
NullIf | nullif
Nvl | nvl
Nvl2 | nvl2
ParseToDate | to_date
ParseToTimestamp | to_timestamp
Right | right
CreateNamedStruct | struct
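As a quick illustration (a sketch, assuming a running SparkSession available as spark), the SQL functions above can be used directly in SQL statements and are resolved through FunctionRegistry to the corresponding Catalyst expressions.

// nvl, ifnull and monotonically_increasing_id resolve to the Nvl, IfNull and
// MonotonicallyIncreasingID Catalyst expressions, respectively.
spark.sql("""
  SELECT
    nvl(NULL, 'fallback')         AS nvl_result,
    ifnull(NULL, 'fallback')      AS ifnull_result,
    monotonically_increasing_id() AS id
""").show()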

expression Internal Method

expression…​FIXME

Note
expression is used when…​FIXME

SimpleFunctionRegistry

SimpleFunctionRegistry is the default FunctionRegistry that is backed by a hash map (with optional case sensitivity).

createOrReplaceTempFunction Final Method

createOrReplaceTempFunction…​FIXME

Note
createOrReplaceTempFunction is used exclusively when UDFRegistration is requested to register a user-defined function, user-defined aggregate function, user-defined function (as UserDefinedFunction) or registerPython.

functionExists Method

functionExists…​FIXME

Note
functionExists is used when…​FIXME

HiveExternalCatalog — Hive-Aware Metastore of Permanent Relational Entities


HiveExternalCatalog is an external catalog of permanent relational entities (aka metastore) that is used when a SparkSession is created with Hive support enabled.

Figure 1. HiveExternalCatalog and SharedState

HiveExternalCatalog is created exclusively when SharedState is requested for the ExternalCatalog for the first time (and spark.sql.catalogImplementation internal configuration property is hive).

Note
The Hadoop configuration used to create a HiveExternalCatalog is the default Hadoop configuration from Spark Core's SparkContext.hadoopConfiguration (which includes the Spark properties with the spark.hadoop prefix).

HiveExternalCatalog uses the internal HiveClient to retrieve metadata from a Hive metastore.

Note

spark.sql.catalogImplementation configuration property is in-memory by default.

Use Builder.enableHiveSupport to enable Hive support (that sets spark.sql.catalogImplementation internal configuration property to hive when the Hive classes are available).
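For example (a sketch that assumes the Hive classes are on the classpath; the application name is arbitrary):

import org.apache.spark.sql.SparkSession

// Enabling Hive support sets spark.sql.catalogImplementation to hive,
// which makes SharedState create a HiveExternalCatalog.
val spark = SparkSession.builder()
  .appName("hive-catalog-demo")
  .enableHiveSupport()
  .getOrCreate()

spark.conf.get("spark.sql.catalogImplementation")  // hive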

Tip

Use spark.sql.warehouse.dir Spark property to change the location of Hive’s hive.metastore.warehouse.dir property, i.e. the location of the Hive local/embedded metastore database (using Derby).

Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive.

See also the official Hive Metastore Administration document.

Table 1. HiveExternalCatalog’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

client

HiveClient for retrieving metadata from a Hive metastore

Created by requesting HiveUtils for a new HiveClientImpl (with the current SparkConf and Hadoop Configuration)

getRawTable Method

getRawTable…​FIXME

Note
getRawTable is used when…​FIXME

doAlterTableStats Method

Note
doAlterTableStats is part of ExternalCatalog Contract to alter the statistics of a table.

doAlterTableStats…​FIXME

Converting Table Statistics to Properties — statsToProperties Internal Method

statsToProperties converts the table statistics to properties (i.e. key-value pairs that will be persisted as properties in the table metadata to a Hive metastore using the Hive client).

statsToProperties adds the following properties to the properties:

statsToProperties takes the column statistics and for every column (field) in schema converts the column statistics to properties and adds the properties (as column statistic property) to the properties.

Note

statsToProperties is used when HiveExternalCatalog is requested for:

Restoring Table Statistics from Properties (from Hive Metastore) — statsFromProperties Internal Method

statsFromProperties collects statistics-related properties, i.e. the properties with their keys with spark.sql.statistics prefix.

statsFromProperties returns None if there are no keys with the spark.sql.statistics prefix in properties.

If there are keys with spark.sql.statistics prefix, statsFromProperties creates a ColumnStat that is the column statistics for every column in schema.

For every column name in schema, statsFromProperties collects all the keys that start with the spark.sql.statistics.colStats.[name] prefix (after checking that the spark.sql.statistics.colStats.[name].version key exists, which marks that column statistics are available in the statistics properties) and converts them to a ColumnStat (for the column name).

In the end, statsFromProperties creates a CatalogStatistics with the following properties:

  • sizeInBytes as spark.sql.statistics.totalSize property

  • rowCount as spark.sql.statistics.numRows property

  • colStats as the collection of the column names and their ColumnStat (calculated above)

Note
statsFromProperties is used when HiveExternalCatalog is requested for restoring table and partition metadata.

listPartitionsByFilter Method

Note
listPartitionsByFilter is part of ExternalCatalog Contract to…​FIXME.

listPartitionsByFilter…​FIXME

alterPartitions Method

Note
alterPartitions is part of ExternalCatalog Contract to…​FIXME.

alterPartitions…​FIXME

getTable Method

Note
getTable is part of ExternalCatalog Contract to…​FIXME.

getTable…​FIXME

doAlterTable Method

Note
doAlterTable is part of ExternalCatalog Contract to alter a table.

doAlterTable…​FIXME

restorePartitionMetadata Internal Method

restorePartitionMetadata…​FIXME

Note

restorePartitionMetadata is used when HiveExternalCatalog is requested for:

getPartition Method

Note
getPartition is part of ExternalCatalog Contract to…​FIXME.

getPartition…​FIXME

getPartitionOption Method

Note
getPartitionOption is part of ExternalCatalog Contract to…​FIXME.

getPartitionOption…​FIXME

Creating HiveExternalCatalog Instance

HiveExternalCatalog takes the following when created:

Building Property Name for Column and Statistic Key — columnStatKeyPropName Internal Method

columnStatKeyPropName builds a property name of the form spark.sql.statistics.colStats.[columnName].[statKey] for the input columnName and statKey.

Note
columnStatKeyPropName is used when HiveExternalCatalog is requested to statsToProperties and statsFromProperties.
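The key format can be illustrated with a tiny sketch (a hypothetical re-implementation for illustration only, not the actual Spark code):

// Hypothetical helper mirroring the documented key format
// spark.sql.statistics.colStats.[columnName].[statKey]
def columnStatKeyPropName(columnName: String, statKey: String): String =
  s"spark.sql.statistics.colStats.$columnName.$statKey"

columnStatKeyPropName("id", "max")
// res: String = spark.sql.statistics.colStats.id.max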

getBucketSpecFromTableProperties Internal Method

getBucketSpecFromTableProperties…​FIXME

Note
getBucketSpecFromTableProperties is used when HiveExternalCatalog is requested to restoreHiveSerdeTable or restoreDataSourceTable.

Restoring Hive Serde Table — restoreHiveSerdeTable Internal Method

restoreHiveSerdeTable…​FIXME

Note
restoreHiveSerdeTable is used exclusively when HiveExternalCatalog is requested to restoreTableMetadata (when there is no provider specified in table properties, which means this is a Hive serde table).

Restoring Data Source Table — restoreDataSourceTable Internal Method

restoreDataSourceTable…​FIXME

Note
restoreDataSourceTable is used exclusively when HiveExternalCatalog is requested to restoreTableMetadata (for regular data source table with provider specified in table properties).

restoreTableMetadata Internal Method

restoreTableMetadata…​FIXME

Note

restoreTableMetadata is used when HiveExternalCatalog is requested for:

Retrieving CatalogTablePartition of Table — listPartitions Method

Note
listPartitions is part of the ExternalCatalog Contract to list partitions of a table.

listPartitions…​FIXME

doCreateTable Method

Note
doCreateTable is part of the ExternalCatalog Contract to…​FIXME.

doCreateTable…​FIXME

tableMetaToTableProps Internal Method

tableMetaToTableProps…​FIXME

Note
tableMetaToTableProps is used when HiveExternalCatalog is requested to doAlterTableDataSchema and doCreateTable (and createDataSourceTable).

doAlterTableDataSchema Method

Note
doAlterTableDataSchema is part of the ExternalCatalog Contract to…​FIXME.

doAlterTableDataSchema…​FIXME

createDataSourceTable Internal Method

createDataSourceTable…​FIXME

Note
createDataSourceTable is used exclusively when HiveExternalCatalog is requested to doCreateTable (for non-hive providers).

InMemoryCatalog


InMemoryCatalog is…​FIXME

listPartitionsByFilter Method

Note
listPartitionsByFilter is part of ExternalCatalog Contract to…​FIXME.

listPartitionsByFilter…​FIXME

ExternalCatalog Contract — External Catalog (Metastore) of Permanent Relational Entities


ExternalCatalog is the contract of an external system catalog (aka metadata registry or metastore) of permanent relational entities, i.e. databases, tables, partitions, and functions.

Table 1. ExternalCatalog’s Features per Relational Entity
Feature | Database | Function | Partition | Table
Alter | alterDatabase | alterFunction | alterPartitions | alterTable, alterTableDataSchema, alterTableStats
Create | createDatabase | createFunction | createPartitions | createTable
Drop | dropDatabase | dropFunction | dropPartitions | dropTable
Get | getDatabase | getFunction | getPartition, getPartitionOption | getTable
List | listDatabases | listFunctions | listPartitionNames, listPartitions, listPartitionsByFilter | listTables
Load | - | - | loadDynamicPartitions, loadPartition | loadTable
Rename | - | renameFunction | renamePartitions | renameTable
Check Existence | databaseExists | functionExists | - | tableExists
Set | setCurrentDatabase | - | - | -

Table 2. ExternalCatalog Contract (incl. Protected Methods)
Method Description

alterPartitions

createPartitions

databaseExists

doAlterDatabase

doAlterFunction

doAlterTable

doAlterTableDataSchema

doAlterTableStats

doCreateDatabase

doCreateFunction

doCreateTable

doDropDatabase

doDropFunction

doDropTable

doRenameFunction

doRenameTable

dropPartitions

functionExists

getDatabase

getFunction

getPartition

getPartitionOption

getTable

listDatabases

listFunctions

listPartitionNames

listPartitions

listPartitionsByFilter

listTables

loadDynamicPartitions

loadPartition

loadTable

renamePartitions

setCurrentDatabase

tableExists

ExternalCatalog is available as externalCatalog of SharedState (in SparkSession).

ExternalCatalog comes in two flavors: ephemeral in-memory and persistent Hive-aware.

Table 3. ExternalCatalogs
ExternalCatalog | Alias | Description
HiveExternalCatalog | hive | A persistent system catalog using a Hive metastore.
InMemoryCatalog | in-memory | An in-memory (ephemeral) system catalog that does not require setting up external systems (like a Hive metastore). It is intended for testing or exploration purposes only and therefore should not be used in production.

The concrete ExternalCatalog is chosen using Builder.enableHiveSupport that enables the Hive support (and sets spark.sql.catalogImplementation configuration property to hive when the Hive classes are available).

Tip

Set spark.sql.catalogImplementation to in-memory when starting spark-shell to use InMemoryCatalog external catalog.

Important

You cannot change the ExternalCatalog implementation after SparkSession has been created, since spark.sql.catalogImplementation is a static configuration property.

ExternalCatalog is a ListenerBus of ExternalCatalogEventListener listeners that handle ExternalCatalogEvent events.

Tip

Use addListener and removeListener to register and de-register ExternalCatalogEventListener listeners, accordingly.

Read ListenerBus Event Bus Contract in Mastering Apache Spark 2 gitbook to learn more about Spark Core’s ListenerBus interface.
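A hedged sketch of registering such a listener follows; it relies on internal/unstable APIs (sharedState and externalCatalog), so the exact types and accessibility may differ across Spark versions.

import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogEvent, ExternalCatalogEventListener}

// Print every external catalog event, e.g. database or table creation.
spark.sharedState.externalCatalog.addListener(new ExternalCatalogEventListener {
  override def onEvent(event: ExternalCatalogEvent): Unit =
    println(s"External catalog event: $event")
})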

Altering Table Statistics — alterTableStats Method

alterTableStats…​FIXME

Note
alterTableStats is used exclusively when SessionCatalog is requested for altering the statistics of a table in a metastore (that can happen when any logical command is executed that could change the table statistics).

Altering Table — alterTable Method

alterTable…​FIXME

Note
alterTable is used exclusively when SessionCatalog is requested to alter a table in a metastore.

createTable Method

createTable…​FIXME

Note
createTable is used when…​FIXME

alterTableDataSchema Method

alterTableDataSchema…​FIXME

Note
alterTableDataSchema is used exclusively when SessionCatalog is requested to alterTableDataSchema.

ExperimentalMethods


ExperimentalMethods holds extra optimizations and strategies that are used in SparkOptimizer and SparkPlanner, respectively.

Table 1. ExperimentalMethods’ Attributes
Name Description

extraOptimizations

Collection of rules to optimize LogicalPlans (i.e. Rule[LogicalPlan] objects)

Used when SparkOptimizer is requested for the User Provided Optimizers

extraStrategies

Collection of SparkStrategies

Used when SessionState is requested for the SparkPlanner

ExperimentalMethods is available as the experimental property of a SparkSession.

Example
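A minimal sketch of such an example (the names are arbitrary): a do-nothing optimization rule registered through the experimental property of a SparkSession.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing logical optimization rule, for illustration only.
object NoopOptimization extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Register it as a user-provided optimization (assuming a SparkSession `spark`).
spark.experimental.extraOptimizations = Seq(NoopOptimization)

// SparkOptimizer now includes it among the User Provided Optimizers.
spark.range(1).queryExecution.optimizedPlan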

ExecutionListenerManager — Management Interface of QueryExecutionListeners


ExecutionListenerManager is the management interface for QueryExecutionListeners that listen for execution metrics:

  • Name of the action (that triggered a query execution)

  • QueryExecution

  • Execution time of this query (in nanoseconds)

ExecutionListenerManager is available as listenerManager property of SparkSession (and listenerManager property of SessionState).

ExecutionListenerManager takes a single SparkConf when created.

While being created, ExecutionListenerManager reads the spark.sql.queryExecutionListeners configuration property, i.e. the list of QueryExecutionListeners that should be automatically added to newly created sessions, and registers them.

Table 1. ExecutionListenerManager’s Public Methods
Method Description

register

unregister

clear

ExecutionListenerManager is created exclusively when BaseSessionStateBuilder is requested for ExecutionListenerManager (while SessionState is built).

ExecutionListenerManager uses listeners internal registry for registered QueryExecutionListeners.

onSuccess Internal Method

onSuccess…​FIXME

Note

onSuccess is used when:

onFailure Internal Method

onFailure…​FIXME

Note

onFailure is used when:

withErrorHandling Internal Method

withErrorHandling…​FIXME

Note
withErrorHandling is used when ExecutionListenerManager is requested to onSuccess and onFailure.

Registering QueryExecutionListener — register Method

Internally, register simply registers (adds) the input QueryExecutionListener to the listeners internal registry.
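For example (a sketch, assuming a SparkSession available as spark and the Spark 2.x listener signatures), a QueryExecutionListener that prints the action name and execution time can be registered as follows.

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// A listener that reports successful and failed query executions.
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName took ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: $exception")
}

spark.listenerManager.register(listener)
spark.range(10).count()  // triggers onSuccess
// spark.listenerManager.unregister(listener)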

CatalogImpl


CatalogImpl is the Catalog in Spark SQL that…​FIXME

Figure 1. CatalogImpl uses SessionCatalog (through SparkSession)
Note
CatalogImpl is in org.apache.spark.sql.internal package.

Creating Table — createTable Method

Note
createTable is part of Catalog Contract to…​FIXME.

createTable…​FIXME

getTable Method

Note
getTable is part of Catalog Contract to…​FIXME.

getTable…​FIXME

functionExists Method

Caution
FIXME

Caching Table or View In-Memory — cacheTable Method

Internally, cacheTable first creates a DataFrame for the table followed by requesting CacheManager to cache it.

Note
cacheTable uses the session-scoped SharedState to access the CacheManager.
Note
cacheTable is part of Catalog contract.
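A quick usage sketch (assuming a SparkSession available as spark; the view name is arbitrary):

// Create a temporary view and cache it through the Catalog interface.
spark.range(10).createOrReplaceTempView("t1")

spark.catalog.cacheTable("t1")   // builds a DataFrame for t1 and asks CacheManager to cache it
spark.catalog.isCached("t1")     // true
spark.catalog.uncacheTable("t1")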

Removing All Cached Tables From In-Memory Cache — clearCache Method

clearCache requests CacheManager to remove all cached tables from in-memory cache.

Note
clearCache is part of Catalog contract.

Creating External Table From Path — createExternalTable Method

createExternalTable creates an external table tableName from the given path and returns the corresponding DataFrame.

The source input parameter is the name of the data source provider for the table, e.g. parquet, json, text. If not specified, createExternalTable uses the spark.sql.sources.default setting to determine the data source format.

Note
The source input parameter must not be hive as it leads to an AnalysisException.

createExternalTable sets the mandatory path option when specified explicitly in the input parameter list.

createExternalTable parses tableName into a TableIdentifier (using SparkSqlParser). It creates a CatalogTable and then executes (by toRDD) a CreateTable logical plan. The result DataFrame is a Dataset[Row] with a QueryExecution (after executing a SubqueryAlias logical plan) and a RowEncoder.

Figure 2. CatalogImpl.createExternalTable
Note
createExternalTable is part of Catalog contract.
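A usage sketch follows (assuming a SparkSession available as spark; the table name and path are arbitrary). Note that createExternalTable has been deprecated in favor of Catalog.createTable since Spark 2.2.

// Create an external (unmanaged) table over existing parquet files.
val users = spark.catalog.createExternalTable(
  tableName = "users",
  path = "/tmp/users.parquet",  // hypothetical path
  source = "parquet")

users.printSchema()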

Listing Tables in Database (as Dataset) — listTables Method

Note
listTables is part of Catalog Contract to get a list of tables in the specified database.

Internally, listTables requests SessionCatalog to list all tables in the specified dbName database and converts them to Tables.

In the end, listTables creates a Dataset with the tables.

Listing Columns of Table (as Dataset) — listColumns Method

Note
listColumns is part of Catalog Contract to…​FIXME.

listColumns requests SessionCatalog for the table metadata.

listColumns takes the schema from the table metadata and creates a Column for every field (with the optional comment as the description).

In the end, listColumns creates a Dataset with the columns.

Converting TableIdentifier to Table — makeTable Internal Method

makeTable creates a Table using the input TableIdentifier and the table metadata (from the current SessionCatalog) if available.

Note
makeTable uses SparkSession to access SessionState that is then used to access SessionCatalog.
Note
makeTable is used when CatalogImpl is requested to listTables or getTable.

Creating Dataset from DefinedByConstructorParams Data — makeDataset Method

makeDataset creates an ExpressionEncoder (from DefinedByConstructorParams) and encodes elements of the input data to internal binary rows.

makeDataset then creates a LocalRelation logical operator. makeDataset requests SessionState to execute the plan and creates the result Dataset.

Note
makeDataset is used when CatalogImpl is requested to list databases, tables, functions and columns

Refreshing Analyzed Logical Plan of Table Query and Re-Caching It — refreshTable Method

Note
refreshTable is part of Catalog Contract to…​FIXME.

refreshTable requests SessionState for the SQL parser to parse a TableIdentifier given the table name.

Note
refreshTable uses SparkSession to access the SessionState.

refreshTable requests SessionCatalog for the table metadata.

For a temporary or persistent VIEW table, refreshTable requests the analyzed logical plan of the DataFrame (for the table) to refresh itself.

For other types of table, refreshTable requests SessionCatalog for refreshing the table metadata (i.e. invalidating the table).

If the table has been cached, refreshTable requests CacheManager to uncache and cache the table DataFrame again.

Note
refreshTable uses SparkSession to access the SharedState that is used to access CacheManager.

refreshByPath Method

Note
refreshByPath is part of Catalog Contract to…​FIXME.

refreshByPath…​FIXME

listColumns Internal Method

listColumns…​FIXME

Note
listColumns is used exclusively when CatalogImpl is requested to listColumns.

Catalog — Metastore Management Interface


Catalog is the interface for managing a metastore (aka metadata catalog) of relational entities (e.g. database(s), tables, functions, table columns and temporary views).

Catalog is available using SparkSession.catalog property.
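A quick tour of the interface (a sketch, assuming a SparkSession available as spark):

import org.apache.spark.sql.functions.col

val catalog = spark.catalog
catalog.currentDatabase                  // e.g. "default"
catalog.listDatabases().show()
catalog.listTables().show()
catalog.listFunctions().filter(col("name").contains("count")).show()
catalog.tableExists("t1")                // false unless such a table exists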

Table 1. Catalog Contract
Method Description

cacheTable

Caches the specified table in memory

Used for SQL’s CACHE TABLE and AlterTableRenameCommand command.

clearCache

createTable

currentDatabase

databaseExists

dropGlobalTempView

dropTempView

functionExists

getDatabase

getFunction

getTable

isCached

listColumns

listDatabases

listFunctions

listTables

recoverPartitions

refreshByPath

refreshTable

setCurrentDatabase

tableExists

uncacheTable

Note
CatalogImpl is the one and only known implementation of the Catalog Contract in Apache Spark.

Configuration Properties


Configuration properties (aka settings) allow you to fine-tune a Spark SQL application.

You can set a configuration property in a SparkSession while creating a new instance using the config method, e.g. to set spark.sql.warehouse.dir for the Spark SQL session.

You can also set a property using SQL SET command.
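A sketch of both approaches (the application name and warehouse location are arbitrary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("conf-demo")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")  // set while creating the session
  .getOrCreate()

// Runtime (non-static) properties can also be changed on an existing session:
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.sql("SET spark.sql.shuffle.partitions=8")  // the equivalent SQL SET command
spark.conf.get("spark.sql.shuffle.partitions")   // "8"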

Table 1. Spark SQL Configuration Properties
Name Default Description

spark.sql.adaptive.enabled

false

Use SQLConf.adaptiveExecutionEnabled method to access the current value.

spark.sql.allowMultipleContexts

true

Controls whether creating multiple SQLContexts/HiveContexts is allowed

spark.sql.autoBroadcastJoinThreshold

10L * 1024 * 1024 (10M)

Maximum size (in bytes) for a table that will be broadcast to all worker nodes when performing a join.

If the size of a table (based on the statistics of its logical plan) is at most this setting, the DataFrame is broadcast for the join.

Negative values or 0 disable broadcasting.

Use SQLConf.autoBroadcastJoinThreshold method to access the current value.

spark.sql.avro.compression.codec

snappy

The compression codec to use when writing Avro data to disk

The supported codecs are:

  • uncompressed

  • deflate

  • snappy

  • bzip2

  • xz

Use SQLConf.avroCompressionCodec method to access the current value.

spark.sql.broadcastTimeout

5 * 60

Timeout in seconds for the broadcast wait time in broadcast joins.

When negative, it is assumed infinite (i.e. Duration.Inf)

Use SQLConf.broadcastTimeout method to access the current value.

spark.sql.caseSensitive

false

(internal) Controls whether the query analyzer should be case sensitive (true) or not (false). It is highly discouraged to turn on case sensitive mode.

Use SQLConf.caseSensitiveAnalysis method to access the current value.

spark.sql.cbo.enabled

false

Enables cost-based optimization (CBO) for estimation of plan statistics when true.

Use SQLConf.cboEnabled method to access the current value.

spark.sql.cbo.joinReorder.enabled

false

Enables join reorder for cost-based optimization (CBO).

Use SQLConf.joinReorderEnabled method to access the current value.

spark.sql.cbo.starSchemaDetection

false

Enables join reordering based on star schema detection for cost-based optimization (CBO) in ReorderJoin logical plan optimization.

Use SQLConf.starSchemaDetection method to access the current value.

spark.sql.codegen.comments

false

Controls whether CodegenContext should register comments (true) or not (false).

spark.sql.codegen.factoryMode

FALLBACK

(internal) Determines the codegen generator fallback behavior

Acceptable values:

  • CODEGEN_ONLY – disable fallback mode

  • FALLBACK – try codegen first and, if any compile error happens, fallback to interpreted mode

  • NO_CODEGEN – skips codegen and always uses interpreted path

Used when CodeGeneratorWithInterpretedFallback is requested to createObject (when UnsafeProjection is requested to create an UnsafeProjection for Catalyst expressions)

spark.sql.codegen.fallback

true

(internal) Whether whole-stage codegen could be temporarily disabled for the part of a query that has failed to compile generated code (true) or not (false).

Use SQLConf.wholeStageFallback method to access the current value.

spark.sql.codegen.hugeMethodLimit

65535

(internal) The maximum bytecode size of a single compiled Java function generated by whole-stage codegen.

The default value 65535 is the largest bytecode size possible for a valid Java method. When running on HotSpot, it may be preferable to set the value to 8000 (which is the value of HugeMethodLimit in the OpenJDK JVM settings)

Use SQLConf.hugeMethodLimit method to access the current value.

spark.sql.codegen.useIdInClassName

true

(internal) Controls whether to embed the (whole-stage) codegen stage ID into the class name of the generated class as a suffix (true) or not (false)

Use SQLConf.wholeStageUseIdInClassName method to access the current value.

spark.sql.codegen.maxFields

100

(internal) Maximum number of output fields (including nested fields) that whole-stage codegen supports. Going above the number deactivates whole-stage codegen.

Use SQLConf.wholeStageMaxNumFields method to access the current value.

spark.sql.codegen.splitConsumeFuncByOperator

true

(internal) Controls whether whole stage codegen puts the logic of consuming rows of each physical operator into individual methods, instead of a single big method. This can be used to avoid oversized functions that can miss the opportunity of JIT optimization.

Use SQLConf.wholeStageSplitConsumeFuncByOperator method to access the current value.

spark.sql.codegen.wholeStage

true

(internal) Whether the whole stage (of multiple physical operators) will be compiled into a single Java method (true) or not (false).

Use SQLConf.wholeStageEnabled method to access the current value.

spark.sql.columnVector.offheap.enabled

false

(internal) Enables OffHeapColumnVector in ColumnarBatch (true) or not (false). When disabled, OnHeapColumnVector is used instead.

Use SQLConf.offHeapColumnVectorEnabled method to access the current value.

spark.sql.columnNameOfCorruptRecord

spark.sql.defaultSizeInBytes

Java’s Long.MaxValue

(internal) Estimated size of a table or relation used in query planning

Set to Java’s Long.MaxValue which is larger than spark.sql.autoBroadcastJoinThreshold to be more conservative. That is to say by default the optimizer will not choose to broadcast a table unless it knows for sure that the table size is small enough.

Used by the planner to decide when it is safe to broadcast a relation. By default, the system will assume that tables are too large to broadcast.

Use SQLConf.defaultSizeInBytes method to access the current value.

spark.sql.dialect

spark.sql.exchange.reuse

true

(internal) When enabled (i.e. true), the Spark planner will find duplicated exchanges and subqueries and re-use them.

Note
When disabled (i.e. false), ReuseSubquery and ReuseExchange physical optimizations (that the Spark planner uses for physical query plan optimization) do nothing.

Use SQLConf.exchangeReuseEnabled method to access the current value.

spark.sql.execution.useObjectHashAggregateExec

true

Enables ObjectHashAggregateExec when Aggregation execution planning strategy is executed.

Use SQLConf.useObjectHashAggregation method to access the current value.

spark.sql.files.ignoreCorruptFiles

false

Controls whether to ignore corrupt files (true) or not (false). If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.

Use SQLConf.ignoreCorruptFiles method to access the current value.

spark.sql.files.ignoreMissingFiles

false

Controls whether to ignore missing files (true) or not (false). If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned.

Use SQLConf.ignoreMissingFiles method to access the current value.

spark.sql.hive.convertMetastoreOrc

true

(internal) When enabled (i.e. true), the built-in ORC reader and writer are used to process ORC tables created by using the HiveQL syntax (instead of Hive serde).

spark.sql.hive.convertMetastoreParquet

true

Controls whether to use the built-in Parquet reader and writer to process parquet tables created by using the HiveQL syntax (instead of Hive serde).

spark.sql.hive.convertMetastoreParquet.mergeSchema

false

Enables trying to merge possibly different but compatible Parquet schemas in different Parquet data files.

This configuration is only effective when spark.sql.hive.convertMetastoreParquet is enabled.

spark.sql.hive.manageFilesourcePartitions

true

Enables metastore partition management for file source tables. This includes both datasource and converted Hive tables.

When enabled (true), datasource tables store partition metadata in the Hive metastore, and use the metastore to prune partitions during query planning.

Use SQLConf.manageFilesourcePartitions method to access the current value.

spark.sql.hive.metastore.barrierPrefixes

(empty)

Comma-separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with, e.g. Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*)

spark.sql.hive.metastore.jars

builtin

Location of the jars that should be used to create a HiveClientImpl.

Supported locations:

  1. builtin – the jars that were used to load Spark SQL (aka Spark classes). Valid only when using the execution version of Hive, i.e. spark.sql.hive.metastore.version

  2. maven – download the Hive jars from Maven repositories

  3. Classpath in the standard format for both Hive and Hadoop

spark.sql.hive.metastore.sharedPrefixes

"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc"

Comma-separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive.

An example of classes that should be shared are:

  • JDBC drivers that are needed to talk to the metastore

  • Other classes that interact with classes that are already shared, e.g. custom appenders that are used by log4j

spark.sql.hive.metastore.version

1.2.1

Version of the Hive metastore (and the client classes and jars).

Supported versions range from 0.12.0 up to and including 2.3.2.

spark.sql.inMemoryColumnarStorage.batchSize

10000

(internal) Controls…​FIXME

Use SQLConf.columnBatchSize method to access the current value.

spark.sql.inMemoryColumnarStorage.compressed

true

(internal) Controls…​FIXME

Use SQLConf.useCompression method to access the current value.

spark.sql.inMemoryColumnarStorage.enableVectorizedReader

true

Enables vectorized reader for columnar caching.

Use SQLConf.cacheVectorizedReaderEnabled method to access the current value.

spark.sql.inMemoryColumnarStorage.partitionPruning

true

(internal) Enables partition pruning for in-memory columnar tables

Use SQLConf.inMemoryPartitionPruning method to access the current value.

spark.sql.join.preferSortMergeJoin

true

(internal) Controls whether JoinSelection execution planning strategy prefers sort merge join over shuffled hash join.

Use SQLConf.preferSortMergeJoin method to access the current value.

spark.sql.limit.scaleUpFactor

4

(internal) Minimal increase rate in the number of partitions between attempts when executing take operator on a structured query. Higher values lead to more partitions read. Lower values might lead to longer execution times as more jobs will be run.

Use SQLConf.limitScaleUpFactor method to access the current value.

spark.sql.optimizer.excludedRules

(empty)

Comma-separated list of optimization rule names that should be disabled (excluded) in the optimizer. The optimizer will log the rules that have indeed been excluded.

Note
It is not guaranteed that all the rules in this configuration will eventually be excluded, as some rules are necessary for correctness.

Use SQLConf.optimizerExcludedRules method to access the current value.

spark.sql.optimizer.inSetConversionThreshold

10

(internal) The threshold of set size for InSet conversion.

Use SQLConf.optimizerInSetConversionThreshold method to access the current value.

spark.sql.optimizer.maxIterations

100

Maximum number of iterations for Analyzer and Optimizer.

spark.sql.orc.impl

native

(internal) When native, use the native version of ORC support instead of the ORC library in Hive 1.2.1.

Acceptable values:

  • hive

  • native

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

Use SQLConf.isParquetBinaryAsString method to access the current value.

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision loss of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.

Use SQLConf.isParquetINT96AsTimestamp method to access the current value.

spark.sql.parquet.enableVectorizedReader

true

Enables vectorized parquet decoding.

Use SQLConf.parquetVectorizedReaderEnabled method to access the current value.

spark.sql.parquet.filterPushdown

true

Controls the filter predicate push-down optimization for data sources using parquet file format

Use SQLConf.parquetFilterPushDown method to access the current value.

spark.sql.parquet.int96TimestampConversion

false

Controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive and Spark.

Use SQLConf.isParquetINT96TimestampConversion method to access the current value.

spark.sql.parquet.recordLevelFilter.enabled

false

Enables Parquet’s native record-level filtering using the pushed down filters.

Note
This configuration only has an effect when spark.sql.parquet.filterPushdown is enabled (and it is by default).

Use SQLConf.parquetRecordFilterEnabled method to access the current value.

spark.sql.parser.quotedRegexColumnNames

false

Controls whether quoted identifiers (using backticks) in SELECT statements should be interpreted as regular expressions.

Use SQLConf.supportQuotedRegexColumnName method to access the current value.

spark.sql.sort.enableRadixSort

true

(internal) Controls whether to use radix sort (true) or not (false) in ShuffleExchangeExec and SortExec physical operators

Radix sort is much faster but requires additional memory to be reserved up-front. The memory overhead may be significant when sorting very small rows (up to 50% more).

Use SQLConf.enableRadixSort method to access the current value.

spark.sql.sources.commitProtocolClass

SQLHadoopMapReduceCommitProtocol

(internal)

Use SQLConf.fileCommitProtocolClass method to access the current value.

spark.sql.sources.partitionOverwriteMode

static

When INSERT OVERWRITE a partitioned data source table with dynamic partition columns, Spark SQL supports two modes (case-insensitive):

  • static – Spark deletes all the partitions that match the partition specification (e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting

  • dynamic – Spark doesn’t delete partitions ahead of time, and only overwrites those partitions that have data written into them

The default (static) keeps the same behavior as Spark prior to 2.3. Note that this config doesn’t affect Hive serde tables, as they are always overwritten with dynamic mode.

Use SQLConf.partitionOverwriteMode method to access the current value.

spark.sql.pivotMaxValues

10000

Maximum number of (distinct) values that will be collected without error (when doing a pivot without specifying the values for the pivot column)

Use SQLConf.dataFramePivotMaxValues method to access the current value.

spark.sql.redaction.options.regex

(?i)secret|password

Regular expression to find options of a Spark SQL command with sensitive information

The values of the options matched will be redacted in the explain output.

This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex configuration.

Used exclusively when SQLConf is requested to redactOptions.

spark.sql.redaction.string.regex

(undefined)

Regular expression to point at sensitive information in text output

When this regex matches a string part, that string part is replaced by a dummy value (i.e. ***(redacted)). This is currently used to redact the output of SQL explain commands.

Note
When this conf is not set, the value of spark.redaction.string.regex is used instead.

Use SQLConf.stringRedactionPattern method to access the current value.

spark.sql.retainGroupColumns

true

Controls whether to retain columns used for aggregation or not (in RelationalGroupedDataset operators).

Use SQLConf.dataFrameRetainGroupColumns method to access the current value.

spark.sql.runSQLOnFiles

true

(internal) Controls whether Spark SQL could use datasource.path as a table in a SQL query.

Use SQLConf.runSQLonFile method to access the current value.

spark.sql.selfJoinAutoResolveAmbiguity

true

Controls whether to resolve ambiguity in join conditions for self-joins automatically.

spark.sql.session.timeZone

Java’s TimeZone.getDefault.getID

The ID of session-local timezone, e.g. “GMT”, “America/Los_Angeles”, etc.

Use SQLConf.sessionLocalTimeZone method to access the current value.

spark.sql.shuffle.partitions

200

Number of partitions to use by default when shuffling data for joins or aggregations

Corresponds to Apache Hive’s mapred.reduce.tasks property that Spark considers deprecated.

Use SQLConf.numShufflePartitions method to access the current value.

spark.sql.sources.bucketing.enabled

true

Enables bucketing support. When disabled (i.e. false), bucketed tables are considered regular (non-bucketed) tables.

Use SQLConf.bucketingEnabled method to access the current value.

spark.sql.sources.default

parquet

Defines the default data source to use for DataFrameReader.

Used when:

spark.sql.statistics.fallBackToHdfs

false

Enables automatic calculation of table size statistic by falling back to HDFS if the table statistics are not available from table metadata.

This can be useful in determining if a table is small enough for auto broadcast joins in query planning.

Use SQLConf.fallBackToHdfsForStatsEnabled method to access the current value.

spark.sql.statistics.histogram.enabled

false

Enables generating histograms when computing column statistics

Note
Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan.

Use SQLConf.histogramEnabled method to access the current value.

spark.sql.statistics.histogram.numBins

254

(internal) The number of bins when generating histograms.

Note
The number of bins must be greater than 1.

Use SQLConf.histogramNumBins method to access the current value.

spark.sql.statistics.parallelFileListingInStatsComputation.enabled

true

(internal) Enables parallel file listing in SQL commands, e.g. ANALYZE TABLE (as opposed to single thread listing that can be particularly slow with tables with hundreds of partitions)

Use SQLConf.parallelFileListingInStatsComputation method to access the current value.

spark.sql.statistics.size.autoUpdate.enabled

false

Enables automatic update of the table size statistic of a table after the table has changed.

Important
If the total number of files of the table is very large this can be expensive and slow down data change commands.

Use SQLConf.autoSizeUpdateEnabled method to access the current value.

spark.sql.subexpressionElimination.enabled

true

(internal) Enables subexpression elimination

Use subexpressionEliminationEnabled method to access the current value.

spark.sql.TungstenAggregate.testFallbackStartsAt

(empty)

A comma-separated pair of numbers, e.g. 5,10, that HashAggregateExec uses to inform TungstenAggregationIterator to switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

spark.sql.ui.retainedExecutions

1000

The number of SQLExecutionUIData entries to keep in failedExecutions and completedExecutions internal registries.

When a query execution finishes, the execution is removed from the internal activeExecutions registry and stored in failedExecutions or completedExecutions given the end execution status. It is when SQLListener makes sure that the number of SQLExecutionUIData entries does not exceed the spark.sql.ui.retainedExecutions Spark property and removes the excess of entries.

spark.sql.windowExec.buffer.in.memory.threshold

4096

(internal) Threshold for number of rows guaranteed to be held in memory by WindowExec physical operator.

Use windowExecBufferInMemoryThreshold method to access the current value.

spark.sql.windowExec.buffer.spill.threshold

4096

(internal) Threshold for number of rows buffered in a WindowExec physical operator.

Use windowExecBufferSpillThreshold method to access the current value.
