Aggregator — Contract for User-Defined Typed Aggregate Functions (UDAFs)
Aggregator is the contract for user-defined typed aggregate functions (also known as user-defined typed aggregations, or UDAFs for short).
```scala
package org.apache.spark.sql.expressions

abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  // only required methods that have no implementation
  def bufferEncoder: Encoder[BUF]
  def finish(reduction: BUF): OUT
  def merge(b1: BUF, b2: BUF): BUF
  def outputEncoder: Encoder[OUT]
  def reduce(b: BUF, a: IN): BUF
  def zero: BUF
}
```
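For illustration, here is a minimal concrete Aggregator, a typed average, that implements all six abstract methods. The class name and the (sum, count) buffer type are our own choices for this sketch; this exact aggregator does not appear in the Spark sources.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical example: a typed average over Double inputs.
// IN = Double, BUF = (sum, count), OUT = Double
object TypedAverage extends Aggregator[Double, (Double, Long), Double] {
  // Initial ("zero") buffer for an empty group
  val zero: (Double, Long) = (0.0, 0L)
  // Fold one input value into the buffer
  def reduce(b: (Double, Long), a: Double): (Double, Long) = (b._1 + a, b._2 + 1)
  // Combine two partial buffers (e.g. from different partitions)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  // Turn the final buffer into the output value
  def finish(r: (Double, Long)): Double = if (r._2 == 0) 0.0 else r._1 / r._2
  // Encoders for the intermediate buffer and the output value
  def bufferEncoder: Encoder[(Double, Long)] =
    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```

Keeping the buffer as a tuple of primitives lets you build its encoder with Encoders.tuple instead of reaching for an ExpressionEncoder by hand.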
After you create a custom Aggregator, use the toColumn method to convert it to a TypedColumn that can be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators.
```scala
// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel

// Step 1. Create Aggregator
val topKAggregator: Aggregator[Int, Int, Float] = ???

val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")
```
Note: Using the contract is like treading on thin ice.
Aggregator is used when:

- SimpleTypedAggregateExpression and ComplexTypedAggregateExpression are created
- TypedAggregateExpression is requested for the aggregator
Method | Description |
---|---|
bufferEncoder | Encoder of the intermediate aggregation buffer (of type BUF). Used when…FIXME |
finish | Transforms the final reduction buffer into the output value. Used when…FIXME |
merge | Merges two intermediate aggregation buffers. Used when…FIXME |
outputEncoder | Encoder of the output value (of type OUT). Used when…FIXME |
reduce | Aggregates an input value into the current buffer. Used when…FIXME |
zero | Initial ("zero") value of the aggregation buffer. Used when…FIXME |
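To see how the methods above cooperate at runtime, the evaluation lifecycle can be simulated with plain Scala collections. This is a sketch of the call sequence only, not Spark's actual execution code: each group starts from zero, input rows are folded in with reduce, per-partition buffers are combined with merge, and finish produces the result. The object and function names below are illustrative.

```scala
// Plain-Scala simulation of how Spark drives an Aggregator's methods.
// The aggregation computes an average with a (sum, count) buffer; the
// function names mirror the Aggregator contract but carry no Spark types.
object AggregatorLifecycle {
  type Buf = (Double, Long)

  val zero: Buf = (0.0, 0L)
  def reduce(b: Buf, a: Double): Buf = (b._1 + a, b._2 + 1)
  def merge(b1: Buf, b2: Buf): Buf = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(b: Buf): Double = if (b._2 == 0) 0.0 else b._1 / b._2

  // One group split across "partitions": fold each partition from zero,
  // merge the partial buffers, then finish the combined buffer.
  def aggregate(partitions: Seq[Seq[Double]]): Double = {
    val partials = partitions.map(_.foldLeft(zero)(reduce))
    finish(partials.foldLeft(zero)(merge))
  }
}
```

Because merge combines partial buffers associatively, the result is the same no matter how the input rows are split across partitions, e.g. `AggregatorLifecycle.aggregate(Seq(Seq(1.0, 2.0), Seq(3.0)))` and `AggregatorLifecycle.aggregate(Seq(Seq(1.0, 2.0, 3.0)))` both average the same three values.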
Aggregator | Description |
---|---|
FIXME | Used exclusively in Spark MLlib |