

Stack Generator Expression

Stack is…​FIXME
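Stack is the generator expression behind the stack SQL function, which arranges a list of values into rows. A minimal sketch in spark-shell (the generated column names col0, col1, … come from the expression's output schema):

```scala
// stack(n, v1, ..., vk) arranges the k values into n rows
spark.sql("SELECT stack(2, 1, 2, 3, 4)").show()
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// |   3|   4|
// +----+----+
```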

Generating Java Source Code (ExprCode) For Code-Generated Expression Evaluation — doGenCode Method

Note
doGenCode is part of the Expression Contract to generate Java source code (ExprCode) for code-generated expression evaluation.

doGenCode…​FIXME


SortOrder Unevaluable Unary Expression

SortOrder is a unary expression that represents the sort ordering of an expression in a logical plan (e.g. for ORDER BY and SORT BY clauses).

SortOrder is used to specify the output data ordering requirements of a physical operator.

SortOrder is an unevaluable expression and cannot be evaluated (i.e. produce a value given an internal row).

Note
An unevaluable expression cannot be evaluated to produce a value (neither in interpreted nor code-generated expression evaluation) and has to be resolved to (replaced with) some other expression or logical operator at the analysis or optimization phase, or analysis fails.

SortOrder is never foldable (as an unevaluable expression with no evaluation).

Tip
Use asc, asc_nullsLast, desc or desc_nullsFirst operators from the Catalyst DSL to create a SortOrder expression, e.g. for testing or Spark SQL internals exploration.

Creating SortOrder Instance — apply Factory Method

apply is a convenience method to create a SortOrder with the defaultNullOrdering of the SortDirection.

Note
apply is used exclusively for window functions.

Catalyst DSL — asc, asc_nullsLast, desc and desc_nullsFirst Operators

asc and asc_nullsLast create a SortOrder expression with the Ascending sort direction, while desc and desc_nullsFirst use Descending.
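A minimal sketch in spark-shell (the 'id Symbol is turned into an expression by the DSL's implicits):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// asc uses the default null ordering of Ascending; asc_nullsLast overrides it
val sortNullsLast = 'id.asc_nullsLast
println(sortNullsLast.sql)  // `id` ASC NULLS LAST
```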

Creating SortOrder Instance

SortOrder takes the following when created:

SortDirection Contract

SortDirection is the base of sort directions.

Table 1. SortDirection Contract
Method                Description
defaultNullOrdering   Used when…​FIXME
sql                   Used when…​FIXME

Ascending and Descending Sort Directions

There are two sorting directions available, i.e. Ascending and Descending.


SizeBasedWindowFunction Contract — Declarative Window Aggregate Functions with Window Size

SizeBasedWindowFunction is the extension of the AggregateWindowFunction Contract for window functions that require the size of the current window for calculation.

Table 1. SizeBasedWindowFunction Contract
Property   Description
n          Size of the current window, as an AttributeReference expression with the name window__partition__size, IntegerType data type, and non-nullable

Table 2. SizeBasedWindowFunctions (Direct Implementations)
SizeBasedWindowFunction   Description
CumeDist                  Window function expression for the cume_dist standard function (Dataset API) and the cume_dist SQL function
NTile
PercentRank
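As a quick illustration of why these functions need the window size, here is cume_dist over a made-up DataFrame df (the DataFrame and column names are assumptions for the example):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.cume_dist

// cume_dist divides the number of rows up to and including the current one
// by the window partition size -- the n property above
val byGroup = Window.partitionBy("group").orderBy("value")
val withCumeDist = df.withColumn("dist", cume_dist() over byGroup)
```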


SimpleTypedAggregateExpression

SimpleTypedAggregateExpression is…​FIXME

SimpleTypedAggregateExpression is created when…​FIXME

Table 1. SimpleTypedAggregateExpression's Internal Properties (e.g. Registries, Counters and Flags)
Name                 Description
evaluateExpression   Expression
resultObjToRow       UnsafeProjection

Creating SimpleTypedAggregateExpression Instance

SimpleTypedAggregateExpression takes the following when created:


ScalaUDAF — Catalyst Expression Adapter for UserDefinedAggregateFunction

ScalaUDAF is a Catalyst expression adapter that manages the lifecycle of a UserDefinedAggregateFunction and hooks it into Spark SQL's Catalyst execution path.
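For context, here is a minimal sketch of the kind of UserDefinedAggregateFunction (a plain sum) that ScalaUDAF adapts; registering it is one way to get it wrapped in a ScalaUDAF:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A minimal sketch of a UserDefinedAggregateFunction (a plain sum over longs)
class MySum extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("value", LongType)
  def bufferSchema: StructType = new StructType().add("sum", LongType)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, 0L)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer.update(0, buffer.getLong(0) + input.getLong(0))
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// Registering the function lets it be used in SQL, wrapped in a ScalaUDAF
spark.udf.register("mysum", new MySum)
```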

ScalaUDAF is created when:

ScalaUDAF is an ImperativeAggregate.

Table 1. ScalaUDAF's ImperativeAggregate Methods
Method Name   Behaviour
initialize    Requests UserDefinedAggregateFunction to initialize
merge         Requests UserDefinedAggregateFunction to merge
update        Requests UserDefinedAggregateFunction to update

When evaluated, ScalaUDAF…​FIXME

ScalaUDAF has no representation in SQL.

Table 2. ScalaUDAF's Properties
Name                       Description
aggBufferAttributes        AttributeReferences of aggBufferSchema
aggBufferSchema            bufferSchema of UserDefinedAggregateFunction
dataType                   DataType of UserDefinedAggregateFunction
deterministic              deterministic of UserDefinedAggregateFunction
inputAggBufferAttributes   Copy of aggBufferAttributes
inputTypes                 Data types from inputSchema of UserDefinedAggregateFunction
nullable                   Always enabled (i.e. true)

Table 3. ScalaUDAF's Internal Registries and Counters
Name                     Description
inputAggregateBuffer     Used when…​FIXME
inputProjection          Used when…​FIXME
inputToScalaConverters   Used when…​FIXME
mutableAggregateBuffer   Used when…​FIXME

Creating ScalaUDAF Instance

ScalaUDAF takes the following when created:

ScalaUDAF initializes the internal registries and counters.

initialize Method

initialize sets the given input aggregation buffer (an internal binary row) as the underlyingBuffer of MutableAggregationBufferImpl and requests the UserDefinedAggregateFunction to initialize (with the MutableAggregationBufferImpl).

Figure 1. ScalaUDAF initializes UserDefinedAggregateFunction
Note
initialize is part of ImperativeAggregate Contract.

update Method

update sets the given input aggregation buffer (an internal binary row) as the underlyingBuffer of MutableAggregationBufferImpl and requests the UserDefinedAggregateFunction to update.

Note
update uses inputProjection on the given input row and converts it using inputToScalaConverters.
Figure 2. ScalaUDAF updates UserDefinedAggregateFunction
Note
update is part of ImperativeAggregate Contract.

merge Method

merge first sets:

Figure 3. ScalaUDAF requests UserDefinedAggregateFunction to merge
Note
merge is part of ImperativeAggregate Contract.


ScalaUDF — Catalyst Expression to Manage Lifecycle of User-Defined Function

ScalaUDF is a Catalyst expression that manages the lifecycle of a user-defined function (and hooks it into Spark SQL's Catalyst execution path).

ScalaUDF is an ImplicitCastInputTypes and a UserDefinedExpression.

ScalaUDF has no representation in SQL.
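A minimal sketch of how a ScalaUDF comes about from the Dataset API (the function and column names are made up):

```scala
import org.apache.spark.sql.functions.{lit, udf}

// udf wraps a Scala function in a UserDefinedFunction
val upper = udf { s: String => s.toUpperCase }

// Applying it to a column creates a Column backed by a ScalaUDF expression
val q = spark.range(1).select(upper(lit("hello")) as "upper")
```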

ScalaUDF is created when:

Note
Spark SQL Analyzer uses HandleNullInputsForUDF logical evaluation rule to…​FIXME

Generating Java Source Code (ExprCode) For Code-Generated Expression Evaluation — doGenCode Method

Note
doGenCode is part of the Expression Contract to generate Java source code (ExprCode) for code-generated expression evaluation.

doGenCode…​FIXME

Evaluating Expression — eval Method

Note
eval is part of Expression Contract for the interpreted (non-code-generated) expression evaluation, i.e. evaluating a Catalyst expression to a JVM object for a given internal binary row.

eval executes the Scala function on the input internal row.

Creating ScalaUDF Instance

ScalaUDF takes the following when created:

  • A Scala function (as Scala’s AnyRef)

  • Output data type

  • Child Catalyst expressions

  • Input data types (if available)

  • Name (if defined)

  • nullable flag (turned on by default)

  • udfDeterministic flag (turned on by default)

ScalaUDF initializes the internal registries and counters.


ScalarSubquery (ExecSubqueryExpression) Expression

ScalarSubquery is an ExecSubqueryExpression that gives exactly one value, i.e. the value of executing a SubqueryExec subquery that results in a single row and a single column, or null if no rows were computed.

Important
Spark SQL uses the name ScalarSubquery twice, to represent both an ExecSubqueryExpression (this page) and a SubqueryExpression. It is confusing, so make sure you know which one you are dealing with.

ScalarSubquery is created exclusively when PlanSubqueries physical optimization is executed (and plans a ScalarSubquery expression).

ScalarSubquery expression cannot be evaluated, i.e. produce a value given an internal row.

ScalarSubquery uses…​FIXME…​for the data type.
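A scalar subquery in SQL text is the simplest way to get this expression into a physical plan (range is Spark SQL's built-in table-valued function):

```scala
// The subquery returns a single row and a single column
val q = spark.sql("SELECT (SELECT max(id) FROM range(10)) AS max_id")
q.explain()  // the physical plan should show a scalar-subquery expression
```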

Table 1. ScalarSubquery's Internal Properties (e.g. Registries, Counters and Flags)
Name      Description
result    The value of the single column in a single row after collecting the rows from executing the subquery plan, or null if no rows were collected
updated   Flag that says whether ScalarSubquery was updated with the collected result of executing the subquery plan

Creating ScalarSubquery Instance

ScalarSubquery takes the following when created:

Updating ScalarSubquery With Collected Result — updateResult Method

Note
updateResult is part of the ExecSubqueryExpression Contract to fill a Catalyst expression with a collected result from executing a subquery plan.

updateResult requests the SubqueryExec physical plan to execute and collect internal rows.

updateResult sets result to the value of the only column of the single row, or null if no rows were collected.

In the end, updateResult marks the ScalarSubquery instance as updated.

updateResult reports a RuntimeException when the result contains more than one row.

updateResult reports an AssertionError when the number of fields is not exactly 1.
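For example, a subquery that produces two rows passes analysis but fails at execution time (the exact message may differ across Spark versions):

```scala
// range(2) produces two rows, so updateResult reports a RuntimeException
spark.sql("SELECT (SELECT id FROM range(2)) AS v").collect()
// java.lang.RuntimeException: more than one row returned by a subquery used as an expression
```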

Evaluating Expression — eval Method

Note
eval is part of Expression Contract for the interpreted (non-code-generated) expression evaluation, i.e. evaluating a Catalyst expression to a JVM object for a given internal binary row.

eval simply returns the result value.

eval reports an IllegalArgumentException if the ScalarSubquery expression has not been updated yet.

Generating Java Source Code (ExprCode) For Code-Generated Expression Evaluation — doGenCode Method

Note
doGenCode is part of the Expression Contract to generate Java source code (ExprCode) for code-generated expression evaluation.

doGenCode first makes sure that the updated flag is on (true). If not, doGenCode throws an IllegalArgumentException.

doGenCode then creates a Literal (for the result and the dataType) and simply requests it to generate the Java source code.


ScalarSubquery (SubqueryExpression) Expression

ScalarSubquery is a SubqueryExpression that returns a single row and a single column only.

ScalarSubquery represents a structured query that can be used as a “column”.

Important
Spark SQL uses the name of ScalarSubquery twice to represent a SubqueryExpression (this page) and an ExecSubqueryExpression. You’ve been warned.

ScalarSubquery is created exclusively when AstBuilder is requested to parse a subquery expression.
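You can watch AstBuilder create the expression by parsing SQL text with the session's parser:

```scala
// parsePlan gives the (unresolved) logical plan; the projection contains
// a scalar-subquery expression created by AstBuilder
val plan = spark.sessionState.sqlParser.parsePlan("SELECT (SELECT 1) AS one")
println(plan.numberedTreeString)
```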

Creating ScalarSubquery Instance

ScalarSubquery takes the following when created:


RuntimeReplaceable Contract — Replaceable SQL Expressions

RuntimeReplaceable is the marker contract for unary expressions that are replaced by Catalyst Optimizer with their child expression (that can then be evaluated).

Note
Catalyst Optimizer uses ReplaceExpressions logical optimization to replace RuntimeReplaceable expressions.

The RuntimeReplaceable contract allows for expression aliases, i.e. expressions that look simple on the outside but are fairly complex on the inside, and is used to provide compatibility with other SQL databases by supporting SQL functions through more complex Catalyst expressions (that are already supported by Spark SQL).

Note
RuntimeReplaceables are tied to their SQL functions in FunctionRegistry.

RuntimeReplaceable expressions cannot be evaluated (i.e. produce a value given an internal row) and therefore have to be replaced in the query execution pipeline.
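You can see the replacement by comparing the analyzed and optimized plans, e.g. for nvl, whose child is a coalesce expression:

```scala
// In the analyzed plan the projection shows nvl(...); after ReplaceExpressions
// (part of optimization) only its child, coalesce(...), remains
val q = spark.sql("SELECT nvl(NULL, 'hello') AS value")
q.explain(extended = true)
```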

Note
To make sure the explain plan and expression SQL work correctly, a RuntimeReplaceable implementation should override the flatArguments and sql methods.
Table 1. RuntimeReplaceables
RuntimeReplaceable   Standard Function   SQL Function
IfNull                                   ifnull
Left                                     left
NullIf                                   nullif
Nvl                                      nvl
Nvl2                                     nvl2
ParseToDate          to_date             to_date
ParseToTimestamp     to_timestamp        to_timestamp
Right                                    right
