ClusteredDistribution

ClusteredDistribution is a Distribution that creates a HashPartitioning for the clustering expressions and a requested number of partitions.

ClusteredDistribution requires the clustering expressions to be non-empty (i.e. not Nil).

ClusteredDistribution is created when the following physical operators are requested for a required child distribution:

  • MapGroupsExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec, WindowExec

  • Spark Structured Streaming’s FlatMapGroupsWithStateExec, StateStoreRestoreExec, StateStoreSaveExec, StreamingDeduplicateExec, StreamingSymmetricHashJoinExec

  • SparkR’s FlatMapGroupsInRExec

  • PySpark’s FlatMapGroupsInPandasExec

ClusteredDistribution is used when:

  • DataSourcePartitioning, SinglePartition, HashPartitioning, and RangePartitioning are requested to check whether they satisfy a required distribution (satisfies)

  • EnsureRequirements is requested to add an ExchangeCoordinator for Adaptive Query Execution

createPartitioning Method

Note
createPartitioning is part of the Distribution Contract to create a Partitioning for a given number of partitions.

createPartitioning creates a HashPartitioning for the clustering expressions and the input numPartitions.

createPartitioning reports an AssertionError when the required number of partitions is defined and differs from the input numPartitions.
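
The behaviour described above can be modelled with a minimal Python sketch. It is not Spark's Scala code: the class names mirror the ones in the text, but the field names (clustering, required_num_partitions) and the representation of expressions as plain column names are simplifying assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class HashPartitioning:
    # Simplified stand-in: expressions are plain column names here.
    expressions: Tuple[str, ...]
    num_partitions: int


@dataclass(frozen=True)
class ClusteredDistribution:
    clustering: Tuple[str, ...]
    # None means "any number of partitions is fine".
    required_num_partitions: Optional[int] = None

    def __post_init__(self):
        # The clustering expressions must not be empty.
        assert len(self.clustering) > 0, "clustering expressions must not be empty"

    def create_partitioning(self, num_partitions: int) -> HashPartitioning:
        # Fails only when a required number is set and differs from the input.
        assert (
            self.required_num_partitions is None
            or self.required_num_partitions == num_partitions
        ), (
            f"required {self.required_num_partitions} partitions, "
            f"got {num_partitions}"
        )
        return HashPartitioning(self.clustering, num_partitions)


dist = ClusteredDistribution(("id",))
partitioning = dist.create_partitioning(200)
```

With no required number of partitions, any input is accepted; ClusteredDistribution(("id",), 10).create_partitioning(200) would raise an AssertionError in this model.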

Creating ClusteredDistribution Instance

ClusteredDistribution takes the following when created:

  • Clustering expressions

  • Required number of partitions (default: None)

Note
None for the required number of partitions means that any number of partitions can be used (possibly the value of the spark.sql.shuffle.partitions configuration property, 200 by default).

BroadcastDistribution

BroadcastDistribution is a Distribution that indicates to use one partition only and…​FIXME.

BroadcastDistribution is created when:

  1. BroadcastHashJoinExec is requested for required child output distributions (with HashedRelationBroadcastMode of the build join keys)

  2. BroadcastNestedLoopJoinExec is requested for required child output distributions (with IdentityBroadcastMode)

BroadcastDistribution takes a BroadcastMode when created.

Note
BroadcastDistribution is converted to a BroadcastExchangeExec physical operator when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements for data distribution and ordering).

createPartitioning Method

Note
createPartitioning is part of the Distribution Contract to create a Partitioning for a given number of partitions.

createPartitioning…​FIXME

AllTuples

AllTuples is a Distribution that indicates to use one partition only.

createPartitioning Method

Note
createPartitioning is part of the Distribution Contract to create a Partitioning for a given number of partitions.

createPartitioning…​FIXME

Distribution — Contract For Data Distribution Across Partitions

Distribution is the contract of…​FIXME

Note
Distribution is a Scala sealed trait, which means that all possible distributions are defined in the same compilation unit (file).

Table 1. Distribution Contract (method and description)

requiredNumPartitions

Gives the required number of partitions for a distribution.

Used exclusively when EnsureRequirements physical optimization is requested to enforce partition requirements of a physical operator (and a child operator’s output partitioning does not satisfy a required child distribution that leads to inserting a ShuffleExchangeExec operator to a physical plan).

Note
None for the required number of partitions means that any number of partitions can be used (possibly the value of the spark.sql.shuffle.partitions configuration property, 200 by default).

createPartitioning

Creates a Partitioning for a given number of partitions.

Used exclusively when EnsureRequirements physical optimization is requested to enforce partition requirements of a physical operator (and creates a ShuffleExchangeExec physical operator with a required Partitioning).
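
How the two methods of the contract work together can be sketched in plain Python. This is a simplified model, not Spark's EnsureRequirements code: the satisfies check is passed in as a boolean, DEFAULT_SHUFFLE_PARTITIONS stands for spark.sql.shuffle.partitions, and the ShuffleExchange tuple and the enforce_distribution function are hypothetical names.

```python
from typing import NamedTuple, Optional

DEFAULT_SHUFFLE_PARTITIONS = 200  # default of spark.sql.shuffle.partitions


class Distribution(NamedTuple):
    # Toy distribution: only carries the (optional) required partition count.
    name: str
    required_num_partitions: Optional[int] = None

    def create_partitioning(self, num_partitions: int) -> str:
        # A real implementation returns a Partitioning; a label is enough here.
        return f"{self.name}Partitioning({num_partitions})"


class ShuffleExchange(NamedTuple):
    partitioning: str
    child: str


def enforce_distribution(child: str, satisfied: bool, required: Distribution):
    """Insert a shuffle only when the child's output does not satisfy `required`."""
    if satisfied:
        return child
    num = required.required_num_partitions
    if num is None:
        # No required number: fall back to the configured default.
        num = DEFAULT_SHUFFLE_PARTITIONS
    return ShuffleExchange(required.create_partitioning(num), child)


plan = enforce_distribution("scan", satisfied=False, required=Distribution("Hash"))
```

In this model a child whose output already satisfies the distribution is returned unchanged, so no exchange is added to the plan.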

Table 2. Distributions

  • AllTuples

  • BroadcastDistribution

  • ClusteredDistribution

  • HashClusteredDistribution

  • OrderedDistribution

  • UnspecifiedDistribution

ExchangeCoordinator

Caution
FIXME

postShuffleRDD Method

Caution
FIXME

Partitioning — Specification of Physical Operator’s Output Partitions

Partitioning is the contract that tells the Spark Physical Optimizer how many partitions the output of a physical operator should be split across.

numPartitions is used in:

Table 1. Partitioning Schemes (Partitionings) and Their Properties

BroadcastPartitioning

  • compatibleWith: BroadcastPartitioning with the same BroadcastMode

  • guarantees: Exactly the same BroadcastPartitioning

  • numPartitions: 1

  • satisfies: BroadcastDistribution with the same BroadcastMode

HashPartitioning (takes clustering expressions and numPartitions)

  • compatibleWith: HashPartitioning (when their underlying expressions are semantically equal, i.e. deterministic and canonically equal)

  • guarantees: HashPartitioning (when their underlying expressions are semantically equal, i.e. deterministic and canonically equal)

  • numPartitions: Input numPartitions

PartitioningCollection (takes partitionings)

  • compatibleWith: Any Partitioning that is compatible with one of the input partitionings

  • guarantees: Any Partitioning that is guaranteed by any of the input partitionings

  • numPartitions: Number of partitions of the first Partitioning in the input partitionings

  • satisfies: Any Distribution that is satisfied by any of the input partitionings

RangePartitioning (takes an ordering collection of SortOrder and numPartitions)

  • compatibleWith: RangePartitioning (when semantically equal, i.e. underlying expressions are deterministic and canonically equal)

  • guarantees: RangePartitioning (when semantically equal, i.e. underlying expressions are deterministic and canonically equal)

  • numPartitions: Input numPartitions

RoundRobinPartitioning (takes numPartitions)

  • compatibleWith: Always negative

  • guarantees: Always negative

  • numPartitions: Input numPartitions

  • satisfies: UnspecifiedDistribution

SinglePartition

  • compatibleWith: Any Partitioning with exactly one partition

  • guarantees: Any Partitioning with exactly one partition

  • numPartitions: 1

  • satisfies: Any Distribution except BroadcastDistribution

UnknownPartitioning (takes numPartitions)

  • compatibleWith: Always negative

  • guarantees: Always negative

  • numPartitions: Input numPartitions

  • satisfies: UnspecifiedDistribution
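
A few rows of the table can be restated as a small Python model. This is a sketch built only from the table: representing distributions as string tags and the class layout are assumptions, and the partitionings whose satisfies entry is not listed (HashPartitioning, RangePartitioning) are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SinglePartition:
    num_partitions: int = 1

    def satisfies(self, distribution: str) -> bool:
        # Any Distribution except BroadcastDistribution.
        return distribution != "BroadcastDistribution"


@dataclass(frozen=True)
class RoundRobinPartitioning:
    num_partitions: int

    def satisfies(self, distribution: str) -> bool:
        # Only UnspecifiedDistribution.
        return distribution == "UnspecifiedDistribution"


@dataclass(frozen=True)
class PartitioningCollection:
    partitionings: tuple

    @property
    def num_partitions(self) -> int:
        # Number of partitions of the first partitioning.
        return self.partitionings[0].num_partitions

    def satisfies(self, distribution: str) -> bool:
        # Satisfied when any member partitioning satisfies the distribution.
        return any(p.satisfies(distribution) for p in self.partitionings)


collection = PartitioningCollection((RoundRobinPartitioning(1), SinglePartition()))
```

Here the collection satisfies a ClusteredDistribution only because its SinglePartition member does; the round-robin member alone would not.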

ProjectEstimation

ProjectEstimation is…​FIXME

Estimating Statistics and Query Hints of Project Logical Operator — estimate Method

estimate…​FIXME

Note
estimate is used exclusively when BasicStatsPlanVisitor is requested to estimate statistics and query hints of a Project logical operator.

JoinEstimation

JoinEstimation is created exclusively for BasicStatsPlanVisitor to estimate statistics of a Join logical operator.

Note
BasicStatsPlanVisitor is used only when cost-based optimization is enabled.

JoinEstimation takes a Join logical operator when created.

When created, JoinEstimation immediately takes the estimated statistics and query hints of the left and right sides of the Join logical operator.

JoinEstimation can estimate statistics of a Join logical operator with the following join types:

  • Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi and LeftAnti

For the other join types (e.g. ExistenceJoin), JoinEstimation prints out a DEBUG message to the logs and returns None (to “announce” that no statistics could be computed).

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation=DEBUG

Refer to Logging.

estimateInnerOuterJoin Internal Method

estimateInnerOuterJoin destructures Join logical operator into a join type with the left and right keys.

estimateInnerOuterJoin simply returns None (i.e. nothing) when either side of the Join logical operator has no row count statistic.

Note
estimateInnerOuterJoin is used exclusively when JoinEstimation is requested to estimate statistics and query hints of a Join logical operator for Inner, Cross, LeftOuter, RightOuter and FullOuter joins.

computeByNdv Internal Method

computeByNdv…​FIXME

Note
computeByNdv is used exclusively when JoinEstimation is requested for computeCardinalityAndStats.

computeCardinalityAndStats Internal Method

computeCardinalityAndStats…​FIXME

Note
computeCardinalityAndStats is used exclusively when JoinEstimation is requested for estimateInnerOuterJoin.

Computing Join Cardinality Using Equi-Height Histograms — computeByHistogram Internal Method

computeByHistogram…​FIXME

Note
computeByHistogram is used exclusively when JoinEstimation is requested for computeCardinalityAndStats (and the histograms of both column attributes used in a join are available).

Estimating Statistics for Left Semi and Left Anti Joins — estimateLeftSemiAntiJoin Internal Method

estimateLeftSemiAntiJoin estimates statistics of the Join logical operator only when estimated row count statistic is available. Otherwise, estimateLeftSemiAntiJoin simply returns None (i.e. no statistics estimated).

Note
The row count statistic of a table is available only after the ANALYZE TABLE COMPUTE STATISTICS SQL command has been executed.

If available, estimateLeftSemiAntiJoin takes the estimated row count statistic of the left side of the Join operator.

Note
Use ANALYZE TABLE COMPUTE STATISTICS SQL command on the left logical plan to compute row count statistics.

Note
Use ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command on the left logical plan to generate column (equi-height) histograms for more accurate estimations.

In the end, estimateLeftSemiAntiJoin creates a new Statistics with the following estimates:

  1. Total size (in bytes) is the output size for the output schema of the join, the row count statistic (aka output rows) and column histograms.

  2. Row count is exactly the row count of the left side.

  3. Column histograms are exactly the column histograms of the left side.

Note
estimateLeftSemiAntiJoin is used exclusively when JoinEstimation is requested to estimate statistics and query hints for LeftSemi and LeftAnti joins.
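
The steps above can be sketched as follows. The Statistics class is a simplified stand-in, and computing the total size as the row count times a fixed row width (row_size_bytes) is an assumption made for illustration, not Spark's actual size formula.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class Statistics:
    size_in_bytes: int
    row_count: Optional[int] = None
    # Column name -> histogram (kept opaque in this sketch).
    histograms: Dict[str, object] = field(default_factory=dict)


def estimate_left_semi_anti_join(
    left: Statistics, row_size_bytes: int
) -> Optional[Statistics]:
    # No row count on the left side: nothing can be estimated.
    if left.row_count is None:
        return None
    return Statistics(
        # Assumed size model: output rows * width of one output row.
        size_in_bytes=left.row_count * row_size_bytes,
        # Row count and histograms are taken from the left side unchanged.
        row_count=left.row_count,
        histograms=left.histograms,
    )


left = Statistics(size_in_bytes=4000, row_count=100, histograms={"id": "h"})
estimated = estimate_left_semi_anti_join(left, row_size_bytes=16)
```

Keeping the left row count unchanged is a conservative upper bound: a semi or anti join can never return more rows than its left side.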

Estimating Statistics and Query Hints of Join Logical Operator — estimate Method

estimate estimates statistics and query hints of the Join logical operator per join type:

  • Inner, Cross, LeftOuter, RightOuter and FullOuter: estimateInnerOuterJoin

  • LeftSemi and LeftAnti: estimateLeftSemiAntiJoin

For other join types, estimate prints out a DEBUG message to the logs and returns None (to “announce” that no statistics could be computed).

Note
estimate is used exclusively when BasicStatsPlanVisitor is requested to estimate statistics and query hints of a Join logical operator.
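
The dispatch on the join type can be sketched as below. The two helper functions are placeholders that only return a label, and the logger name and message text are illustrative assumptions rather than Spark's actual output.

```python
import logging
from typing import Optional

logger = logging.getLogger("JoinEstimation")

INNER_OUTER = {"Inner", "Cross", "LeftOuter", "RightOuter", "FullOuter"}
SEMI_ANTI = {"LeftSemi", "LeftAnti"}


def estimate_inner_outer_join(join_type: str) -> Optional[str]:
    # Placeholder for the real estimation logic.
    return f"inner/outer estimate for {join_type}"


def estimate_left_semi_anti_join(join_type: str) -> Optional[str]:
    # Placeholder for the real estimation logic.
    return f"semi/anti estimate for {join_type}"


def estimate(join_type: str) -> Optional[str]:
    if join_type in INNER_OUTER:
        return estimate_inner_outer_join(join_type)
    if join_type in SEMI_ANTI:
        return estimate_left_semi_anti_join(join_type)
    # Any other join type (e.g. ExistenceJoin): log at DEBUG and give up.
    logger.debug("unsupported join type: %s", join_type)
    return None
```

Returning None instead of raising lets the caller fall back to whatever default statistics it has when a join type is not covered.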

FilterEstimation

FilterEstimation is…​FIXME

computeEqualityPossibilityByHistogram Internal Method

computeEqualityPossibilityByHistogram…​FIXME

Note
computeEqualityPossibilityByHistogram is used when…​FIXME

computeComparisonPossibilityByHistogram Internal Method

computeComparisonPossibilityByHistogram…​FIXME

Note
computeComparisonPossibilityByHistogram is used when…​FIXME

update Method

update…​FIXME

Note
update is used when…​FIXME

AggregateEstimation

AggregateEstimation is…​FIXME

Estimating Statistics and Query Hints of Aggregate Logical Operator — estimate Method

estimate…​FIXME

Note
estimate is used exclusively when BasicStatsPlanVisitor is requested to estimate statistics and query hints of an Aggregate logical operator.
