WindowFrameCoercion Type Coercion Logical Rule
WindowFrameCoercion is a type coercion logical rule that casts the data types of the lower and upper boundaries of a range window frame to the data type of the order specification expression of a WindowSpecDefinition in a logical plan.
```scala
import java.time.LocalDate
import java.sql.Timestamp
val sales = Seq(
  (Timestamp.valueOf(LocalDate.of(2018, 9, 1).atStartOfDay), 5),
  (Timestamp.valueOf(LocalDate.of(2018, 9, 2).atStartOfDay), 10),
  // Mind the 2-day gap
  (Timestamp.valueOf(LocalDate.of(2018, 9, 5).atStartOfDay), 5)
).toDF("time", "volume")

scala> sales.show
+-------------------+------+
|               time|volume|
+-------------------+------+
|2018-09-01 00:00:00|     5|
|2018-09-02 00:00:00|    10|
|2018-09-05 00:00:00|     5|
+-------------------+------+

scala> sales.printSchema
root
 |-- time: timestamp (nullable = true)
 |-- volume: integer (nullable = false)

// FIXME Use Catalyst DSL
// rangeBetween with column expressions
// data type of the orderBy expression is timestamp
// data type of the range frame boundaries is interval
// WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.unsafe.types.CalendarInterval
val interval = lit(CalendarInterval.fromString("interval 1 days"))

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), interval)
val q = sales.select(
  $"time",
  (sum($"volume") over windowSpec) as "sum",
  (count($"volume") over windowSpec) as "count")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156, count('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158]
01 +- AnalysisBarrier
02    +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03       +- LocalRelation [_1#129, _2#130]

import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)

import spark.sessionState.analyzer.ResolveAliases
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)

// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
val afterWindowFrameCoercion = WindowFrameCoercion(planWithRefsResolved)
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#132, None), sum(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156L, count(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158L]
01 +- AnalysisBarrier
02    +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03       +- LocalRelation [_1#129, _2#130]
```
```scala
import java.time.LocalDate
import java.sql.Date
val sales = Seq(
  (Date.valueOf(LocalDate.of(2018, 9, 1)), 5),
  (Date.valueOf(LocalDate.of(2018, 9, 2)), 10),
  // Mind the 2-day gap
  (Date.valueOf(LocalDate.of(2018, 9, 5)), 5)
).toDF("time", "volume")

scala> sales.show
+----------+------+
|      time|volume|
+----------+------+
|2018-09-01|     5|
|2018-09-02|    10|
|2018-09-05|     5|
+----------+------+

scala> sales.printSchema
root
 |-- time: date (nullable = true)
 |-- volume: integer (nullable = false)

// FIXME Use Catalyst DSL
// rangeBetween with column expressions
// data type of the orderBy expression is date
// WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), lit(1))
val q = sales.select(
  $"time",
  (sum($"volume") over windowSpec) as "sum")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), 1)) AS sum#238]
01 +- AnalysisBarrier
02    +- Project [_1#222 AS time#225, _2#223 AS volume#226]
03       +- LocalRelation [_1#222, _2#223]

import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)

import spark.sessionState.analyzer.ResolveAliases
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)

// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
// (the order expression is of DateType, so the integer boundary is left uncast)
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
val afterWindowFrameCoercion = WindowFrameCoercion(planWithAliasesResolved)
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#225, None), sum(volume#226) windowspecdefinition(time#225 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), 1)) AS sum#238L]
01 +- AnalysisBarrier
02    +- Project [_1#222 AS time#225, _2#223 AS volume#226]
03       +- LocalRelation [_1#222, _2#223]
```
Coercing Types in Logical Plan — coerceTypes Method
```scala
coerceTypes(plan: LogicalPlan): LogicalPlan
```
Note: coerceTypes is part of the TypeCoercionRule Contract to coerce types in a logical plan.
coerceTypes traverses all Catalyst expressions in the input LogicalPlan and, for every WindowSpecDefinition whose frameSpecification is a RangeFrame window frame and whose order specification is a single resolved expression, replaces the lower and upper window frame boundary expressions with the result of createBoundaryCast, i.e. the boundaries cast to the data type of the order specification expression.
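The traversal above can be sketched as follows. This is a minimal sketch with hypothetical stand-in case classes (`WindowSpec`, `Frame`, `Boundary`, `Order` are simplified reductions of Catalyst's WindowSpecDefinition, SpecifiedWindowFrame and order expressions, not the real API):

```scala
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType

sealed trait FrameType
case object RowFrame extends FrameType
case object RangeFrame extends FrameType

case class Order(dataType: DataType, resolved: Boolean)
case class Boundary(dataType: DataType)
case class Frame(frameType: FrameType, lower: Boundary, upper: Boundary)
case class WindowSpec(orderSpec: List[Order], frame: Frame)

// Stand-in for createBoundaryCast: in this sketch it simply retags
// the boundary with the target data type.
def cast(b: Boundary, dt: DataType): Boundary = Boundary(dt)

def coerce(spec: WindowSpec): WindowSpec = spec match {
  // Only a RangeFrame with exactly one resolved order expression is rewritten;
  // its boundaries are cast to the order expression's data type.
  case WindowSpec(order :: Nil, Frame(RangeFrame, lower, upper)) if order.resolved =>
    spec.copy(frame = Frame(RangeFrame, cast(lower, order.dataType), cast(upper, order.dataType)))
  case _ => spec
}
```

Note how a RowFrame, or a window specification with zero or several order expressions, passes through untouched; only the single-order RangeFrame case is rewritten.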
createBoundaryCast Internal Method
```scala
createBoundaryCast(boundary: Expression, dt: DataType): Expression
```
createBoundaryCast returns a Catalyst expression per the input boundary Expression and the dt DataType (in the order of execution):

- The input boundary expression if it is a SpecialFrameBoundary
- The input boundary expression if the dt data type is DateType or TimestampType
- A Cast unary operator with the input boundary expression and the dt data type if the result type of the boundary expression is not the dt data type, but the result type can be cast to the dt data type
- The input boundary expression otherwise
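The decision order above can be sketched in plain Scala. This is a minimal sketch with simplified stand-ins (`SpecialFrameBoundary`, `Literal`, `Cast` and `canCast` are hypothetical reductions of the Catalyst originals; the real `Cast.canCast` covers far more type pairs than the single Int-to-Long case modeled here):

```scala
sealed trait DataType
case object DateType extends DataType
case object TimestampType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType

sealed trait Expression { def dataType: DataType }
case class SpecialFrameBoundary(name: String) extends Expression {
  val dataType: DataType = IntegerType // placeholder type for the sketch
}
case class Literal(value: Any, dataType: DataType) extends Expression
case class Cast(child: Expression, dataType: DataType) extends Expression

// Hypothetical stand-in for Cast.canCast: identity and Int -> Long only.
def canCast(from: DataType, to: DataType): Boolean =
  from == to || (from == IntegerType && to == LongType)

def createBoundaryCast(boundary: Expression, dt: DataType): Expression =
  boundary match {
    case special: SpecialFrameBoundary                    => special     // rule 1
    case e if dt == DateType || dt == TimestampType       => e           // rule 2
    case e if e.dataType != dt && canCast(e.dataType, dt) => Cast(e, dt) // rule 3
    case e                                                => e           // rule 4
  }
```

Rule 2 is what makes the second example above a no-op: with a DateType order expression, the integer boundary is returned unchanged rather than wrapped in a Cast.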
Note: createBoundaryCast is used exclusively when the WindowFrameCoercion type coercion logical rule is requested to coerceTypes.