Configuration Properties

Configuration properties (aka settings) allow you to fine-tune a Spark SQL application.

You can set a configuration property in a SparkSession while creating a new instance using the config method, e.g. to set spark.sql.warehouse.dir for the Spark SQL session.

You can also set a property using the SQL SET command.
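
As a minimal sketch of both approaches, the following Python code sets a property through the builder's config method and renders a SET statement. The helper names (set_statement, build_session) and the application name are illustrative assumptions. build_session needs a PySpark installation, so it imports PySpark lazily.

```python
def set_statement(key, value):
    """Render a SQL SET command for a configuration property."""
    return f"SET {key}={value}"


def build_session(warehouse_dir):
    """Create a SparkSession with spark.sql.warehouse.dir set at build time."""
    # Imported lazily so this module loads even without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master("local[*]")
        .appName("config-demo")  # hypothetical application name
        .config("spark.sql.warehouse.dir", warehouse_dir)
        .getOrCreate()
    )
```

With PySpark available, `spark = build_session("/tmp/spark-warehouse")` followed by `spark.sql(set_statement("spark.sql.shuffle.partitions", 10))` would change the number of shuffle partitions for that session (the path is a placeholder).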

Table 1. Spark SQL Configuration Properties
Each entry below gives the property name, its default value, and its description, in that order.

spark.sql.adaptive.enabled

false

Enables adaptive query execution.

Use SQLConf.adaptiveExecutionEnabled method to access the current value.

spark.sql.allowMultipleContexts

true

Controls whether creating multiple SQLContexts/HiveContexts is allowed

spark.sql.autoBroadcastJoinThreshold

10L * 1024 * 1024 (10 MB)

Maximum size (in bytes) for a table that will be broadcast to all worker nodes when performing a join.

If the estimated size of a table (taken from the statistics of its logical plan) is at most this setting, the table is broadcast for the join.

Negative values or 0 disable broadcasting.

Use SQLConf.autoBroadcastJoinThreshold method to access the current value.
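
The broadcast rule described above can be modeled in a few lines of plain Python. This is a simplified sketch of the rule as stated in the description, not Spark's planner code; can_broadcast and DEFAULT_THRESHOLD are hypothetical names.

```python
DEFAULT_THRESHOLD = 10 * 1024 * 1024  # 10 MB, the default of this property


def can_broadcast(size_in_bytes, threshold=DEFAULT_THRESHOLD):
    """Return True when a table of the given estimated size would be broadcast."""
    if threshold <= 0:
        # Negative values or 0 disable broadcasting altogether.
        return False
    # Broadcast only when the estimated size is at most the threshold.
    return 0 <= size_in_bytes <= threshold
```

For example, a table estimated at 5 MB passes the check with the default threshold, while the same table is never broadcast once the threshold is set to -1.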

spark.sql.avro.compression.codec

snappy

The compression codec to use when writing Avro data to disk

The supported codecs are:

  • uncompressed

  • deflate

  • snappy

  • bzip2

  • xz

Use SQLConf.avroCompressionCodec method to access the current value.

spark.sql.broadcastTimeout

5 * 60

Timeout in seconds for the broadcast wait time in broadcast joins.

When negative, it is assumed infinite (i.e. Duration.Inf)

Use SQLConf.broadcastTimeout method to access the current value.
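
The handling of negative values can be illustrated with a small plain-Python model. It is only a sketch of the rule stated above; broadcast_wait_seconds is a hypothetical helper, and math.inf stands in for Duration.Inf.

```python
import math


def broadcast_wait_seconds(timeout=5 * 60):
    """Map the configured timeout to an effective wait time in seconds."""
    # A negative setting means "wait forever".
    return math.inf if timeout < 0 else float(timeout)
```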

spark.sql.caseSensitive

false

(internal) Controls whether the query analyzer should be case sensitive (true) or not (false). It is highly discouraged to turn on case sensitive mode.

Use SQLConf.caseSensitiveAnalysis method to access the current value.

spark.sql.cbo.enabled

false

Enables cost-based optimization (CBO) for estimation of plan statistics when true.

Use SQLConf.cboEnabled method to access the current value.

spark.sql.cbo.joinReorder.enabled

false

Enables join reorder for cost-based optimization (CBO).

Use SQLConf.joinReorderEnabled method to access the current value.

spark.sql.cbo.starSchemaDetection

false

Enables join reordering based on star schema detection for cost-based optimization (CBO) in ReorderJoin logical plan optimization.

Use SQLConf.starSchemaDetection method to access the current value.

spark.sql.codegen.comments

false

Controls whether CodegenContext should register comments (true) or not (false).

spark.sql.codegen.factoryMode

FALLBACK

(internal) Determines the codegen generator fallback behavior

Acceptable values:

  • CODEGEN_ONLY – disable fallback mode

  • FALLBACK – try codegen first and, if any compile error happens, fallback to interpreted mode

  • NO_CODEGEN – skips codegen and always uses interpreted path

Used when CodeGeneratorWithInterpretedFallback is requested to createObject (when UnsafeProjection is requested to create an UnsafeProjection for Catalyst expressions)
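
The three modes can be sketched as a small dispatch function in plain Python. This is a model of the behavior listed above under the assumption that a compile error surfaces as an exception; create_object here and the two factory callables are hypothetical stand-ins, not Spark's API.

```python
def create_object(mode, codegen_factory, interpreted_factory):
    """Pick a code path according to the factory mode."""
    if mode == "CODEGEN_ONLY":
        return codegen_factory()          # no fallback: errors propagate
    if mode == "NO_CODEGEN":
        return interpreted_factory()      # codegen is skipped entirely
    if mode == "FALLBACK":
        try:
            return codegen_factory()      # try codegen first
        except Exception:
            return interpreted_factory()  # on a compile error, use the interpreted path
    raise ValueError(f"unknown factory mode: {mode}")


def failing_codegen():
    """Simulate a code generator whose output fails to compile."""
    raise RuntimeError("simulated compile error")
```

With `failing_codegen` as the code generator, FALLBACK returns the interpreted result while CODEGEN_ONLY lets the error propagate.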

spark.sql.codegen.fallback

true

(internal) Whether the whole stage codegen could be temporarily disabled for the part of a query that has failed to compile generated code (true) or not (false).

Use SQLConf.wholeStageFallback method to access the current value.

spark.sql.codegen.hugeMethodLimit

65535

(internal) The maximum bytecode size of a single compiled Java function generated by whole-stage codegen.

The default value 65535 is the largest bytecode size possible for a valid Java method. When running on HotSpot, it may be preferable to set the value to 8000 (which is the value of HugeMethodLimit in the OpenJDK JVM settings)

Use SQLConf.hugeMethodLimit method to access the current value.

spark.sql.codegen.useIdInClassName

true

(internal) Controls whether to embed the (whole-stage) codegen stage ID into the class name of the generated class as a suffix (true) or not (false)

Use SQLConf.wholeStageUseIdInClassName method to access the current value.

spark.sql.codegen.maxFields

100

(internal) Maximum number of output fields (including nested fields) that whole-stage codegen supports. Going above the number deactivates whole-stage codegen.

Use SQLConf.wholeStageMaxNumFields method to access the current value.
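
To make "including nested fields" concrete, here is a plain-Python sketch that counts the leaf fields of a schema and compares the count with the limit. The schema representation (a dict whose values are type names or nested dicts) and the function names are assumptions made for illustration only.

```python
def count_fields(schema):
    """Count output fields, descending into nested structs (dicts)."""
    total = 0
    for field_type in schema.values():
        if isinstance(field_type, dict):
            total += count_fields(field_type)  # nested struct: count its leaves
        else:
            total += 1
    return total


def whole_stage_supported(schema, max_fields=100):
    """Whole-stage codegen stays active only up to max_fields fields."""
    return count_fields(schema) <= max_fields
```

A schema such as `{"id": "long", "address": {"city": "string", "zip": "string"}}` counts as three fields in this model.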

spark.sql.codegen.splitConsumeFuncByOperator

true

(internal) Controls whether whole stage codegen puts the logic of consuming rows of each physical operator into individual methods, instead of a single big method. This can be used to avoid an oversized function that can miss the opportunity for JIT optimization.

Use SQLConf.wholeStageSplitConsumeFuncByOperator method to access the current value.

spark.sql.codegen.wholeStage

true

(internal) Whether the whole stage (of multiple physical operators) will be compiled into a single Java method (true) or not (false).

Use SQLConf.wholeStageEnabled method to access the current value.

spark.sql.columnVector.offheap.enabled

false

(internal) Enables OffHeapColumnVector in ColumnarBatch (true) or not (false). When disabled, OnHeapColumnVector is used instead.

Use SQLConf.offHeapColumnVectorEnabled method to access the current value.

spark.sql.columnNameOfCorruptRecord

spark.sql.defaultSizeInBytes

Java’s Long.MaxValue

(internal) Estimated size of a table or relation used in query planning

Set to Java’s Long.MaxValue, which is larger than spark.sql.autoBroadcastJoinThreshold, to be more conservative. In other words, by default the optimizer will not choose to broadcast a table unless it knows for sure that the table size is small enough.

Used by the planner to decide when it is safe to broadcast a relation. By default, the system will assume that tables are too large to broadcast.

Use SQLConf.defaultSizeInBytes method to access the current value.

spark.sql.dialect

spark.sql.exchange.reuse

true

(internal) When enabled (i.e. true), the Spark planner will find duplicated exchanges and subqueries and re-use them.

Note
When disabled (i.e. false), ReuseSubquery and ReuseExchange physical optimizations (that the Spark planner uses for physical query plan optimization) do nothing.

Use SQLConf.exchangeReuseEnabled method to access the current value.

spark.sql.execution.useObjectHashAggregateExec

true

Enables ObjectHashAggregateExec when Aggregation execution planning strategy is executed.

Use SQLConf.useObjectHashAggregation method to access the current value.

spark.sql.files.ignoreCorruptFiles

false

Controls whether to ignore corrupt files (true) or not (false). If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.

Use SQLConf.ignoreCorruptFiles method to access the current value.

spark.sql.files.ignoreMissingFiles

false

Controls whether to ignore missing files (true) or not (false). If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned.

Use SQLConf.ignoreMissingFiles method to access the current value.

spark.sql.hive.convertMetastoreOrc

true

(internal) When enabled (i.e. true), the built-in ORC reader and writer are used to process ORC tables created by using the HiveQL syntax (instead of Hive serde).

spark.sql.hive.convertMetastoreParquet

true

Controls whether to use the built-in Parquet reader and writer to process parquet tables created by using the HiveQL syntax (instead of Hive serde).

spark.sql.hive.convertMetastoreParquet.mergeSchema

false

Enables trying to merge possibly different but compatible Parquet schemas in different Parquet data files.

This configuration is only effective when spark.sql.hive.convertMetastoreParquet is enabled.

spark.sql.hive.manageFilesourcePartitions

true

Enables metastore partition management for file source tables. This includes both datasource and converted Hive tables.

When enabled (true), datasource tables store partitions in the Hive metastore, and use the metastore to prune partitions during query planning.

Use SQLConf.manageFilesourcePartitions method to access the current value.

spark.sql.hive.metastore.barrierPrefixes

(empty)

Comma-separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with, e.g. Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*)

spark.sql.hive.metastore.jars

builtin

Location of the jars that should be used to create a HiveClientImpl.

Supported locations:

  1. builtin – the jars that were used to load Spark SQL (aka Spark classes). Valid only when using the execution version of Hive, i.e. spark.sql.hive.metastore.version

  2. maven – download the Hive jars from Maven repositories

  3. Classpath in the standard format for both Hive and Hadoop

spark.sql.hive.metastore.sharedPrefixes

"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc"

Comma-separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive.

Examples of classes that should be shared:

  • JDBC drivers that are needed to talk to the metastore

  • Other classes that interact with classes that are already shared, e.g. custom appenders that are used by log4j

spark.sql.hive.metastore.version

1.2.1

Version of the Hive metastore (and the client classes and jars).

Supported versions range from 0.12.0 up to and including 2.3.2.

spark.sql.inMemoryColumnarStorage.batchSize

10000

(internal) Controls…​FIXME

Use SQLConf.columnBatchSize method to access the current value.

spark.sql.inMemoryColumnarStorage.compressed

true

(internal) Controls…​FIXME

Use SQLConf.useCompression method to access the current value.

spark.sql.inMemoryColumnarStorage.enableVectorizedReader

true

Enables vectorized reader for columnar caching.

Use SQLConf.cacheVectorizedReaderEnabled method to access the current value.

spark.sql.inMemoryColumnarStorage.partitionPruning

true

(internal) Enables partition pruning for in-memory columnar tables

Use SQLConf.inMemoryPartitionPruning method to access the current value.

spark.sql.join.preferSortMergeJoin

true

(internal) Controls whether JoinSelection execution planning strategy prefers sort merge join over shuffled hash join.

Use SQLConf.preferSortMergeJoin method to access the current value.

spark.sql.limit.scaleUpFactor

4

(internal) Minimal increase rate in the number of partitions between attempts when executing take operator on a structured query. Higher values lead to more partitions read. Lower values might lead to longer execution times as more jobs will be run.

Use SQLConf.limitScaleUpFactor method to access the current value.
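
The effect of the factor can be sketched with a simplified plain-Python model. It assumes the first attempt reads one partition and that no attempt finds any rows, so each further attempt reads the number of partitions scanned so far multiplied by the factor. partitions_per_attempt is a hypothetical helper, not Spark's take implementation.

```python
def partitions_per_attempt(total_partitions, scale_up_factor=4):
    """List how many partitions each attempt scans when no rows are found."""
    attempts = []
    scanned = 0
    while scanned < total_partitions:
        # The first attempt reads one partition; later ones grow by the factor.
        to_try = 1 if scanned == 0 else scanned * scale_up_factor
        # Never read more than what is left, and always make progress.
        to_try = max(1, min(to_try, total_partitions - scanned))
        attempts.append(to_try)
        scanned += to_try
    return attempts
```

In this model a higher factor covers the partitions in fewer attempts (fewer jobs), at the cost of reading more partitions per attempt.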

spark.sql.optimizer.excludedRules

(empty)

Comma-separated list of optimization rule names that should be disabled (excluded) in the optimizer. The optimizer will log the rules that have indeed been excluded.

Note
It is not guaranteed that all the rules in this configuration will eventually be excluded, as some rules are necessary for correctness.

Use SQLConf.optimizerExcludedRules method to access the current value.

spark.sql.optimizer.inSetConversionThreshold

10

(internal) The threshold of set size for InSet conversion.

Use SQLConf.optimizerInSetConversionThreshold method to access the current value.

spark.sql.optimizer.maxIterations

100

Maximum number of iterations for Analyzer and Optimizer.

spark.sql.orc.impl

native

(internal) When native, use the native version of ORC support instead of the ORC library in Hive 1.2.1.

Acceptable values:

  • hive

  • native

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

Use SQLConf.isParquetBinaryAsString method to access the current value.

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 to avoid losing the precision of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.

Use SQLConf.isParquetINT96AsTimestamp method to access the current value.

spark.sql.parquet.enableVectorizedReader

true

Enables vectorized parquet decoding.

Use SQLConf.parquetVectorizedReaderEnabled method to access the current value.

spark.sql.parquet.filterPushdown

true

Controls the filter predicate push-down optimization for data sources using parquet file format

Use SQLConf.parquetFilterPushDown method to access the current value.

spark.sql.parquet.int96TimestampConversion

false

Controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive and Spark.

Use SQLConf.isParquetINT96TimestampConversion method to access the current value.

spark.sql.parquet.recordLevelFilter.enabled

false

Enables Parquet’s native record-level filtering using the pushed down filters.

Note
This configuration only has an effect when spark.sql.parquet.filterPushdown is enabled (and it is by default).

Use SQLConf.parquetRecordFilterEnabled method to access the current value.

spark.sql.parser.quotedRegexColumnNames

false

Controls whether quoted identifiers (using backticks) in SELECT statements should be interpreted as regular expressions.

Use SQLConf.supportQuotedRegexColumnName method to access the current value.

spark.sql.sort.enableRadixSort

true

(internal) Controls whether to use radix sort (true) or not (false) in ShuffleExchangeExec and SortExec physical operators

Radix sort is much faster but requires additional memory to be reserved up-front. The memory overhead may be significant when sorting very small rows (up to 50% more).

Use SQLConf.enableRadixSort method to access the current value.

spark.sql.sources.commitProtocolClass

SQLHadoopMapReduceCommitProtocol

(internal)

Use SQLConf.fileCommitProtocolClass method to access the current value.

spark.sql.sources.partitionOverwriteMode

static

When executing INSERT OVERWRITE on a partitioned data source table with dynamic partition columns, Spark SQL supports two modes (case-insensitive):

  • static – Spark deletes all the partitions that match the partition specification (e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting

  • dynamic – Spark does not delete partitions ahead of time, and only overwrites those partitions that have data written into them

The default (STATIC) is to keep the same behavior of Spark prior to 2.3. Note that this config doesn’t affect Hive serde tables, as they are always overwritten with dynamic mode.

Use SQLConf.partitionOverwriteMode method to access the current value.
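
The difference between the two modes can be sketched with a toy model in plain Python. The table is modeled as a dict from partition tuples (a, b) to rows, and static_spec maps a partition column index to its fixed value; both representations and the function name are assumptions for illustration, not Spark's API.

```python
def insert_overwrite(table, static_spec, new_data, mode="static"):
    """Overwrite partitions of `table` (dict: partition tuple -> rows).

    static_spec maps a partition column index to its fixed value,
    e.g. {0: 1} for PARTITION(a=1, b).
    """
    result = dict(table)
    if mode.lower() == "static":
        # Delete every existing partition matching the specification first.
        for partition in table:
            if all(partition[i] == v for i, v in static_spec.items()):
                del result[partition]
    elif mode.lower() != "dynamic":
        raise ValueError(f"unknown mode: {mode}")
    # In both modes, partitions that receive data are replaced.
    result.update(new_data)
    return result
```

Given partitions (1,1), (1,2) and (2,1) and new data for (1,1) only, static mode with PARTITION(a=1, b) also drops (1,2), while dynamic mode keeps it untouched.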

spark.sql.pivotMaxValues

10000

Maximum number of (distinct) values that will be collected without error (when doing a pivot without specifying the values for the pivot column)

Use SQLConf.dataFramePivotMaxValues method to access the current value.

spark.sql.redaction.options.regex

(?i)secret|password

Regular expression to find options of a Spark SQL command with sensitive information

The values of the options matched will be redacted in the explain output.

This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex configuration.

Used exclusively when SQLConf is requested to redactOptions.
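
Option redaction can be sketched in plain Python with the re module. This is a minimal model that assumes matching is done on the option name; the pattern passed in, the placeholder text, and the redact_options helper are illustrative assumptions rather than Spark's implementation.

```python
import re

REPLACEMENT = "***(redacted)"  # placeholder text, assumed for illustration


def redact_options(options, pattern):
    """Replace the values of options whose name matches the pattern."""
    regex = re.compile(pattern)
    return {
        key: REPLACEMENT if regex.search(key) else value
        for key, value in options.items()
    }
```

For example, calling it with the pattern `(?i)secret|password` hides the values of options named `password` or `clientSecret` and leaves an option such as `dbtable` unchanged.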

spark.sql.redaction.string.regex

(undefined)

Regular expression to point at sensitive information in text output

When this regex matches a string part, that string part is replaced by a dummy value (i.e. ***(redacted)). This is currently used to redact the output of SQL explain commands.

Note
When this conf is not set, the value of spark.redaction.string.regex is used instead.

Use SQLConf.stringRedactionPattern method to access the current value.

spark.sql.retainGroupColumns

true

Controls whether to retain columns used for aggregation or not (in RelationalGroupedDataset operators).

Use SQLConf.dataFrameRetainGroupColumns method to access the current value.

spark.sql.runSQLOnFiles

true

(internal) Controls whether Spark SQL could use datasource.path as a table in a SQL query.

Use SQLConf.runSQLonFile method to access the current value.

spark.sql.selfJoinAutoResolveAmbiguity

true

Controls whether to resolve ambiguity in join conditions for self-joins automatically.

spark.sql.session.timeZone

Java’s TimeZone.getDefault.getID

The ID of session-local timezone, e.g. “GMT”, “America/Los_Angeles”, etc.

Use SQLConf.sessionLocalTimeZone method to access the current value.

spark.sql.shuffle.partitions

200

Number of partitions to use by default when shuffling data for joins or aggregations

Corresponds to Apache Hive’s mapred.reduce.tasks property that Spark considers deprecated.

Use SQLConf.numShufflePartitions method to access the current value.

spark.sql.sources.bucketing.enabled

true

Enables bucketing support. When disabled (i.e. false), bucketed tables are considered regular (non-bucketed) tables.

Use SQLConf.bucketingEnabled method to access the current value.

spark.sql.sources.default

parquet

Defines the default data source to use for DataFrameReader.

Used when:

spark.sql.statistics.fallBackToHdfs

false

Enables automatic calculation of table size statistic by falling back to HDFS if the table statistics are not available from table metadata.

This can be useful in determining if a table is small enough for auto broadcast joins in query planning.

Use SQLConf.fallBackToHdfsForStatsEnabled method to access the current value.

spark.sql.statistics.histogram.enabled

false

Enables generating histograms when computing column statistics

Note
Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histograms. Note that collecting histograms incurs extra cost. For example, collecting column statistics usually takes only one table scan, but generating an equi-height histogram will cause an extra table scan.

Use SQLConf.histogramEnabled method to access the current value.
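
The idea of an equi-height histogram (every bin holds roughly the same number of rows, so bin widths vary with the data) can be sketched in plain Python. This exact, sort-based version is only an illustration; it is not how Spark computes histograms, and equi_height_bins is a hypothetical helper.

```python
def equi_height_bins(values, num_bins):
    """Split sorted values into bins holding (nearly) equal numbers of rows."""
    ordered = sorted(values)
    n = len(ordered)
    bins = []
    for i in range(num_bins):
        # Integer arithmetic spreads the rows as evenly as possible.
        start = i * n // num_bins
        end = (i + 1) * n // num_bins
        chunk = ordered[start:end]
        if chunk:
            bins.append((chunk[0], chunk[-1], len(chunk)))  # (low, high, rows)
    return bins
```

On skewed data such as `[1, 1, 1, 1, 1, 1, 50, 100]` with four bins, three bins cover only the value 1 while the last one spans 50 to 100, which is what makes this kind of histogram informative for selectivity estimates.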

spark.sql.statistics.histogram.numBins

254

(internal) The number of bins when generating histograms.

Note
The number of bins must be greater than 1.

Use SQLConf.histogramNumBins method to access the current value.

spark.sql.statistics.parallelFileListingInStatsComputation.enabled

true

(internal) Enables parallel file listing in SQL commands, e.g. ANALYZE TABLE (as opposed to single thread listing that can be particularly slow with tables with hundreds of partitions)

Use SQLConf.parallelFileListingInStatsComputation method to access the current value.

spark.sql.statistics.size.autoUpdate.enabled

false

Enables automatic update of the table size statistic of a table after the table has changed.

Important
If the total number of files of the table is very large, this can be expensive and slow down data change commands.

Use SQLConf.autoSizeUpdateEnabled method to access the current value.

spark.sql.subexpressionElimination.enabled

true

(internal) Enables subexpression elimination

Use SQLConf.subexpressionEliminationEnabled method to access the current value.

spark.sql.TungstenAggregate.testFallbackStartsAt

(empty)

A comma-separated pair of numbers, e.g. 5,10, that HashAggregateExec uses to inform TungstenAggregationIterator to switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

spark.sql.ui.retainedExecutions

1000

The number of SQLExecutionUIData entries to keep in failedExecutions and completedExecutions internal registries.

When a query execution finishes, the execution is removed from the internal activeExecutions registry and stored in failedExecutions or completedExecutions depending on its end status. At that point SQLListener makes sure that the number of SQLExecutionUIData entries does not exceed the spark.sql.ui.retainedExecutions Spark property and removes the excess entries.
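
The retention rule can be sketched with a plain-Python model that drops the oldest entries once the limit is exceeded. It is a simplification (the listener's actual trimming strategy may differ), and trim_executions is a hypothetical helper.

```python
from collections import deque


def trim_executions(finished, retained=1000):
    """Drop the oldest entries so that at most `retained` remain."""
    excess = len(finished) - retained
    for _ in range(max(excess, 0)):
        finished.popleft()  # oldest entries sit at the left end
    return finished
```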

spark.sql.windowExec.buffer.in.memory.threshold

4096

(internal) Threshold for number of rows guaranteed to be held in memory by WindowExec physical operator.

Use SQLConf.windowExecBufferInMemoryThreshold method to access the current value.

spark.sql.windowExec.buffer.spill.threshold

4096

(internal) Threshold for number of rows buffered in a WindowExec physical operator.

Use SQLConf.windowExecBufferSpillThreshold method to access the current value.
