
Window Aggregation Functions

Standard Functions for Window Aggregation (Window Functions)

Window aggregate functions (aka window functions or windowed aggregates) are functions that perform a calculation over a group of records, called a window, that are related to the current record (e.g. they are in the same partition or frame as the current row).

In other words, when executed, a window function computes a value for each and every row in a window (per window specification).

Note
Window functions are also called over functions because of how they are applied: using the over operator.

Spark SQL supports three kinds of window functions:

  • ranking functions
  • analytic functions
  • aggregate functions
Table 1. Window Aggregate Functions in Spark SQL
Kind | Functions
Ranking functions | rank, dense_rank, percent_rank, ntile, row_number
Analytic functions | cume_dist, lag, lead

For aggregate functions, you can use the existing aggregate functions as window functions, e.g. sum, avg, min, max and count.

You describe a window using the convenient factory methods in the Window object. They create a window specification that you can further refine with partitioning, ordering, and frame boundaries.

After you describe a window you can apply window aggregate functions like ranking functions (e.g. RANK), analytic functions (e.g. LAG), and the regular aggregate functions, e.g. sum, avg, max.

Note
Window functions are supported in structured queries using SQL and Column-based expressions.

Although similar to aggregate functions, a window function does not group rows into a single output row and retains their separate identities. A window function can access rows that are linked to the current row.

Note
The main difference between window aggregate functions and aggregate functions with grouping operators is that the former calculate a value for every row in a window, while the latter give you at most as many rows as there are input rows, one value per group.
Tip
See Examples section in this document.
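To make the difference concrete, here is a minimal plain-Scala sketch with no Spark dependency. It models both behaviours on an in-memory collection, using a few rows of the PRODUCT_REVENUE table from the Examples section. The object and method names are hypothetical. In Spark itself you would write groupBy("category").agg(sum("revenue")) for the grouped version and sum("revenue").over(Window.partitionBy("category")) for the windowed one.

```scala
object GroupVsWindow {
  final case class Sale(product: String, category: String, revenue: Int)

  // A few rows of the PRODUCT_REVENUE table (Table 2).
  val sales: Vector[Sale] = Vector(
    Sale("Thin", "cell phone", 6000),
    Sale("Ultra thin", "cell phone", 5000),
    Sale("Normal", "tablet", 1500),
    Sale("Mini", "tablet", 5500))

  // Grouped aggregate: one output row per category.
  def groupedSum(rows: Vector[Sale]): Map[String, Int] =
    rows.groupBy(_.category).map { case (category, rs) => category -> rs.map(_.revenue).sum }

  // Window aggregate over a partition by category: every input row is kept
  // and paired with the sum of its own partition.
  def windowedSum(rows: Vector[Sale]): Vector[(Sale, Int)] = {
    val totals = groupedSum(rows)
    rows.map(s => s -> totals(s.category))
  }

  def main(args: Array[String]): Unit = {
    println(groupedSum(sales))          // one entry per category
    windowedSum(sales).foreach(println) // one line per input row
  }
}
```

The grouped result has two entries, and the windowed result has four rows, one per input row.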

You mark a function as a window function with an OVER clause after the function in SQL, e.g. avg(revenue) OVER (…), or with the over method on a function in the Dataset API, e.g. rank().over(…).

Note
Window functions belong to the Window functions group in Spark’s Scala API.
Note
The window-based framework has been available as an experimental feature since Spark 1.4.0.

Window object

The Window object provides functions to define windows (as WindowSpec instances).

The Window object lives in the org.apache.spark.sql.expressions package. Import it (import org.apache.spark.sql.expressions.Window) to use window functions.

There are two families of functions in the Window object that create a WindowSpec instance for one or more Column instances:

Partitioning Records — partitionBy Methods

partitionBy creates an instance of WindowSpec with partition expression(s) defined for one or more columns.

Ordering in Windows — orderBy Methods

orderBy allows you to control the order of records in a window.
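Conceptually, partitionBy and orderBy prepare the input for a window function. The rows are split by a key, each partition is sorted, and the function is evaluated per ordered partition while keeping every row. The following is a minimal plain-Scala sketch of that evaluation order. It is not how Spark executes windows physically, and overWindow is a hypothetical helper.

```scala
object WindowEvaluation {
  // 1. partitionBy: split rows by a key.
  // 2. orderBy: sort each partition by an ordering key.
  // 3. Apply the window function to each ordered partition and keep every row.
  def overWindow[A, K, O: Ordering, B](rows: Vector[A])(partitionBy: A => K, orderBy: A => O)(
      f: Vector[A] => Vector[B]): Vector[(A, B)] =
    rows.groupBy(partitionBy).values.toVector.flatMap { part =>
      val ordered = part.sortBy(orderBy)
      ordered.zip(f(ordered))
    }

  def main(args: Array[String]): Unit = {
    val rows = Vector(("a", 3), ("b", 1), ("a", 1), ("a", 2))
    // A row_number-like function: position within the ordered partition, starting at 1.
    overWindow(rows)(_._1, _._2)(p => p.indices.toVector.map(_ + 1)).foreach(println)
  }
}
```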

rangeBetween Method

rangeBetween creates a WindowSpec with the frame boundaries from start (inclusive) to end (inclusive).

Note
It is recommended to use Window.unboundedPreceding, Window.unboundedFollowing and Window.currentRow to describe the frame boundaries when a frame is unbounded preceding, unbounded following and at current row, respectively.

Internally, rangeBetween creates a WindowSpec with SpecifiedWindowFrame and RangeFrame type.

Frame

At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it.

When you define a frame you have to specify three components of a frame specification – the start and end boundaries, and the type.

Types of boundaries (two positions and three offsets):

  • UNBOUNDED PRECEDING – the first row of the partition
  • UNBOUNDED FOLLOWING – the last row of the partition
  • CURRENT ROW
  • <value> PRECEDING
  • <value> FOLLOWING

Offsets specify the offset from the current input row.

Types of frames:

  • ROWS – based on physical offsets from the position of the current input row
  • RANGE – based on logical offsets from the value of the ordering expression in the current input row
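The difference between the two frame types shows up when the ordering column has duplicates (peers). The following plain-Scala sketch models `1 PRECEDING AND CURRENT ROW` both ways over one already-sorted partition. It assumes an integer ordering key. The sample data and names are hypothetical and not taken from Spark.

```scala
object RowsVsRange {
  // One row of a window partition that is already sorted by `key`.
  final case class Row(key: Int, value: Int)

  // Sample partition: two rows share key = 2, so they are peers.
  val partition: Vector[Row] = Vector(Row(1, 10), Row(2, 20), Row(2, 30), Row(4, 40))

  // ROWS frame: chosen by physical position relative to the current row.
  def rowsFrameSum(part: Vector[Row], preceding: Int, following: Int): Vector[Int] =
    part.indices.toVector.map { i =>
      val lo = math.max(0, i - preceding)
      val hi = math.min(part.size - 1, i + following)
      (lo to hi).map(j => part(j).value).sum
    }

  // RANGE frame: chosen by the ordering value, so peers always enter the frame together.
  def rangeFrameSum(part: Vector[Row], preceding: Int, following: Int): Vector[Int] =
    part.map { current =>
      part.filter(r => r.key >= current.key - preceding && r.key <= current.key + following)
        .map(_.value).sum
    }

  def main(args: Array[String]): Unit = {
    println(rowsFrameSum(partition, 1, 0))  // ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    println(rangeFrameSum(partition, 1, 0)) // RANGE BETWEEN 1 PRECEDING AND CURRENT ROW
  }
}
```

With ROWS the second and third rows get different sums (30 and 50). With RANGE both peers see the same frame and the same sum (60). The last row's RANGE frame contains only itself, because no key falls within 1 below 4.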

In the current implementation of WindowSpec you can use two methods to define a frame:

  • rowsBetween
  • rangeBetween

See WindowSpec for their coverage.

Window Operators in SQL Queries

The grammar of window operators in SQL accepts the following:

  1. CLUSTER BY or PARTITION BY or DISTRIBUTE BY for partitions,
  2. ORDER BY or SORT BY for sorting order,
  3. RANGE, ROWS, RANGE BETWEEN, and ROWS BETWEEN for window frame types,
  4. UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING, CURRENT ROW for frame bounds.
Tip
Consult withWindows helper in AstBuilder.

Examples

Top N per Group

Top N per Group is useful when you need to compute the first and second best-sellers in each category.

Note
This example is borrowed from an excellent article Introducing Window Functions in Spark SQL.
Table 2. Table PRODUCT_REVENUE
product | category | revenue
Thin | cell phone | 6000
Normal | tablet | 1500
Mini | tablet | 5500
Ultra thin | cell phone | 5000
Very thin | cell phone | 6000
Big | tablet | 2500
Bendable | cell phone | 3000
Foldable | cell phone | 3000
Pro | tablet | 4500
Pro2 | tablet | 6500

Question: What are the best-selling and the second best-selling products in every category?

The question boils down to ranking products in a category by their revenue, and then picking the best-selling and the second best-selling products based on that ranking.
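The ranking-then-filtering idea can be sketched in plain Scala over the rows of Table 2. This sketch assumes dense_rank semantics, so products with equal revenue share a rank. It models dense_rank() over a window partitioned by category and ordered by revenue descending, and then keeps ranks 1 and 2. The object and method names are hypothetical, and this is not Spark code.

```scala
object TopNPerGroup {
  final case class ProductRevenue(product: String, category: String, revenue: Int)

  // The rows of Table 2 (PRODUCT_REVENUE).
  val productRevenue: Vector[ProductRevenue] = Vector(
    ProductRevenue("Thin", "cell phone", 6000),
    ProductRevenue("Normal", "tablet", 1500),
    ProductRevenue("Mini", "tablet", 5500),
    ProductRevenue("Ultra thin", "cell phone", 5000),
    ProductRevenue("Very thin", "cell phone", 6000),
    ProductRevenue("Big", "tablet", 2500),
    ProductRevenue("Bendable", "cell phone", 3000),
    ProductRevenue("Foldable", "cell phone", 3000),
    ProductRevenue("Pro", "tablet", 4500),
    ProductRevenue("Pro2", "tablet", 6500))

  // Models dense_rank() over (partition by category order by revenue desc):
  // equal revenues share a rank and no rank numbers are skipped.
  def denseRankByRevenue(rows: Vector[ProductRevenue]): Vector[(ProductRevenue, Int)] =
    rows.groupBy(_.category).values.toVector.flatMap { partition =>
      val distinctDesc = partition.map(_.revenue).distinct.sorted(Ordering[Int].reverse)
      partition.sortBy(-_.revenue).map(p => p -> (distinctDesc.indexOf(p.revenue) + 1))
    }

  // Keeps only the rows whose rank is at most n.
  def topN(rows: Vector[ProductRevenue], n: Int): Vector[(ProductRevenue, Int)] =
    denseRankByRevenue(rows).filter { case (_, rank) => rank <= n }

  def main(args: Array[String]): Unit =
    topN(productRevenue, 2).foreach { case (p, rank) => println(s"${p.category}\t${p.product}\t$rank") }
}
```

With n = 2 this keeps Thin and Very thin (tied at rank 1) and Ultra thin for cell phones, and Pro2 and Mini for tablets.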

Revenue Difference per Category

Note
This example is the 2nd example from an excellent article Introducing Window Functions in Spark SQL.
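The idea behind this example is to compare each product's revenue with the best revenue in its category. That amounts to a max over a frame spanning the whole category partition, minus the current row's revenue. The following plain-Scala sketch models this on the tablet rows of Table 2. The names are hypothetical, and it is not the article's Spark code.

```scala
object RevenueDifference {
  final case class ProductRevenue(product: String, category: String, revenue: Int)

  // The tablet rows of Table 2 (PRODUCT_REVENUE).
  val tablets: Vector[ProductRevenue] = Vector(
    ProductRevenue("Normal", "tablet", 1500),
    ProductRevenue("Mini", "tablet", 5500),
    ProductRevenue("Big", "tablet", 2500),
    ProductRevenue("Pro", "tablet", 4500),
    ProductRevenue("Pro2", "tablet", 6500))

  // Models max(revenue) over (partition by category) - revenue. The frame is the
  // whole partition, so every row in a category sees the same maximum.
  def revenueDifference(rows: Vector[ProductRevenue]): Vector[(String, Int)] = {
    val maxByCategory = rows.groupBy(_.category).map { case (c, rs) => c -> rs.map(_.revenue).max }
    rows.map(p => p.product -> (maxByCategory(p.category) - p.revenue))
  }

  def main(args: Array[String]): Unit =
    revenueDifference(tablets).foreach(println)
}
```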

Difference on Column

Compute the difference between values in different rows of a column.

The key here is to remember that DataFrames are executed as RDDs under the covers, so an aggregation such as grouping by a key in a DataFrame typically involves a shuffle, much like RDD’s groupBy, reduceByKey or aggregateByKey transformations.
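One common form of this example is the difference between the current row and the previous row, which is what value - lag(value, 1) over an ordered window gives. The following plain-Scala sketch models that on one ordered partition. It uses None to stand in for the null that lag produces on the first row. The sample values and names are hypothetical.

```scala
object ColumnDifference {
  // Models value - lag(value, 1) over an ordered window partition.
  // The first row has no predecessor, so its difference is None (SQL NULL).
  def diffs(values: Vector[Int]): Vector[Option[Int]] =
    values.indices.toVector.map(i => if (i == 0) None else Some(values(i) - values(i - 1)))

  def main(args: Array[String]): Unit =
    println(diffs(Vector(5, 8, 6, 10)))
}
```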

Running Total

The running total is the sum of all previous rows including the current one.
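In window terms, a running total is a sum over a frame from UNBOUNDED PRECEDING to CURRENT ROW. The following plain-Scala sketch models that for a single ordered partition with a scan. The names are hypothetical, and this is not Spark's implementation.

```scala
object RunningTotal {
  // Models sum(value) over (order by ... rows between unbounded preceding and current row):
  // scanLeft produces the prefix sums, and tail drops the leading 0 seed.
  def runningTotal(values: Vector[Int]): Vector[Int] =
    values.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit =
    println(runningTotal(Vector(1, 2, 3, 4)))
}
```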

Calculate rank of row

See “Explaining” Query Plans of Windows for an elaborate example.

Interval data type for Date and Timestamp types

With the Interval data type, you can use an interval as the <value> in <value> PRECEDING and <value> FOLLOWING for a RANGE frame. This is particularly useful for time-series analysis with window functions.
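As an illustration, the following plain-Scala sketch models a RANGE frame whose lower bound is a time interval, such as the sum of the last 10 minutes of events. It assumes timestamps are represented as epoch seconds and the interval as a width in seconds. The names and data are hypothetical, and the sketch does not reproduce Spark's interval syntax.

```scala
object TimeRangeFrame {
  final case class Event(epochSecond: Long, amount: Int)

  // Models a RANGE frame ordered by time, from `widthSeconds` PRECEDING to CURRENT ROW:
  // each row sums every event whose timestamp falls in [t - width, t].
  def trailingSum(events: Vector[Event], widthSeconds: Long): Vector[Int] = {
    val sorted = events.sortBy(_.epochSecond)
    sorted.map { cur =>
      sorted
        .filter(e => e.epochSecond >= cur.epochSecond - widthSeconds && e.epochSecond <= cur.epochSecond)
        .map(_.amount)
        .sum
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Vector(Event(0, 1), Event(300, 2), Event(600, 4), Event(1000, 8))
    println(trailingSum(events, 600)) // 600 seconds = 10 minutes
  }
}
```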

Accessing values of earlier rows

FIXME What’s the value of rows before current one?

Moving Average

Cumulative Aggregates

E.g. cumulative sum

User-defined aggregate functions

With window function support, you can use user-defined aggregate functions as window functions.

“Explaining” Query Plans of Windows

lag Window Function

lag returns the value in the e / columnName column that is offset records before the current record, or defaultValue if there are fewer than offset records before the current record in the window partition. Without a defaultValue, lag returns null in that case.

Caution
FIXME It looks like lag with a default value has a bug — the default value’s not used at all.
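For reference, the following plain-Scala sketch models the documented lag semantics on one ordered partition. The default is returned only when there is no row offset positions back. None stands in for SQL NULL. It is a model for checking expectations, not Spark's implementation, and LagModel is a hypothetical name.

```scala
object LagModel {
  // Models lag(col, offset, default) over an already ordered window partition.
  // `default` (None, i.e. NULL, unless given) is used when row i - offset does not exist.
  def lag[A](partition: Vector[A], offset: Int, default: Option[A] = None): Vector[Option[A]] =
    partition.indices.toVector.map { i =>
      val j = i - offset
      if (j >= 0 && j < partition.size) Some(partition(j)) else default
    }

  def main(args: Array[String]): Unit = {
    println(lag(Vector(1, 2, 3), 1))
    println(lag(Vector(1, 2, 3), 2, Some(0)))
  }
}
```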

lead Window Function

lead returns the value that is offset records after the current record, or defaultValue if there are fewer than offset records after the current record. Without a defaultValue, lead returns null in that case.

Caution
FIXME It looks like lead with a default value has a bug — the default value’s not used at all.
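The mirror-image model for lead is below. It is a plain-Scala sketch of the documented semantics, with None standing in for SQL NULL. LeadModel is a hypothetical name, and this is not Spark code.

```scala
object LeadModel {
  // Models lead(col, offset, default) over an already ordered window partition.
  // `default` (None, i.e. NULL, unless given) is used when row i + offset does not exist.
  def lead[A](partition: Vector[A], offset: Int, default: Option[A] = None): Vector[Option[A]] =
    partition.indices.toVector.map { i =>
      val j = i + offset
      if (j >= 0 && j < partition.size) Some(partition(j)) else default
    }

  def main(args: Array[String]): Unit = {
    println(lead(Vector(1, 2, 3), 1))
    println(lead(Vector(1, 2, 3), 1, Some(-1)))
  }
}
```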

Cumulative Distribution of Records Across Window Partitions — cume_dist Window Function

cume_dist computes the cumulative distribution of the records in window partitions. This is equivalent to SQL’s CUME_DIST function.
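CUME_DIST is the fraction of rows in the partition whose ordering value is less than or equal to the current row's value. As a result, tied rows share the same result. The following plain-Scala sketch models that for a partition ordered ascending. The names are hypothetical.

```scala
object CumeDistModel {
  // Models cume_dist() over (order by value): (# rows with value <= current) / (# rows).
  def cumeDist(values: Vector[Int]): Vector[Double] = {
    val n = values.size.toDouble
    values.map(v => values.count(_ <= v) / n)
  }

  def main(args: Array[String]): Unit =
    println(cumeDist(Vector(10, 20, 20, 30)))
}
```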

Sequential numbering per window partition — row_number Window Function

row_number returns a sequential number starting at 1 within a window partition.

ntile Window Function

ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.

Caution
FIXME How is ntile different from rank? What about performance?
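One observable difference is that ntile depends only on a row's position and the partition size, not on the ordering values. Two rows with equal values can therefore land in different buckets, whereas rank would give them the same rank. The following plain-Scala sketch models the standard SQL NTILE bucketing, in which bucket sizes differ by at most one and the larger buckets come first. It is not taken from Spark's source, and NTileModel is a hypothetical name.

```scala
object NTileModel {
  // Models ntile(n) over an ordered partition of `size` rows.
  def ntile(size: Int, n: Int): Vector[Int] = {
    require(n > 0, "ntile requires a positive number of buckets")
    val base = size / n  // minimum rows per bucket
    val extra = size % n // the first `extra` buckets get one more row
    (0 until size).toVector.map { i =>
      if (i < extra * (base + 1)) i / (base + 1) + 1
      else extra + (i - extra * (base + 1)) / base + 1
    }
  }

  def main(args: Array[String]): Unit = {
    println(ntile(5, 2))
    println(ntile(10, 4))
  }
}
```

When the partition has fewer rows than buckets (base is 0), every row falls into the first branch, so each row gets its own bucket and there is no division by zero.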

Ranking Records per Window Partition — rank Window Function

rank functions assign the sequential rank of each distinct value per window partition. They are equivalent to RANK, DENSE_RANK and PERCENT_RANK functions in the good ol’ SQL.

rank function assigns the same rank for duplicate rows with a gap in the sequence (similarly to Olympic medal places). dense_rank is like rank for duplicate rows but compacts the ranks and removes the gaps.
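The following plain-Scala sketch compares rank, dense_rank, percent_rank and row_number on the same partition with a tie. It assumes the partition is already sorted ascending by the ordering value. percent_rank is modelled as (rank - 1) / (rows - 1), with 0.0 for a single-row partition. The names are hypothetical, and this is not Spark's implementation.

```scala
object RankingModel {
  // row_number: 1, 2, 3, ... regardless of ties.
  def rowNumber(sorted: Vector[Int]): Vector[Int] = sorted.indices.toVector.map(_ + 1)

  // rank: 1 + number of rows with a strictly smaller value (gaps after ties).
  def rank(sorted: Vector[Int]): Vector[Int] = sorted.map(v => sorted.count(_ < v) + 1)

  // dense_rank: 1 + number of distinct smaller values (no gaps).
  def denseRank(sorted: Vector[Int]): Vector[Int] = {
    val distinctValues = sorted.distinct
    sorted.map(v => distinctValues.count(_ < v) + 1)
  }

  // percent_rank: (rank - 1) / (rows - 1), or 0.0 for a single-row partition.
  def percentRank(sorted: Vector[Int]): Vector[Double] = {
    val n = sorted.size
    rank(sorted).map(r => if (n > 1) (r - 1).toDouble / (n - 1) else 0.0)
  }

  def main(args: Array[String]): Unit = {
    val sorted = Vector(10, 20, 20, 30)
    println(s"row_number:   ${rowNumber(sorted)}")
    println(s"rank:         ${rank(sorted)}")
    println(s"dense_rank:   ${denseRank(sorted)}")
    println(s"percent_rank: ${percentRank(sorted)}")
  }
}
```

The two rows with value 20 share rank 2, so the next rank is 4 (the Olympic-style gap). dense_rank gives 3 instead, and row_number ignores the tie.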

currentRow Window Function

currentRow…FIXME

unboundedFollowing Window Function

unboundedFollowing…FIXME

unboundedPreceding Window Function

unboundedPreceding…FIXME

Reproduction without permission is prohibited: spark技术分享 » Window Aggregation Functions