关注 spark技术分享,
撸spark源码 玩spark最佳实践

Bucketing

admin阅读(1110)

Bucketing

Bucketing is an optimization technique that uses buckets (and bucketing columns) to determine data partitioning and avoid data shuffle.

The motivation is to optimize performance of a join query by avoiding shuffles (aka exchanges) of tables participating in the join. Bucketing results in fewer exchanges (and so stages).

Note
Bucketing can show the biggest benefit when pre-shuffled bucketed tables are used more than once as bucketing itself takes time (that you will offset executing multiple join queries later).

Bucketing is enabled by default. Spark SQL uses spark.sql.sources.bucketing.enabled configuration property to control whether bucketing should be enabled and used for query optimization or not.

Bucketing is used exclusively in FileSourceScanExec physical operator (when it is requested for the input RDD and to determine the partitioning and ordering of the output).

The above join query is a fine example of a SortMergeJoinExec (aka SortMergeJoin) of two FileSourceScanExecs (aka Scan). The join query uses ShuffleExchangeExec physical operators (aka Exchange) to shuffle the table datasets for the SortMergeJoin.

spark sql bucketing sortmergejoin filescans.png
Figure 1. SortMergeJoin of FileScans (Details for Query)

One way to avoid the exchanges (and so optimize the join query) is to use table bucketing that is applicable for all file-based data sources, e.g. Parquet, ORC, JSON, CSV, that are saved as a table using DataFrameWrite.saveAsTable or simply available in a catalog by SparkSession.table.

You use DataFrameWriter.bucketBy method to specify the number of buckets and the bucketing columns.

You can optionally sort the output rows in buckets using DataFrameWriter.sortBy method.

Note
DataFrameWriter.bucketBy and DataFrameWriter.sortBy simply set respective internal properties that eventually become a bucketing specification.

Unlike bucketing in Apache Hive, Spark SQL creates the bucket files per the number of buckets and partitions. In other words, the number of bucketing files is the number of buckets multiplied by the number of task writers (one per partition).

With bucketing, the Exchanges are no longer needed (as the tables are already pre-shuffled).

The above join query of the bucketed tables shows no ShuffleExchangeExec physical operators (aka Exchange) as the shuffling has already been executed (before the query was run).

spark sql bucketing sortmergejoin bucketed tables no exchanges.png
Figure 2. SortMergeJoin of Bucketed Tables (Details for Query)

The number of partitions of a bucketed table is exactly the number of buckets.

Use SessionCatalog or DESCRIBE EXTENDED SQL command to find the bucketing information.

The number of buckets has to be between 0 and 100000 exclusive or Spark SQL throws an AnalysisException:

There are however requirements that have to be met before Spark Optimizer gives a no-Exchange query plan:

  1. The number of partitions on both sides of a join has to be exactly the same.

  2. Both join operators have to use HashPartitioning partitioning scheme.

It is acceptable to use bucketing for one side of a join.

spark sql bucketing sortmergejoin one bucketed table.png
Figure 3. SortMergeJoin of One Bucketed Table (Details for Query)

Bucket Pruning — Optimizing Filtering on Bucketed Column (Reducing Bucket Files to Scan)

As of Spark 2.4, Spark SQL supports bucket pruning to optimize filtering on bucketed column (by reducing the number of bucket files to scan).

Bucket pruning supports the following predicate expressions:

  • EqualTo (=)

  • EqualNullSafe (<=>)

  • In

  • InSet

  • And and Or of the above

FileSourceStrategy execution planning strategy is responsible for selecting only LogicalRelations over HadoopFsRelation with the bucketing specification with the following:

  1. There is exactly one bucketing column

  2. The number of buckets is greater than 1

Sorting

Warning
There are two exchanges and sorts which makes the above use case almost unusable. I filed an issue at SPARK-24025 Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side.
spark sql bucketing sortmergejoin sorted dataset and bucketed table.png
Figure 4. SortMergeJoin of Sorted Dataset and Bucketed Table (Details for Query)

spark.sql.sources.bucketing.enabled Spark SQL Configuration Property

Bucketing is enabled when spark.sql.sources.bucketing.enabled configuration property is turned on (true) and it is by default.

Tip
Use SQLConf.bucketingEnabled to access the current value of spark.sql.sources.bucketing.enabled property.

Dynamic Partition Inserts

admin阅读(1570)

Dynamic Partition Inserts

Partitioning uses partitioning columns to divide a dataset into smaller chunks (based on the values of certain columns) that will be written into separate directories.

With a partitioned dataset, Spark SQL can load only the parts (partitions) that are really needed (and avoid doing filtering out unnecessary data on JVM). That leads to faster load time and more efficient memory consumption which gives a better performance overall.

With a partitioned dataset, Spark SQL can also be executed over different subsets (directories) in parallel at the same time.

Dynamic Partition Inserts is a feature of Spark SQL that allows for executing INSERT OVERWRITE TABLE SQL statements over partitioned HadoopFsRelations that limits what partitions are deleted to overwrite the partitioned table (and its partitions) with new data.

Dynamic partitions are the partition columns that have no values defined explicitly in the PARTITION clause of INSERT OVERWRITE TABLE SQL statements (in the partitionSpec part).

Static partitions are the partition columns that have values defined explicitly in the PARTITION clause of INSERT OVERWRITE TABLE SQL statements (in the partitionSpec part).

Note
INSERT OVERWRITE TABLE SQL statement is translated into InsertIntoTable logical operator.

Dynamic Partition Inserts is only supported in SQL mode (for INSERT OVERWRITE TABLE SQL statements).

Dynamic Partition Inserts is not supported for non-file-based data sources, i.e. InsertableRelations.

With Dynamic Partition Inserts, the behaviour of OVERWRITE keyword is controlled by spark.sql.sources.partitionOverwriteMode configuration property (default: static). The property controls whether Spark should delete all the partitions that match the partition specification regardless of whether there is data to be written to or not (static) or delete only those partitions that will have data written into (dynamic).

When the dynamic overwrite mode is enabled Spark will only delete the partitions for which it has data to be written to. All the other partitions remain intact.

Spark now writes data partitioned just as Hive would — which means only the partitions that are touched by the INSERT query get overwritten and the others are not touched.

UDFRegistration — Session-Scoped FunctionRegistry

admin阅读(1623)

UDFRegistration — Session-Scoped FunctionRegistry

UDFRegistration is an interface to the session-scoped FunctionRegistry to register user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).

UDFRegistration is available using SparkSession.

UDFRegistration takes a FunctionRegistry when created.

UDFRegistration is created exclusively for SessionState.

Registering UserDefinedFunction (with FunctionRegistry) — register Method

register…​FIXME

Note
register is used when…​FIXME

Registering UserDefinedFunction (with FunctionRegistry) — register Method

register…​FIXME

Note
register is used when…​FIXME

Registering UserDefinedAggregateFunction (with FunctionRegistry) — register Method

register creates a ScalaUDAF internally to register a UDAF.

Note
register gives the input udaf aggregate function back after the function has been registered with FunctionRegistry.

CatalystConf

admin阅读(1458)

CatalystConf

CatalystConf is…​FIXME

Note
The default CatalystConf is SQLConf that is…​FIXME
Table 1. CatalystConf’s Internal Properties
Name Initial Value Description

caseSensitiveAnalysis

cboEnabled

Enables cost-based optimizations (CBO) for estimation of plan statistics when enabled.

Used in CostBasedJoinReorder logical plan optimization and Project, Filter, Join and Aggregate logical operators.

optimizerMaxIterations

spark.sql.optimizer.maxIterations

Maximum number of iterations for Analyzer and Optimizer.

sessionLocalTimeZone

resolver Method

resolver gives case-sensitive or case-insensitive Resolvers per caseSensitiveAnalysis setting.

Note
Resolver is a mere function of two String parameters that returns true if both refer to the same entity (i.e. for case insensitive equality).

StaticSQLConf — Cross-Session, Immutable and Static SQL Configuration

admin阅读(2136)

StaticSQLConf — Cross-Session, Immutable and Static SQL Configuration

Table 1. StaticSQLConf’s Configuration Properties
Name Default Value Scala Value Description

spark.sql.catalogImplementation

in-memory

CATALOG_IMPLEMENTATION

Selects the active catalog implementation from the available ExternalCatalogs:

Note
Use Builder.enableHiveSupport to enable Hive support (that sets spark.sql.catalogImplementation configuration property to hive when the Hive classes are available).

Used when:

  1. SparkSession is requested for the SessionState

  2. SharedState is requested for the ExternalCatalog

  3. SetCommand command is executed (with hive keys)

  4. SparkSession is created with Hive support

spark.sql.debug

false

DEBUG_MODE

(internal) Only used for internal debugging when HiveExternalCatalog is requested to restoreTableMetadata.

Not all functions are supported when enabled.

spark.sql.extensions

(empty)

SPARK_SESSION_EXTENSIONS

Name of the SQL extension configuration class that is used to configure SparkSession extensions (when Builder is requested to get or create a SparkSession). The class should implement Function1[SparkSessionExtensions, Unit], and must have a no-args constructor.

spark.sql.filesourceTableRelationCacheSize

1000

FILESOURCE_TABLE_RELATION_CACHE_SIZE

(internal) The maximum size of the cache that maps qualified table names to table relation plans. Must not be negative.

spark.sql.globalTempDatabase

global_temp

GLOBAL_TEMP_DATABASE

(internal) Name of the Spark-owned internal database of global temporary views

Used exclusively to create a GlobalTempViewManager when SharedState is first requested for the GlobalTempViewManager.

Note
The name of the internal database cannot conflict with the names of any database that is already available in ExternalCatalog.

spark.sql.hive.thriftServer.singleSession

false

HIVE_THRIFT_SERVER_SINGLESESSION

When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.

spark.sql.queryExecutionListeners

(empty)

QUERY_EXECUTION_LISTENERS

List of class names that implement QueryExecutionListener that will be automatically registered to new SparkSessions.

The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.

spark.sql.sources.schemaStringLengthThreshold

4000

SCHEMA_STRING_LENGTH_THRESHOLD

(internal) The maximum length allowed in a single cell when storing additional schema information in Hive’s metastore

spark.sql.ui.retainedExecutions

1000

UI_RETAINED_EXECUTIONS

Number of executions to retain in the Spark UI.

spark.sql.warehouse.dir

spark-warehouse

WAREHOUSE_PATH

The directory of a Hive warehouse (using Derby) with managed databases and tables (aka Spark warehouse)

Tip
Read the official Hive Metastore Administration document to learn more.

The properties in StaticSQLConf can only be queried and can never be changed once the first SparkSession is created.

SQLConf — Internal Configuration Store

admin阅读(1713)

SQLConf — Internal Configuration Store

SQLConf is an internal key-value configuration store for parameters and hints used in Spark SQL.

Note

SQLConf is an internal part of Spark SQL and is not supposed to be used directly.

Spark SQL configuration is available through RuntimeConfig (the user-facing configuration management interface) that you can access using SparkSession.

You can access a SQLConf using:

  1. SQLConf.get (preferred) – the SQLConf of the current active SparkSession

  2. SessionState – direct access through SessionState of the SparkSession of your choice (that gives more flexibility on what SparkSession is used that can be different from the current active SparkSession)

SQLConf offers methods to get, set, unset or clear values of configuration properties, but has also the accessor methods to read the current value of a configuration property or hint.

Table 1. SQLConf’s Accessor Methods
Name Parameter Description

adaptiveExecutionEnabled

spark.sql.adaptive.enabled

Used exclusively when EnsureRequirements adds an ExchangeCoordinator (for adaptive query execution)

autoBroadcastJoinThreshold

spark.sql.autoBroadcastJoinThreshold

Used exclusively in JoinSelection execution planning strategy

autoSizeUpdateEnabled

spark.sql.statistics.size.autoUpdate.enabled

Used when:

avroCompressionCodec

spark.sql.avro.compression.codec

Used exclusively when AvroOptions is requested for the compression configuration property (and it was not set explicitly)

broadcastTimeout

spark.sql.broadcastTimeout

Used exclusively in BroadcastExchangeExec (for broadcasting a table to executors).

bucketingEnabled

spark.sql.sources.bucketing.enabled

Used when FileSourceScanExec is requested for the input RDD and to determine output partitioning and ordering

cacheVectorizedReaderEnabled

spark.sql.inMemoryColumnarStorage.enableVectorizedReader

Used exclusively when InMemoryTableScanExec physical operator is requested for supportsBatch flag.

caseSensitiveAnalysis

spark.sql.caseSensitive

cboEnabled

spark.sql.cbo.enabled

Used in:

columnBatchSize

spark.sql.inMemoryColumnarStorage.batchSize

Used when…​FIXME

dataFramePivotMaxValues

spark.sql.pivotMaxValues

Used exclusively in pivot operator.

dataFrameRetainGroupColumns

spark.sql.retainGroupColumns

Used exclusively in RelationalGroupedDataset when creating the result Dataset (after agg, count, mean, max, avg, min, and sum operators).

defaultSizeInBytes

spark.sql.defaultSizeInBytes

Used when:

enableRadixSort

spark.sql.sort.enableRadixSort

Used exclusively when SortExec physical operator is requested for a UnsafeExternalRowSorter.

exchangeReuseEnabled

spark.sql.exchange.reuse

Used when ReuseSubquery and ReuseExchange physical optimizations are executed

Note
When disabled (i.e. false), ReuseSubquery and ReuseExchange physical optimizations do no optimizations.

fallBackToHdfsForStatsEnabled

spark.sql.statistics.fallBackToHdfs

Used exclusively when DetermineTableStats logical resolution rule is executed.

fileCommitProtocolClass

spark.sql.sources.commitProtocolClass

Used (to instantiate a FileCommitProtocol) when:

histogramEnabled

spark.sql.statistics.histogram.enabled

Used exclusively when AnalyzeColumnCommand logical command is executed.

histogramNumBins

spark.sql.statistics.histogram.numBins

Used exclusively when AnalyzeColumnCommand is executed with spark.sql.statistics.histogram.enabled turned on (and calculates percentiles).

hugeMethodLimit

spark.sql.codegen.hugeMethodLimit

Used exclusively when WholeStageCodegenExec unary physical operator is requested to execute (and generate a RDD[InternalRow]), i.e. when the compiled function exceeds this threshold, the whole-stage codegen is deactivated for this subtree of the query plan.

ignoreCorruptFiles

spark.sql.files.ignoreCorruptFiles

Used when:

ignoreMissingFiles

spark.sql.files.ignoreMissingFiles

Used exclusively when FileScanRDD is created (and then to compute a partition)

inMemoryPartitionPruning

spark.sql.inMemoryColumnarStorage.partitionPruning

Used exclusively when InMemoryTableScanExec physical operator is requested for filtered cached column batches (as a RDD[CachedBatch]).

isParquetBinaryAsString

spark.sql.parquet.binaryAsString

isParquetINT96AsTimestamp

spark.sql.parquet.int96AsTimestamp

isParquetINT96TimestampConversion

spark.sql.parquet.int96TimestampConversion

Used exclusively when ParquetFileFormat is requested to build a data reader with partition column values appended.

joinReorderEnabled

spark.sql.cbo.joinReorder.enabled

Used exclusively in CostBasedJoinReorder logical plan optimization

limitScaleUpFactor

spark.sql.limit.scaleUpFactor

Used exclusively when a physical operator is requested the first n rows as an array.

manageFilesourcePartitions

spark.sql.hive.manageFilesourcePartitions

Used in:

numShufflePartitions

spark.sql.shuffle.partitions

Used in:

offHeapColumnVectorEnabled

spark.sql.columnVector.offheap.enabled

Used when:

optimizerExcludedRules

spark.sql.optimizer.excludedRules

Used exclusively when Optimizer is requested for the optimization batches

optimizerInSetConversionThreshold

spark.sql.optimizer.inSetConversionThreshold

Used exclusively when OptimizeIn logical query optimization is applied to a logical plan (and replaces an In predicate expression with an InSet)

parallelFileListingInStatsComputation

spark.sql.statistics.parallelFileListingInStatsComputation.enabled

Used exclusively when CommandUtils helper object is requested to calculate the total size of a table (with partitions) (for AnalyzeColumnCommand and AnalyzeTableCommand commands)

parquetFilterPushDown

spark.sql.parquet.filterPushdown

Used exclusively when ParquetFileFormat is requested to build a data reader with partition column values appended.

parquetRecordFilterEnabled

spark.sql.parquet.recordLevelFilter.enabled

Used exclusively when ParquetFileFormat is requested to build a data reader with partition column values appended.

parquetVectorizedReaderEnabled

spark.sql.parquet.enableVectorizedReader

Used when:

partitionOverwriteMode

spark.sql.sources.partitionOverwriteMode

Used exclusively when InsertIntoHadoopFsRelationCommand logical command is executed

preferSortMergeJoin

spark.sql.join.preferSortMergeJoin

Used exclusively in JoinSelection execution planning strategy to prefer sort merge join over shuffle hash join.

runSQLonFile

spark.sql.runSQLOnFiles

Used when:

sessionLocalTimeZone

spark.sql.session.timeZone

starSchemaDetection

spark.sql.cbo.starSchemaDetection

Used exclusively in ReorderJoin logical plan optimization (and indirectly in StarSchemaDetection)

stringRedactionPattern

spark.sql.redaction.string.regex

Used when:

subexpressionEliminationEnabled

spark.sql.subexpressionElimination.enabled

Used exclusively when SparkPlan is requested for subexpressionEliminationEnabled flag.

supportQuotedRegexColumnName

spark.sql.parser.quotedRegexColumnNames

Used when:

useCompression

spark.sql.inMemoryColumnarStorage.compressed

Used when…​FIXME

wholeStageEnabled

spark.sql.codegen.wholeStage

Used in:

wholeStageFallback

spark.sql.codegen.fallback

Used exclusively when WholeStageCodegenExec is executed.

wholeStageMaxNumFields

spark.sql.codegen.maxFields

Used in:

wholeStageSplitConsumeFuncByOperator

spark.sql.codegen.splitConsumeFuncByOperator

Used exclusively when CodegenSupport is requested to consume

wholeStageUseIdInClassName

spark.sql.codegen.useIdInClassName

Used exclusively when WholeStageCodegenExec is requested to generate the Java source code for the child physical plan subtree (when created)

windowExecBufferInMemoryThreshold

spark.sql.windowExec.buffer.in.memory.threshold

Used exclusively when WindowExec unary physical operator is executed.

windowExecBufferSpillThreshold

spark.sql.windowExec.buffer.spill.threshold

Used exclusively when WindowExec unary physical operator is executed.

useObjectHashAggregation

spark.sql.execution.useObjectHashAggregateExec

Used exclusively when Aggregation execution planning strategy is executed (and uses AggUtils to create an aggregation physical operator).

Getting Parameters and Hints

You can get the current parameters and hints using the following family of get methods.

Setting Parameters and Hints

You can set parameters and hints using the following family of set methods.

Unsetting Parameters and Hints

You can unset parameters and hints using the following family of unset methods.

Clearing All Parameters and Hints

You can use clear to remove all the parameters and hints in SQLConf.

Redacting Data Source Options with Sensitive Information — redactOptions Method

redactOptions takes the values of the spark.sql.redaction.options.regex and spark.redaction.regex configuration properties.

For every regular expression (in the order), redactOptions redacts sensitive information, i.e. finds the first match of a regular expression pattern in every option key or value and if either matches replaces the value with ***(redacted).

Note
redactOptions is used exclusively when SaveIntoDataSourceCommand logical command is requested for the simple description.

RuntimeConfig — Management Interface of Runtime Configuration

admin阅读(1322)

RuntimeConfig — Management Interface of Runtime Configuration

RuntimeConfig is the management interface of the runtime configuration.

Table 1. RuntimeConfig API
Method Description

get

getAll

getOption

isModifiable

(New in 2.4.0)

set

unset

RuntimeConfig is available using the conf attribute of a SparkSession.

spark sql RuntimeConfig.png
Figure 1. RuntimeConfig, SparkSession and SQLConf

RuntimeConfig is created exclusively when SparkSession is requested for one.

RuntimeConfig takes a SQLConf when created.

get Method

get…​FIXME

Note
get is used when…​FIXME

getAll Method

getAll…​FIXME

Note
getAll is used when…​FIXME

getOption Method

getOption…​FIXME

Note
getOption is used when…​FIXME

set Method

set…​FIXME

Note
set is used when…​FIXME

unset Method

unset…​FIXME

Note
unset is used when…​FIXME

CacheManager — In-Memory Cache for Tables and Views

admin阅读(4448)

CacheManager — In-Memory Cache for Tables and Views

CacheManager is an in-memory cache for tables and views (as logical plans). It uses the internal cachedData collection of CachedData to track logical plans and their cached InMemoryRelation representation.

CacheManager is shared across SparkSessions through SharedState.

Note
A Spark developer can use CacheManager to cache Datasets using cache or persist operators.

Cached Queries — cachedData Internal Registry

cachedData is a collection of CachedData with logical plans and their cached InMemoryRelation representation.

cachedData is cleared when…​FIXME

invalidateCachedPath Method

Caution
FIXME

invalidateCache Method

Caution
FIXME

lookupCachedData Method

Caution
FIXME

uncacheQuery Method

Caution
FIXME

isEmpty Method

Caution
FIXME

Caching Dataset (Registering Analyzed Logical Plan as InMemoryRelation) — cacheQuery Method

cacheQuery adds the analyzed logical plan of the input query to the cachedData internal registry of cached queries.

Internally, cacheQuery firstly requests the input query for the analyzed logical plan and creates a InMemoryRelation with the following properties:

cacheQuery then creates a CachedData (for the analyzed query plan and the InMemoryRelation) and adds it to the cachedData internal registry.

If the input query has already been cached, cacheQuery simply prints the following WARN message to the logs and exits (i.e. does nothing but printing out the WARN message):

Note

cacheQuery is used when:

Removing All Cached Tables From In-Memory Cache — clearCache Method

clearCache acquires a write lock and unpersists RDD[CachedBatch]s of the queries in cachedData before removing them altogether.

Note
clearCache is used when the CatalogImpl is requested to clearCache.

CachedData

Caution
FIXME

recacheByCondition Internal Method

recacheByCondition…​FIXME

Note
recacheByCondition is used when CacheManager is requested to recacheByPlan or recacheByPath.

recacheByPlan Method

recacheByPlan…​FIXME

Note
recacheByPlan is used exclusively when InsertIntoDataSourceCommand logical command is executed.

recacheByPath Method

recacheByPath…​FIXME

Note
recacheByPath is used exclusively when CatalogImpl is requested to refreshByPath.

Replacing Logical Query Segments With Cached Query Plans — useCachedData Method

useCachedData…​FIXME

Note
useCachedData is used exclusively when QueryExecution is requested for a cached logical query plan.

SharedState — State Shared Across SparkSessions

admin阅读(1550)

SharedState — State Shared Across SparkSessions

SharedState holds the shared state across multiple SparkSessions.

Table 1. SharedState’s Properties
Name Type Description

cacheManager

CacheManager

externalCatalog

ExternalCatalog

Metastore of permanent relational entities, i.e. databases, tables, partitions, and functions.

Note
externalCatalog is initialized lazily on the first access.

globalTempViewManager

GlobalTempViewManager

Management interface of global temporary views

jarClassLoader

NonClosableMutableURLClassLoader

sparkContext

SparkContext

Spark Core’s SparkContext

statusStore

SQLAppStatusStore

warehousePath

String

Warehouse path

SharedState is available as the sharedState property of a SparkSession.

SharedState is shared across SparkSessions.

SharedState is created exclusively when accessed using sharedState property of SparkSession.

Tip

Enable INFO logging level for org.apache.spark.sql.internal.SharedState logger to see what happens inside.

Add the following line to conf/log4j.properties:

Refer to Logging.

warehousePath Property

warehousePath is the warehouse path with the value of:

  1. hive.metastore.warehouse.dir if defined and spark.sql.warehouse.dir is not

  2. spark.sql.warehouse.dir if hive.metastore.warehouse.dir is undefined

You should see the following INFO message in the logs when SharedState is created:

warehousePath is used exclusively when SharedState initializes ExternalCatalog (and creates the default database in the metastore).

While initialized, warehousePath does the following:

  1. Loads hive-site.xml if available on CLASSPATH, i.e. adds it as a configuration resource to Hadoop’s Configuration (of SparkContext).

  2. Removes hive.metastore.warehouse.dir from SparkConf (of SparkContext) and leaves it off if defined using any of the Hadoop configuration resources.

  3. Sets spark.sql.warehouse.dir or hive.metastore.warehouse.dir in the Hadoop configuration (of SparkContext)

    1. If hive.metastore.warehouse.dir has been defined in any of the Hadoop configuration resources but spark.sql.warehouse.dir has not, spark.sql.warehouse.dir becomes the value of hive.metastore.warehouse.dir.

      You should see the following INFO message in the logs:

    2. Otherwise, the Hadoop configuration’s hive.metastore.warehouse.dir is set to spark.sql.warehouse.dir

      You should see the following INFO message in the logs:

externalCatalog Property

externalCatalog is created reflectively per spark.sql.catalogImplementation internal configuration property (with the current Hadoop’s Configuration as SparkContext.hadoopConfiguration):

While initialized:

  1. Creates the default database (with default database description and warehousePath location) if it doesn’t exist.

  2. Registers a ExternalCatalogEventListener that propagates external catalog events to the Spark listener bus.

externalCatalogClassName Internal Method

externalCatalogClassName gives the name of the class of the ExternalCatalog per spark.sql.catalogImplementation, i.e.

Note
externalCatalogClassName is used exclusively when SharedState is requested for the ExternalCatalog.

Accessing Management Interface of Global Temporary Views — globalTempViewManager Property

When accessed for the very first time, globalTempViewManager gets the name of the global temporary view database (as the value of spark.sql.globalTempDatabase internal static configuration property).

In the end, globalTempViewManager creates a new GlobalTempViewManager (with the database name).

globalTempViewManager throws a SparkException when the global temporary view database exist in the ExternalCatalog.

Note
globalTempViewManager is used when BaseSessionStateBuilder and HiveSessionStateBuilder are requested for the SessionCatalog.

HiveSessionStateBuilder — Builder of Hive-Specific SessionState

admin阅读(1484)

HiveSessionStateBuilder — Builder of Hive-Specific SessionState

spark sql HiveSessionStateBuilder.png
Figure 1. HiveSessionStateBuilder’s Hive-Specific Properties

HiveSessionStateBuilder is created (using newBuilder) exclusively when…​FIXME

spark sql HiveSessionStateBuilder SessionState.png
Figure 2. HiveSessionStateBuilder and SessionState (in SparkSession)
Table 1. HiveSessionStateBuilder’s Properties
Name Description

analyzer

catalog

HiveSessionCatalog with the following:

Note
If parentState is defined, the state is copied to catalog

Used to create Hive-specific Analyzer and a RelationConversions logical evaluation rule (as part of Hive-Specific Analyzer’s PostHoc Resolution Rules)

externalCatalog

HiveExternalCatalog

planner

SparkPlanner with Hive-specific strategies.

resourceLoader

HiveSessionResourceLoader

SparkPlanner with Hive-Specific Strategies — planner Property

Note
planner is part of BaseSessionStateBuilder Contract to create a query planner.

planner is a SparkPlanner with…​FIXME

planner uses the Hive-specific strategies.

Table 2. Hive-Specific SparkPlanner’s Hive-Specific Strategies
Strategy Description

HiveTableScans

Scripts

Logical Query Plan Analyzer with Hive-Specific Rules — analyzer Property

Note
analyzer is part of BaseSessionStateBuilder Contract to create a logical query plan analyzer.

analyzer is a Analyzer with Hive-specific SessionCatalog (and SQLConf).

analyzer uses the Hive-specific extended resolution, postHoc resolution and extended check rules.

Table 3. Hive-Specific Analyzer’s Extended Resolution Rules (in the order of execution)
Logical Rule Description

ResolveHiveSerdeTable

FindDataSourceTable

ResolveSQLOnFile

Table 4. Hive-Specific Analyzer’s PostHoc Resolution Rules
Logical Rule Description

DetermineTableStats

RelationConversions

PreprocessTableCreation

PreprocessTableInsertion

DataSourceAnalysis

HiveAnalysis

Table 5. Hive-Specific Analyzer’s Extended Check Rules
Logical Rule Description

PreWriteCheck

PreReadCheck

Builder Function to Create HiveSessionStateBuilder — newBuilder Factory Method

Note
newBuilder is part of BaseSessionStateBuilder Contract to…​FIXME.

newBuilder…​FIXME

Creating HiveSessionStateBuilder Instance

HiveSessionStateBuilder takes the following when created:

关注公众号:spark技术分享

联系我们联系我们