groupBy Streaming Aggregation with Append Output Mode

Demo: groupBy Streaming Aggregation with Append Output Mode

The following example code shows a groupBy streaming aggregation with Append output mode.

Append output mode requires that a streaming aggregation defines a watermark (using the withWatermark operator) on at least one of the grouping expressions (either directly or through the window function).

Note
The withWatermark operator has to be applied before the aggregation operator; otherwise the aggregation does not use the watermark.
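
The operator ordering described above can be sketched in PySpark as follows. This is a minimal sketch, not the original demo code: the function names are hypothetical, the built-in rate source stands in for a real event stream, and its timestamp and value columns are renamed to event_time and id only to mirror the column names in Table 1. The 10-second delayThreshold is the value mentioned in the table notes.

```python
def build_append_aggregation(spark, delay_threshold="10 seconds"):
    """Return a streaming aggregation that is valid in Append output mode.

    `spark` is an existing SparkSession. The `rate` source is an assumption
    used to keep the sketch self-contained.
    """
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import functions as F

    events = (
        spark.readStream
        .format("rate")                      # emits columns `timestamp` and `value`
        .option("rowsPerSecond", 1)
        .load()
        .withColumnRenamed("timestamp", "event_time")
        .withColumnRenamed("value", "id")
    )

    return (
        events
        # The watermark is declared BEFORE the aggregation ...
        .withWatermark("event_time", delay_threshold)
        # ... and the watermarked column is one of the grouping expressions.
        .groupBy("event_time")
        .agg(F.collect_list("id").alias("ids"))
    )


def start_console_query(aggregated):
    """Start the aggregation in Append output mode, printing to the console."""
    return (
        aggregated.writeStream
        .format("console")
        .outputMode("append")
        .option("truncate", False)
        .start()
    )
```

With a running SparkSession, `start_console_query(build_append_aggregation(spark))` would start the query. Moving withWatermark after groupBy, or dropping it, is expected to make Spark reject the query in Append output mode.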

In Append output mode the current watermark level is used to:

  1. Output saved state rows that have expired (shown as Expired State in the events table below)

  2. Drop late events, i.e. neither save them to the state store nor include them in the aggregation (shown as Late Events in the events table below)
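
The two uses of the watermark can be illustrated with a plain-Python sketch that does not touch Spark at all. It assumes that event_time values are in seconds, that the watermark is in milliseconds, and that a row counts as expired or late when its event time is less than or equal to the watermark. These assumptions are inferred from Table 1; the Event type and the split_by_watermark helper are hypothetical names.

```python
from typing import List, NamedTuple, Tuple


class Event(NamedTuple):
    event_time: int  # seconds (assumed unit)
    id: int
    batch: int


def split_by_watermark(
    watermark_ms: int,
    state_rows: List[Event],
    new_events: List[Event],
) -> Tuple[List[Event], List[Event], List[Event]]:
    """Split rows into (expired state, late events, saved state rows)."""

    def behind_watermark(e: Event) -> bool:
        # Assumed rule: a row is behind the watermark when event_time <= watermark.
        return e.event_time * 1000 <= watermark_ms

    expired = [r for r in state_rows if behind_watermark(r)]   # emitted as output
    late = [e for e in new_events if behind_watermark(e)]      # dropped
    saved = [r for r in state_rows if not behind_watermark(r)] + [
        e for e in new_events if not behind_watermark(e)
    ]
    return expired, late, saved


# Batch 2 of Table 1: the watermark is 5000 ms, the state holds the rows of batch 1.
expired, late, saved = split_by_watermark(
    watermark_ms=5000,
    state_rows=[Event(1, 1, 1), Event(15, 2, 1)],
    new_events=[Event(1, 1, 2), Event(15, 2, 2), Event(35, 3, 2)],
)
print(expired)  # the row with event_time 1 from batch 1
print(late)     # the row with event_time 1 from batch 2
print(saved)    # the three remaining rows
```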

Note
Sorting is only supported on streaming aggregated Datasets with Complete output mode.
Table 1. Streaming Batches, Events, Watermark and State Rows
(rows are written as event_time, id, batch)

Batch 1
  Events: (1, 1, 1), (15, 2, 1)
  Current Watermark Level [ms]: 0
  Saved State Rows: (1, 1, 1), (15, 2, 1)

Batch 2
  Events: (1, 1, 2), (15, 2, 2), (35, 3, 2)
  Current Watermark Level [ms]: 5000
    (Maximum event time 15 minus the delayThreshold as defined using withWatermark operator, i.e. 10)
  Expired State: (1, 1, 1)
  Late Events: (1, 1, 2)
  Saved State Rows: (15, 2, 1), (15, 2, 2), (35, 3, 2)

Batch 3
  Events: (15, 1, 3), (15, 2, 3), (20, 3, 3), (26, 4, 3)
  Current Watermark Level [ms]: 25000
    (Maximum event time from the previous batch is 35 and 10 seconds of delayThreshold)
  Expired State: (15, 2, 1), (15, 2, 2)
  Late Events: (15, 1, 3), (15, 2, 3), (20, 3, 3)
  Saved State Rows: (35, 3, 2), (26, 4, 3)

Batch 4
  Events: (36, 1, 4)
  Current Watermark Level [ms]: 25000
    (Maximum event time from the previous batch is 26)
  Saved State Rows: (35, 3, 2), (26, 4, 3), (36, 1, 4)

Batch 5
  Events: (50, 1, 5)
  Current Watermark Level [ms]: 26000
    (Maximum event time from the previous batch is 36)
  Expired State: (26, 4, 3)
  Saved State Rows: (35, 3, 2), (36, 1, 4), (50, 1, 5)


Note
The event-time watermark used for a batch is derived from the maximum event time seen in the previous batch only. The level is re-evaluated at every trigger, so the maximum event times of earlier batches are already reflected in the current level.
Note
The event-time watermark only changes when the maximum event time minus the delayThreshold (as defined using the withWatermark operator) is greater than the current watermark.
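
The watermark levels listed in Table 1 can be reproduced with a short plain-Python sketch of that rule. It is an illustration rather than Spark's actual implementation: the watermark_levels helper is a hypothetical name, event times are assumed to be in seconds, and the watermark is assumed to start at 0 and never to move backwards.

```python
from typing import List


def watermark_levels(max_event_times_s: List[int], delay_threshold_s: int) -> List[int]:
    """Return the watermark level [ms] in effect for each batch.

    max_event_times_s[i] is the maximum event time (seconds) seen in batch i.
    """
    levels = []
    watermark_ms = 0  # assumed initial level
    for max_event_time_s in max_event_times_s:
        # The level in effect for this batch was fixed by the previous batch.
        levels.append(watermark_ms)
        # Candidate level for the next batch: max event time minus the delay.
        candidate_ms = (max_event_time_s - delay_threshold_s) * 1000
        # The watermark advances only if the candidate exceeds the current level.
        watermark_ms = max(watermark_ms, candidate_ms)
    return levels


# Maximum event times of batches 1 to 5 in Table 1, with a 10-second delayThreshold.
levels = watermark_levels([15, 35, 26, 36, 50], delay_threshold_s=10)
print(levels)  # [0, 5000, 25000, 25000, 26000]
```

Batch 4 keeps the level 25000 because the maximum event time of batch 3 is 26, and 26 minus 10 gives 16 seconds, which is below the current level of 25 seconds.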
Tip

Use the following to publish events to Kafka.
