WindowFrameCoercion Type Coercion Logical Rule
WindowFrameCoercion is a type coercion logical rule that casts the data types of the lower and upper boundaries of a range window frame to the data type of the order specification of a WindowSpecDefinition in a logical plan.
```scala
import java.time.LocalDate
import java.sql.Timestamp

val sales = Seq(
  (Timestamp.valueOf(LocalDate.of(2018, 9, 1).atStartOfDay), 5),
  (Timestamp.valueOf(LocalDate.of(2018, 9, 2).atStartOfDay), 10),
  // Mind the 2-day gap
  (Timestamp.valueOf(LocalDate.of(2018, 9, 5).atStartOfDay), 5)
).toDF("time", "volume")

scala> sales.show
+-------------------+------+
|               time|volume|
+-------------------+------+
|2018-09-01 00:00:00|     5|
|2018-09-02 00:00:00|    10|
|2018-09-05 00:00:00|     5|
+-------------------+------+

scala> sales.printSchema
root
 |-- time: timestamp (nullable = true)
 |-- volume: integer (nullable = false)

// FIXME Use Catalyst DSL
// rangeBetween with column expressions
// data type of orderBy expression is timestamp
// data type of range frame boundaries is interval
// WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.unsafe.types.CalendarInterval
val interval = lit(CalendarInterval.fromString("interval 1 days"))

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), interval)

val q = sales.select(
  $"time",
  (sum($"volume") over windowSpec) as "sum",
  (count($"volume") over windowSpec) as "count")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156, count('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158]
01 +- AnalysisBarrier
02       +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03          +- LocalRelation [_1#129, _2#130]

import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)

import spark.sessionState.analyzer.ResolveAliases
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)

// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
val afterWindowFrameCoercion = WindowFrameCoercion(planWithRefsResolved)
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#132, None), sum(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156L, count(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158L]
01 +- AnalysisBarrier
02       +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03          +- LocalRelation [_1#129, _2#130]
```
```scala
import java.time.LocalDate
import java.sql.Date

val sales = Seq(
  (Date.valueOf(LocalDate.of(2018, 9, 1)), 5),
  (Date.valueOf(LocalDate.of(2018, 9, 2)), 10),
  // Mind the 2-day gap
  (Date.valueOf(LocalDate.of(2018, 9, 5)), 5)
).toDF("time", "volume")

scala> sales.show
+----------+------+
|      time|volume|
+----------+------+
|2018-09-01|     5|
|2018-09-02|    10|
|2018-09-05|     5|
+----------+------+

scala> sales.printSchema
root
 |-- time: date (nullable = true)
 |-- volume: integer (nullable = false)

// FIXME Use Catalyst DSL
// rangeBetween with column expressions
// data type of orderBy expression is date
// WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.orderBy($"time").rangeBetween(currentRow(), lit(1))

val q = sales.select(
  $"time",
  (sum($"volume") over windowSpec) as "sum")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Project [unresolvedalias('time, None), sum('volume) windowspecdefinition('time ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), 1)) AS sum#238]
01 +- AnalysisBarrier
02       +- Project [_1#222 AS time#225, _2#223 AS volume#226]
03          +- LocalRelation [_1#222, _2#223]

import spark.sessionState.analyzer.ResolveReferences
val planWithRefsResolved = ResolveReferences(plan)

import spark.sessionState.analyzer.ResolveAliases
val planWithAliasesResolved = ResolveAliases(planWithRefsResolved)

// FIXME Looks like nothing changes in the query plan with regard to WindowFrameCoercion
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.WindowFrameCoercion
val afterWindowFrameCoercion = WindowFrameCoercion(planWithAliasesResolved)
scala> println(afterWindowFrameCoercion.numberedTreeString)
00 'Project [unresolvedalias(time#132, None), sum(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS sum#156L, count(volume#133) windowspecdefinition(time#132 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, currentrow$(), interval 1 days)) AS count#158L]
01 +- AnalysisBarrier
02       +- Project [_1#129 AS time#132, _2#130 AS volume#133]
03          +- LocalRelation [_1#129, _2#130]
```
Coercing Types in Logical Plan — coerceTypes Method
```scala
coerceTypes(plan: LogicalPlan): LogicalPlan
```
Note: coerceTypes is part of the TypeCoercionRule contract to coerce types in a logical plan.
coerceTypes traverses all Catalyst expressions in the input LogicalPlan and, for every WindowSpecDefinition with a RangeFrame frame specification and a single resolved order specification expression, replaces the frame specification with one whose lower and upper frame boundary expressions are cast to the data type of the order specification expression.
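The rewrite above can be sketched with simplified stand-in types. These toy classes are illustrative only, not Spark's actual Catalyst classes; the point is the precondition (a RangeFrame with exactly one order expression) and the boundary cast:

```scala
// Toy sketch of the coerceTypes rewrite (NOT Spark's Catalyst classes)
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType

sealed trait Expression { def dataType: DataType }
case class AttributeRef(name: String, dataType: DataType) extends Expression
case class Literal(value: Any, dataType: DataType) extends Expression
case class Cast(child: Expression, dataType: DataType) extends Expression

sealed trait FrameType
case object RangeFrame extends FrameType
case object RowFrame extends FrameType

case class WindowSpec(
    orderSpec: Seq[Expression],
    frameType: FrameType,
    lower: Expression,
    upper: Expression)

// Simplified boundary cast: wrap the boundary in Cast when types differ
def castIfNeeded(e: Expression, dt: DataType): Expression =
  if (e.dataType == dt) e else Cast(e, dt)

def coerceTypes(spec: WindowSpec): WindowSpec = spec match {
  // Only a range frame with a single order expression is rewritten
  case WindowSpec(Seq(order), RangeFrame, lo, hi) =>
    spec.copy(
      lower = castIfNeeded(lo, order.dataType),
      upper = castIfNeeded(hi, order.dataType))
  case _ => spec
}

val spec = WindowSpec(
  orderSpec = Seq(AttributeRef("id", IntegerType)),
  frameType = RangeFrame,
  lower = Literal(0, IntegerType),
  upper = Literal(1L, LongType))

val coerced = coerceTypes(spec)
// the LongType upper boundary is wrapped in Cast(..., IntegerType);
// the matching lower boundary is left as-is
```

A RowFrame (or a range frame with zero or several order expressions) falls through the second case unchanged, which mirrors why the FIXME examples above show no visible change in their plans.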
createBoundaryCast Internal Method
```scala
createBoundaryCast(boundary: Expression, dt: DataType): Expression
```
createBoundaryCast returns a Catalyst expression for the input boundary Expression and dt DataType (in the order of evaluation):

- The input boundary expression if it is a SpecialFrameBoundary
- The input boundary expression if the dt data type is DateType or TimestampType
- A Cast unary operator with the input boundary expression and the dt data type, if the result type of the boundary expression is not the dt data type but can be cast to it
- The input boundary expression, otherwise
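The four rules can be sketched as a single pattern match. Again these are toy stand-in types, not Spark's Catalyst classes, and `canCast` is a hypothetical stand-in for Spark's cast-compatibility check:

```scala
// Toy sketch of createBoundaryCast's four rules, in evaluation order
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType
case object StringType extends DataType

sealed trait Expression { def dataType: DataType }
case class Literal(value: Any, dataType: DataType) extends Expression
case class Cast(child: Expression, dataType: DataType) extends Expression
// Stand-in for a SpecialFrameBoundary such as currentRow$()
case object CurrentRow extends Expression { val dataType: DataType = IntegerType }

// Hypothetical cast-compatibility check (Spark uses its own rules)
def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
  case (IntegerType, LongType) | (LongType, IntegerType) => true
  case (a, b) => a == b
}

def createBoundaryCast(boundary: Expression, dt: DataType): Expression =
  boundary match {
    case CurrentRow => boundary                           // 1. special frame boundary
    case _ if dt == DateType || dt == TimestampType =>
      boundary                                            // 2. date/timestamp order: keep as-is
    case e if e.dataType != dt && canCast(e.dataType, dt) =>
      Cast(e, dt)                                         // 3. insert a Cast
    case e => e                                           // 4. otherwise unchanged
  }
```

Rule 2 explains the FIXME observations in the examples above: with a date or timestamp order specification, the boundaries are deliberately left alone.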
Note: createBoundaryCast is used exclusively when the WindowFrameCoercion type coercion logical rule is requested to coerceTypes.