
BroadcastHashJoinExec


BroadcastHashJoinExec Binary Physical Operator for Broadcast Hash Join

BroadcastHashJoinExec is a binary physical operator to perform a broadcast hash join.

BroadcastHashJoinExec is created after applying the JoinSelection execution planning strategy to ExtractEquiJoinKeys-destructurable logical query plans (i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins) whose right side can be broadcast.
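For instance, a minimal spark-shell sketch (the datasets and column names are made up) that should make JoinSelection plan a BroadcastHashJoinExec with BuildRight:

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  // Two tiny in-memory datasets sharing an equi-join key.
  val left  = Seq((0, "zero"), (1, "one")).toDF("id", "name")
  val right = Seq((0, 0.0), (1, 1.0)).toDF("id", "value")

  // broadcast() marks the right side as broadcastable, so JoinSelection
  // should choose a broadcast hash join for this equi-join.
  val q = left.join(broadcast(right), "id")
  q.explain
  // The physical plan is expected to show a BroadcastHashJoin node (BuildRight).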

BroadcastHashJoinExec supports Java code generation (aka codegen).

BroadcastHashJoinExec requires that the output distributions of its two child physical operators match BroadcastDistribution (with a HashedRelationBroadcastMode) for the build side and UnspecifiedDistribution for the streamed side (left and right respectively, or vice versa, depending on the build side).

Table 1. BroadcastHashJoinExec’s Performance Metrics

  • numOutputRows: number of output rows (in web UI)

  • avgHashProbe: avg hash probe (in web UI)

Figure 1. BroadcastHashJoinExec in web UI (Details for Query)
Note
The prefix for variable names for BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
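To inspect the generated code and spot the bhj-prefixed variables, something like the following should work in spark-shell (assuming q is the broadcast equi-join from the sketch above):

  import org.apache.spark.sql.execution.debug._

  // Prints the whole-stage generated Java code; the variables generated for the
  // BroadcastHashJoinExec operator should carry the bhj prefix.
  q.debugCodegen()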

Table 2. BroadcastHashJoinExec’s Required Child Output Distributions

  • BuildLeft: left child requires BroadcastDistribution (with the HashedRelationBroadcastMode of the build join keys); right child requires UnspecifiedDistribution

  • BuildRight: left child requires UnspecifiedDistribution; right child requires BroadcastDistribution (with the HashedRelationBroadcastMode of the build join keys)

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

Note
doExecute is part of SparkPlan Contract to generate the runtime representation of a structured query as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute…​FIXME

Generating Java Source Code for Inner Join — codegenInner Internal Method

codegenInner…​FIXME

Note
codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.

Generating Java Source Code for Left or Right Outer Join — codegenOuter Internal Method

codegenOuter…​FIXME

Note
codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.

Generating Java Source Code for Left Semi Join — codegenSemi Internal Method

codegenSemi…​FIXME

Note
codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.

Generating Java Source Code for Anti Join — codegenAnti Internal Method

codegenAnti…​FIXME

Note
codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.

codegenExistence Internal Method

codegenExistence…​FIXME

Note
codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java code for the “consume” path in whole-stage code generation.

genStreamSideJoinKey Internal Method

genStreamSideJoinKey…​FIXME

Note
genStreamSideJoinKey is used when BroadcastHashJoinExec is requested to generate the Java source code for inner, outer, left semi, anti and existence joins (for the “consume” path in whole-stage code generation).

Creating BroadcastHashJoinExec Instance

BroadcastHashJoinExec takes the following when created:

BroadcastExchangeExec


BroadcastExchangeExec Unary Physical Operator for Broadcast Joins

BroadcastExchangeExec is an Exchange unary physical operator that collects and broadcasts the rows of a child relation (to worker nodes).

BroadcastExchangeExec is created exclusively when the EnsureRequirements physical query plan optimization ensures BroadcastDistribution of the input data of a physical operator (i.e. a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec operator).
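A quick spark-shell sketch (assuming spark is a SparkSession) that shows the BroadcastExchange node EnsureRequirements adds under a broadcast join:

  import org.apache.spark.sql.functions.broadcast

  val q = spark.range(10).join(broadcast(spark.range(4)), "id")
  q.explain
  // The physical plan should contain a BroadcastExchange node
  // (with HashedRelationBroadcastMode) feeding the BroadcastHashJoin.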

Table 1. BroadcastExchangeExec’s Performance Metrics

  • broadcastTime: time to broadcast (ms)

  • buildTime: time to build (ms)

  • collectTime: time to collect (ms)

  • dataSize: data size (bytes)

Figure 1. BroadcastExchangeExec in web UI (Details for Query)

BroadcastExchangeExec uses BroadcastPartitioning partitioning scheme (with the input BroadcastMode).

Waiting Until Relation Has Been Broadcast — doExecuteBroadcast Method

doExecuteBroadcast waits until the rows are broadcast.

Note
doExecuteBroadcast waits up to spark.sql.broadcastTimeout (5 minutes by default) for the broadcast to complete.
Note
doExecuteBroadcast is part of SparkPlan Contract to return the result of a structured query as a broadcast variable.
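If a broadcast takes longer than that, the timeout can be raised; a minimal sketch (assuming spark is a SparkSession; the value is in seconds):

  // spark.sql.broadcastTimeout is in seconds; 300 (5 minutes) is the default.
  spark.conf.set("spark.sql.broadcastTimeout", 10 * 60)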

Lazily-Once-Initialized Asynchronously-Broadcast relationFuture Internal Attribute

When “materialized” (aka executed), relationFuture finds the current execution id and sets it on the thread the Future runs on.

relationFuture requests child physical operator to executeCollectIterator.

relationFuture records the time taken by executeCollectIterator in the collectTime metric.

Note
relationFuture accepts a relation with up to 512 million rows and up to 8GB in size, and reports a SparkException if either limit is exceeded.

relationFuture requests the input BroadcastMode to transform the internal rows into a relation, e.g. a HashedRelation or an Array[InternalRow].

relationFuture calculates the data size:

  • For a HashedRelation, relationFuture requests its estimatedSize

  • For an Array[InternalRow], relationFuture converts the InternalRows to UnsafeRows and sums up their getSizeInBytes

relationFuture records the data size as the dataSize metric.
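A simplified sketch of the Array[InternalRow] case described above (rowsDataSize is a made-up helper, not the actual BroadcastExchangeExec code; the HashedRelation case simply uses the relation's own estimatedSize):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.UnsafeRow

  // Data size of a collected relation: the sum of the UnsafeRow sizes in bytes.
  def rowsDataSize(rows: Array[InternalRow]): Long =
    rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum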

relationFuture records the buildTime metric.

relationFuture requests the SparkContext to broadcast the relation and records the time in the broadcastTime metric.

In the end, relationFuture requests SQLMetrics to post a SparkListenerDriverAccumUpdates (with the execution id and the SQL metrics) and returns the broadcast internal rows.

Note
Since the initialization of relationFuture happens on the driver, posting a SparkListenerDriverAccumUpdates is the only way to make the SQL metrics accessible to other subsystems that use SparkListener listeners (incl. web UI).

In case of OutOfMemoryError, relationFuture reports another OutOfMemoryError with the following message:

Note
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix broadcast-exchange and up to 128 threads).
Note
relationFuture is used when BroadcastExchangeExec is requested to prepare for execution (that triggers asynchronous execution of the child operator and broadcasting the result) and execute broadcast (that waits until the broadcasting has finished).

Broadcasting Relation (Rows) Asynchronously — doPrepare Method

Note
doPrepare is part of SparkPlan Contract to prepare a physical operator for execution.

doPrepare simply “materializes” the internal lazily-once-initialized asynchronous broadcast.

Creating BroadcastExchangeExec Instance

BroadcastExchangeExec takes the following when created:

SQLMetric — SQL Execution Metric of Physical Operator


SQLMetric is a SQL metric for monitoring execution of a physical operator.

SQLMetric is an accumulator, which is the mechanism that propagates SQL metric updates from the executors to the driver (e.g. for the web UI).

Note
Use Details for Query page in SQL tab in web UI to see the SQL execution metrics of a structured query.
Note

SQL metrics are collected using SparkListener. If there are no tasks, Spark SQL cannot collect any metrics. Updates to metrics on the driver side require an explicit call to SQLMetrics.postDriverMetricUpdates.

This is why executing some physical operators (e.g. LocalTableScanExec) may not have SQL metrics in web UI’s Details for Query in SQL tab.

Compare the following SQL queries and their execution pages.


SQLMetric takes a metric type and an initial value when created.

Table 1. Metric Types and Corresponding Create Methods

  • size: created with createSizeMetric (failed values not counted); used when…

  • sum: created with createMetric (failed values not counted); used when…

  • timing: created with createTimingMetric (failed values not counted); used when…
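A hedged example of how a physical operator might create the three metric types (createOperatorMetrics is just an illustrative wrapper):

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

  def createOperatorMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),  // sum
    "dataSize"      -> SQLMetrics.createSizeMetric(sc, "data size"),          // size
    "buildTime"     -> SQLMetrics.createTimingMetric(sc, "time to build"))    // timing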

reset Method

reset…​FIXME

Note
reset is used when…​FIXME

Posting Driver-Side Metric Updates — SQLMetrics.postDriverMetricUpdates Method

postDriverMetricUpdates posts a SparkListenerDriverAccumUpdates event to LiveListenerBus when executionId is specified.
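A hedged sketch of posting a driver-side metric update from inside a tracked SQL execution (assuming spark is a SparkSession; the metric itself is made up):

  import org.apache.spark.sql.execution.SQLExecution
  import org.apache.spark.sql.execution.metric.SQLMetrics

  val sc = spark.sparkContext
  // The execution id is a thread-local property set by SQLExecution;
  // it is null outside of a tracked SQL execution.
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)

  val driverMetric = SQLMetrics.createMetric(sc, "number of files read")
  driverMetric.add(42)
  SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(driverMetric))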

Note
postDriverMetricUpdates method belongs to SQLMetrics object.
Note

postDriverMetricUpdates is used when:

CodeGeneratorWithInterpretedFallback


CodeGeneratorWithInterpretedFallback is the base of codegen object generators that can create objects for codegen and interpreted evaluation paths.

Table 1. CodeGeneratorWithInterpretedFallback Contract

  • createCodeGeneratedObject: used when…​FIXME

  • createInterpretedObject: used when…​FIXME

Note
UnsafeProjection is the one and only known implementation of the CodeGeneratorWithInterpretedFallback Contract in Apache Spark.

Note

CodeGeneratorWithInterpretedFallback is a Scala type constructor (aka generic type) that accepts two types referred to as IN and OUT.

createObject Method

createObject…​FIXME

Note
createObject is used exclusively when UnsafeProjection is requested to create an UnsafeProjection for Catalyst expressions.
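A simplified, standalone illustration of the pattern this contract captures (not Spark's actual implementation, which also consults the codegen factory mode configuration): try the code-generated object first and fall back to the interpreted one if that fails.

  // Illustrative only; the real CodeGeneratorWithInterpretedFallback lives in
  // org.apache.spark.sql.catalyst.expressions.
  abstract class GeneratorWithFallback[IN, OUT] {
    protected def createCodeGeneratedObject(in: IN): OUT
    protected def createInterpretedObject(in: IN): OUT

    def createObject(in: IN): OUT =
      try createCodeGeneratedObject(in)
      catch { case _: Exception => createInterpretedObject(in) }
  }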

InterpretedProjection


InterpretedProjection is a Projection that…​FIXME

InterpretedProjection takes expressions when created.

InterpretedProjection is created when:

  • UserDefinedGenerator is requested to initializeConverters

  • ConvertToLocalRelation logical optimization is executed (to transform Project logical operators)

  • HiveGenericUDTF is evaluated

  • ScriptTransformationExec physical operator is executed

Initializing Nondeterministic Expressions — initialize Method

Note
initialize is part of Projection Contract to…​FIXME.

initialize requests Nondeterministic expressions (in expressions) to initialize with the partitionIndex.
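A hedged sketch of that logic (only Nondeterministic expressions carry per-partition state that needs initializing; initializeNondeterministic is a made-up helper name):

  import org.apache.spark.sql.catalyst.expressions.{Expression, Nondeterministic}

  def initializeNondeterministic(expressions: Seq[Expression], partitionIndex: Int): Unit =
    expressions.foreach {
      case n: Nondeterministic => n.initialize(partitionIndex)
      case _                   => // deterministic expressions need no per-partition setup
    }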

GenerateMutableProjection


GenerateMutableProjection is…​FIXME

Creating MutableProjection — create Internal Method

create…​FIXME

Note
create is used when…​FIXME

GenerateUnsafeProjection


GenerateUnsafeProjection is a CodeGenerator that generates the bytecode for a UnsafeProjection for given expressions (i.e. CodeGenerator[Seq[Expression], UnsafeProjection]).

Tip

Enable DEBUG logging level for the org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection logger to see what happens inside.

Add the following line to conf/log4j.properties:

  log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection=DEBUG

Refer to Logging.

Generating UnsafeProjection — generate Method

generate canonicalizes the input expressions and then generates the JVM bytecode of a UnsafeProjection for them.
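A hedged spark-shell example of calling generate directly on a single bound column (an internal API, shown for illustration only):

  import org.apache.spark.sql.catalyst.expressions.BoundReference
  import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
  import org.apache.spark.sql.types.IntegerType

  // An UnsafeProjection that copies column 0 (a nullable int) into an UnsafeRow.
  val unsafeProj = GenerateUnsafeProjection.generate(Seq(BoundReference(0, IntegerType, nullable = true)))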

Note

generate is used when:

canonicalize Method

canonicalize removes unnecessary Alias expressions.

Internally, canonicalize uses ExpressionCanonicalizer rule executor (that in turn uses just one CleanExpressions expression rule).

Generating JVM Bytecode For UnsafeProjection For Given Expressions (With Optional Subexpression Elimination) — create Method

  1. Calls the former create with subexpressionEliminationEnabled flag off

create first creates a CodegenContext and the Java source code for the input expressions.

create creates a code body with public java.lang.Object generate(Object[] references) method that creates a SpecificUnsafeProjection.

create creates a CodeAndComment with the code body and comment placeholders.

You should see the following DEBUG message in the logs:

Tip

Enable DEBUG logging level for org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator logger to see the message above.

create requests the CodegenContext for references and requests the compiled class to create a SpecificUnsafeProjection for the input references, which in the end is the final UnsafeProjection.

Note
(Single-argument) create is part of CodeGenerator Contract.

Creating ExprCode for Expressions (With Optional Subexpression Elimination) — createCode Method

createCode requests the input CodegenContext to generate a Java source code for code-generated evaluation of every expression in the input expressions.

createCode…​FIXME

Note

createCode is used when:

UnsafeProjection — Generic Function to Project InternalRows to UnsafeRows


UnsafeProjection is a Projection function that encodes InternalRows as UnsafeRows.

Note

Spark SQL uses the UnsafeProjection factory object to create concrete ad-hoc UnsafeProjection instances.

The base UnsafeProjection has no concrete named implementations; the create factory methods delegate all calls to GenerateUnsafeProjection.generate in the end.

Creating UnsafeProjection — create Factory Method

  1. create takes the DataTypes from schema and calls the 2nd create

  2. create creates a BoundReference per field in fields and calls the 5th create

  3. create calls the 5th create

  4. create calls the 5th create

  5. The main create that does the heavy work

create transforms all CreateNamedStruct expressions to CreateNamedStructUnsafe in every BoundReference in the input exprs.

In the end, create requests GenerateUnsafeProjection to generate a UnsafeProjection.

Note
A variant of create takes a subexpressionEliminationEnabled flag (usually the subexpressionEliminationEnabled flag of the SparkPlan).
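A hedged spark-shell example of the schema-based create variant and of applying the resulting projection (the field names are made up):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
  import org.apache.spark.sql.types._
  import org.apache.spark.unsafe.types.UTF8String

  val schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)))

  // The first create variant above: from the schema's DataTypes.
  val projection = UnsafeProjection.create(schema)
  val unsafeRow = projection(InternalRow(1L, UTF8String.fromString("abc")))
  // unsafeRow is an UnsafeRow holding the same two values in the Tungsten binary format.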

Projection Contract — Functions to Produce InternalRow for InternalRow


Projection is a contract of Scala functions that produce an internal binary row for a given internal row.

Projection can optionally be initialized with the current partition index (which by default does nothing).

Note
initialize is overridden by the InterpretedProjection and InterpretedMutableProjection projections that are used in interpreted expression evaluation.
Table 1. Projections

  • UnsafeProjection

  • InterpretedProjection

  • IdentityProjection

  • MutableProjection

  • InterpretedMutableProjection (appears not to be used anymore)
