The Dataset API has a set of operators that are of particular use in Apache Spark's Structured Streaming and that together constitute the so-called Streaming Dataset API.
Table 1. Streaming Operators

| Operator | Description |
|----------|-------------|
| `dropDuplicates`<br>`dropDuplicates(): Dataset[T]`<br>`dropDuplicates(colNames: Seq[String]): Dataset[T]`<br>`dropDuplicates(col1: String, cols: String*): Dataset[T]` | Drops duplicate records (optionally considering only a subset of columns) |
| `explain`<br>`explain(): Unit`<br>`explain(extended: Boolean): Unit` | Prints the logical and physical plans to the console |
| `groupBy`<br>`groupBy(cols: Column*): RelationalGroupedDataset`<br>`groupBy(col1: String, cols: String*): RelationalGroupedDataset` | Aggregates rows by an untyped grouping function |
| `groupByKey`<br>`groupByKey(func: T => K): KeyValueGroupedDataset[K, T]` | Aggregates rows by a typed grouping function |
| `withWatermark`<br>`withWatermark(eventTime: String, delayThreshold: String): Dataset[T]` | Defines a streaming watermark (on the given eventTime column) for late events |
| `writeStream`<br>`writeStream: DataStreamWriter[T]` | Creates a DataStreamWriter for persisting the result of a streaming query to an external data system |
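As a quick illustration, here is a minimal sketch of how a few of these operators compose in a streaming query. It assumes the built-in `rate` source (whose rows carry `timestamp` and `value` columns); the window and watermark durations are arbitrary choices for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder.appName("streaming-operators-demo").getOrCreate()
import spark.implicits._

// The rate source emits rows with a `timestamp` (event time) and a `value` column
val rates = spark.readStream.format("rate").load

// Untyped grouping: count rows per 5-second tumbling window, with a
// 10-second watermark so late rows (and old aggregation state) can be dropped
val windowed = rates
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .count

// Typed grouping: group rows by the parity of `value`
val byParity = rates.groupByKey(_.getAs[Long]("value") % 2).count
```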
The following template demonstrates the shape of a complete streaming query; replace `[operator]` with the streaming operator of your choice:

```scala
val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load

// stream processing
// replace [operator] with the operator of your choice
rates.[operator]

// output stream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val sq = rates
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Complete)
  .queryName("rate-console")
  .start

// eventually...
sq.stop
```
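Note that the `[operator]` placeholder has to be replaced before the template compiles. One illustrative substitution (an assumption, not part of the original template) is a windowed count, which is an aggregation and therefore compatible with the `Complete` output mode used above:

```scala
import org.apache.spark.sql.functions.{col, window}

// Hypothetical substitution for the [operator] placeholder:
// a per-10-second count, a valid aggregation for OutputMode.Complete
val aggregated = rates
  .groupBy(window(col("timestamp"), "10 seconds"))
  .count

// ...then call writeStream on `aggregated` instead of `rates`
```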