
UserDefinedFunction



UserDefinedFunction represents a user-defined function.

UserDefinedFunction is created when the udf function is executed or when UDFRegistration is requested to register a Scala function as a user-defined function.

UserDefinedFunction can also have a name.

UserDefinedFunction is nullable by default, but can be marked as non-nullable (using asNonNullable).

Executing UserDefinedFunction (Creating Column with ScalaUDF Expression) — apply Method

apply creates a Column with ScalaUDF expression.

Note
apply is used when…​FIXME

Marking UserDefinedFunction as NonNullable — asNonNullable Method

asNonNullable…​FIXME

Note
asNonNullable is used when…​FIXME

Naming UserDefinedFunction — withName Method

withName…​FIXME

Note
withName is used when…​FIXME

Creating UserDefinedFunction Instance

UserDefinedFunction takes the following when created: the underlying Scala function (f), the output data type, and the input data types (if available).

UserDefinedFunction initializes the internal registries and counters.

User-Defined Functions (UDFs)


UDFs — User-Defined Functions

User-defined functions (aka UDFs) are a feature of Spark SQL for defining new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.

Important

Use the higher-level standard Column-based functions (with Dataset operators) whenever possible before resorting to user-defined functions, since UDFs are a black box for Spark SQL and it cannot (and does not even try to) optimize them.

As Reynold Xin from the Apache Spark project has once said on Spark’s dev mailing list:

There are simple cases in which we can analyze the UDFs byte code and infer what it is doing, but it is pretty difficult to do in general.

You define a new UDF by passing a Scala function to the udf function. It accepts Scala functions of up to 10 input parameters.

You can register UDFs to use in SQL-based query expressions via UDFRegistration (that is available through SparkSession.udf attribute).
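
For illustration, here is a minimal sketch you could run in spark-shell (so a SparkSession is available as spark); the sample data and the toUpper name are made up:

  import org.apache.spark.sql.functions.udf
  import spark.implicits._

  val df = Seq("hello", "spark").toDF("word")

  // Column-based UDF created with the udf function
  val toUpper = udf { s: String => s.toUpperCase }
  df.select(toUpper($"word") as "upper").show

  // Register the same logic for SQL-based queries via SparkSession.udf (UDFRegistration)
  spark.udf.register("toUpper", (s: String) => s.toUpperCase)
  df.createOrReplaceTempView("words")
  spark.sql("SELECT word, toUpper(word) AS upper FROM words").show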

You can query for available standard and user-defined functions using the Catalog interface (that is available through SparkSession.catalog attribute).

Note
UDFs play a vital role in Spark MLlib to define new Transformers that are function objects that transform DataFrames into DataFrames by introducing new columns.

udf Functions (in functions object)

org.apache.spark.sql.functions object comes with udf function to let you define a UDF for a Scala function f.

Tip
Define custom UDFs based on “standalone” Scala functions (e.g. toUpperUDF) so you can test the Scala functions the Scala way (without Spark SQL’s “noise”) and, once they are defined, reuse the UDFs in UnaryTransformers.
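
A sketch of the pattern the tip describes (the toUpperFn and toUpperUDF names are illustrative only):

  import org.apache.spark.sql.functions.udf

  // "Standalone" Scala function that can be unit-tested without Spark SQL
  val toUpperFn: String => String = _.toUpperCase

  // Plain Scala test, no Spark involved
  assert(toUpperFn("hello") == "HELLO")

  // Reuse the very same function as a UDF
  val toUpperUDF = udf(toUpperFn)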

Window Aggregation Functions


Standard Functions for Window Aggregation (Window Functions)

Window aggregate functions (aka window functions or windowed aggregates) are functions that perform a calculation over a group of records called a window that are in some relation to the current record (i.e. can be in the same partition or frame as the current row).

In other words, when executed, a window function computes a value for each and every row in a window (per window specification).

Note
Window functions are also called over functions due to how they are applied using the over operator.

Spark SQL supports three kinds of window functions:

  • ranking functions
  • analytic functions
  • aggregate functions
Table 1. Window Aggregate Functions in Spark SQL
Kind                  Functions
Ranking functions     rank, dense_rank, percent_rank, ntile, row_number
Analytic functions    cume_dist, lag, lead

For aggregate functions, you can use the existing aggregate functions as window functions, e.g. sum, avg, min, max and count.

You describe a window using the convenient factory methods in Window object that create a window specification that you can further refine with partitioning, ordering, and frame boundaries.

After you describe a window you can apply window aggregate functions like ranking functions (e.g. RANK), analytic functions (e.g. LAG), and the regular aggregate functions, e.g. sum, avg, max.
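
A minimal sketch of that flow, runnable in spark-shell (the sales data is made up):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{avg, rank}
  import spark.implicits._

  val sales = Seq(
    ("cell phone", "Thin", 6000),
    ("cell phone", "Ultra thin", 5000),
    ("tablet", "Mini", 5500),
    ("tablet", "Big", 2500)).toDF("category", "product", "revenue")

  // Window specification: rows of the same category, highest revenue first
  val byCategory = Window.partitionBy("category").orderBy($"revenue".desc)

  sales
    .withColumn("rank", rank() over byCategory)                                       // ranking function
    .withColumn("category_avg", avg("revenue") over Window.partitionBy("category"))  // regular aggregate as a window function
    .show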

Note
Window functions are supported in structured queries using SQL and Column-based expressions.

Although similar to aggregate functions, a window function does not group rows into a single output row and retains their separate identities. A window function can access rows that are linked to the current row.

Note
The main difference between window aggregate functions and aggregate functions with grouping operators is that the former calculate a value for every row in a window while the latter give you at most the number of input rows, one value per group.
Tip
See Examples section in this document.

You mark a function as a window function with an OVER clause after the function in SQL, e.g. avg(revenue) OVER (…​), or with the over method on a function in the Dataset API, e.g. rank().over(…​).

Note
Window functions belong to Window functions group in Spark’s Scala API.
Note
Window-based framework is available as an experimental feature since Spark 1.4.0.

Window object

Window object provides functions to define windows (as WindowSpec instances).

Window object lives in org.apache.spark.sql.expressions package. Import it to use Window functions.

There are two families of functions in the Window object that create a WindowSpec for one or more Column instances: partitionBy and orderBy.

Partitioning Records — partitionBy Methods

partitionBy creates an instance of WindowSpec with partition expression(s) defined for one or more columns.

Ordering in Windows — orderBy Methods

orderBy allows you to control the order of records in a window.

rangeBetween Method

rangeBetween creates a WindowSpec with the frame boundaries from start (inclusive) to end (inclusive).

Note
It is recommended to use Window.unboundedPreceding, Window.unboundedFollowing and Window.currentRow to describe the frame boundaries when a frame is unbounded preceding, unbounded following and at current row, respectively.

Internally, rangeBetween creates a WindowSpec with SpecifiedWindowFrame and RangeFrame type.

Frame

At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it.

When you define a frame you have to specify three components of a frame specification – the start and end boundaries, and the type.

Types of boundaries (three positions and two offsets):

  • UNBOUNDED PRECEDING – the first row of the partition
  • UNBOUNDED FOLLOWING – the last row of the partition
  • CURRENT ROW
  • <value> PRECEDING
  • <value> FOLLOWING

Offsets specify the offset from the current input row.

Types of frames:

  • ROW – based on physical offsets from the position of the current input row
  • RANGE – based on logical offsets from the position of the current input row

In the current implementation of WindowSpec you can use two methods to define a frame:

  • rowsBetween
  • rangeBetween

See WindowSpec for their coverage.

Window Operators in SQL Queries

The grammar of window operators in SQL accepts the following:

  1. CLUSTER BY or PARTITION BY or DISTRIBUTE BY for partitions,
  2. ORDER BY or SORT BY for sorting order,
  3. RANGE, ROWS, RANGE BETWEEN, and ROWS BETWEEN for window frame types,
  4. UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING, CURRENT ROW for frame bounds.
Tip
Consult withWindows helper in AstBuilder.

Examples

Top N per Group

Top N per Group is useful when you need to compute the first and second best-sellers in a category.

Note
This example is borrowed from an excellent article Introducing Window Functions in Spark SQL.
Table 2. Table PRODUCT_REVENUE
product       category      revenue
Thin          cell phone    6000
Normal        tablet        1500
Mini          tablet        5500
Ultra thin    cell phone    5000
Very thin     cell phone    6000
Big           tablet        2500
Bendable      cell phone    3000
Foldable      cell phone    3000
Pro           tablet        4500
Pro2          tablet        6500

Question: What are the best-selling and the second best-selling products in every category?

The question boils down to ranking products in a category based on their revenue, and picking the best-selling and the second best-selling products based on the ranking.
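
One possible solution, sketched with dense_rank over a per-category window (assumes spark-shell; the data is the PRODUCT_REVENUE table above):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.dense_rank
  import spark.implicits._

  val productRevenue = Seq(
    ("Thin", "cell phone", 6000), ("Normal", "tablet", 1500),
    ("Mini", "tablet", 5500), ("Ultra thin", "cell phone", 5000),
    ("Very thin", "cell phone", 6000), ("Big", "tablet", 2500),
    ("Bendable", "cell phone", 3000), ("Foldable", "cell phone", 3000),
    ("Pro", "tablet", 4500), ("Pro2", "tablet", 6500))
    .toDF("product", "category", "revenue")

  // Rank products by revenue within each category and keep the two best-selling ones
  val byCategoryRevenue = Window.partitionBy("category").orderBy($"revenue".desc)

  productRevenue
    .withColumn("rank", dense_rank() over byCategoryRevenue)
    .where($"rank" <= 2)
    .show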

Revenue Difference per Category

Note
This example is the 2nd example from an excellent article Introducing Window Functions in Spark SQL.

Difference on Column

Compute a difference between values in rows in a column.

The key here is to remember that DataFrames are RDDs under the covers, so an aggregation such as grouping by a key in a DataFrame boils down to RDD’s groupBy (or worse, reduceByKey or aggregateByKey transformations).

Running Total

The running total is the sum of all previous rows including the current one.
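
A minimal sketch of a running total (made-up orders data; assumes spark-shell):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  val orders = Seq((1, 100), (2, 40), (3, 60), (4, 10)).toDF("id", "amount")

  // Frame: from the first row of the partition up to and including the current row
  val runningTotalWindow = Window
    .orderBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

  orders.withColumn("running_total", sum("amount") over runningTotalWindow).show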

Calculate rank of row

See “Explaining” Query Plans of Windows for an elaborate example.

Interval data type for Date and Timestamp types

With the Interval data type, you could use intervals as values specified in <value> PRECEDING and <value> FOLLOWING for RANGE frame. It is specifically suited for time-series analysis with window functions.

Accessing values of earlier rows

FIXME What’s the value of rows before current one?

Moving Average

Cumulative Aggregates

Eg. cumulative sum

User-defined aggregate functions

With the window function support, you could use user-defined aggregate functions as window functions.

“Explaining” Query Plans of Windows

lag Window Function

lag returns the value in the e / columnName column that is offset records before the current record, or defaultValue if there are fewer than offset records before the current record in the window partition (null when defaultValue is not specified).

Caution
FIXME It looks like lag with a default value has a bug — the default value’s not used at all.
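
A small lag sketch without a default value (made-up data; assumes spark-shell):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.lag
  import spark.implicits._

  val buckets = spark.range(9).withColumn("bucket", $"id" % 3)
  val byBucket = Window.partitionBy("bucket").orderBy("id")

  // id of the previous record in the same bucket; null for the first record in a partition
  buckets.withColumn("prev_id", lag("id", 1) over byBucket).show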

lead Window Function

lead returns the value that is offset records after the current record, or defaultValue if there are fewer than offset records after the current record in the window partition (null when defaultValue is not specified).

Caution
FIXME It looks like lead with a default value has a bug — the default value’s not used at all.

Cumulative Distribution of Records Across Window Partitions — cume_dist Window Function

cume_dist computes the cumulative distribution of the records in window partitions. This is equivalent to SQL’s CUME_DIST function.

Sequential numbering per window partition — row_number Window Function

row_number returns a sequential number starting at 1 within a window partition.

ntile Window Function

ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.

Caution
FIXME How is ntile different from rank? What about performance?

Ranking Records per Window Partition — rank Window Function

rank functions assign the sequential rank of each distinct value per window partition. They are equivalent to RANK, DENSE_RANK and PERCENT_RANK functions in the good ol’ SQL.

rank function assigns the same rank for duplicate rows with a gap in the sequence (similarly to Olympic medal places). dense_rank is like rank for duplicate rows but compacts the ranks and removes the gaps.
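
The following sketch contrasts rank, dense_rank and row_number on tied values (made-up scores; assumes spark-shell):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{dense_rank, rank, row_number}
  import spark.implicits._

  val scores = Seq(("A", 10), ("B", 10), ("C", 8), ("D", 6)).toDF("name", "score")
  val byScore = Window.orderBy($"score".desc)

  // rank leaves a gap after the tie (1, 1, 3, 4) while dense_rank compacts it (1, 1, 2, 3)
  scores
    .withColumn("rank", rank() over byScore)
    .withColumn("dense_rank", dense_rank() over byScore)
    .withColumn("row_number", row_number() over byScore)
    .show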

currentRow Window Function

currentRow…​FIXME

unboundedFollowing Window Function

unboundedFollowing…​FIXME

unboundedPreceding Window Function

unboundedPreceding…​FIXME

Regular Functions (Non-Aggregate Functions)



Table 1. (Subset of) Regular Functions
Name Description

array

broadcast

coalesce

Gives the first non-null value among the given columns or null.

col and column

Creating Columns

expr

lit

map

monotonically_increasing_id

struct

typedLit

when

broadcast Function

broadcast function marks the input Dataset as small enough to be used in broadcast join.

Note
broadcast standard function is a special case of Dataset.hint operator that allows for attaching any hint to a logical plan.
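
A minimal sketch (made-up datasets; assumes spark-shell):

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  val large = Seq((0, "zero"), (1, "one"), (2, "two")).toDF("id", "token")
  val small = Seq((0, "a"), (1, "b")).toDF("id", "label")

  // Mark the small side so the planner prefers a broadcast hash join
  val joined = large.join(broadcast(small), "id")
  joined.explain   // the physical plan should contain BroadcastHashJoin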

coalesce Function

coalesce gives the first non-null value among the given columns or null.

coalesce requires at least one column and all columns have to be of the same or compatible types.

Internally, coalesce creates a Column with a Coalesce expression (with the children being the expressions of the input Column).

Example: coalesce Function
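
A minimal sketch with nullable columns (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.coalesce
  import spark.implicits._

  val df = Seq(
    (Some(1), None: Option[Int]),
    (None: Option[Int], Some(2)),
    (None: Option[Int], None: Option[Int])).toDF("a", "b")

  // First non-null value among the given columns; null when both are null
  df.withColumn("first_non_null", coalesce($"a", $"b")).show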

Creating Columns — col and column Functions

col and column methods create a Column that you can later use to reference a column in a dataset.

expr Function

expr function parses the input SQL expression string into the Column it represents.

Internally, expr uses the active session’s sqlParser or creates a new SparkSqlParser to call parseExpression method.
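
A quick sketch (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.expr
  import spark.implicits._

  val df = Seq((1, "hello"), (2, "world")).toDF("id", "text")

  // expr turns a SQL expression string into the Column it represents
  df.select($"id", expr("upper(text) AS upper_text"), expr("id % 2 = 0 AS is_even")).show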

lit Function

lit function…​FIXME

struct Functions

struct family of functions allows you to create a new struct column based on a collection of Column or their names.

Note
The difference between struct and the similar array function is that the types of the columns can be different (in struct).

typedLit Function

typedLit…​FIXME

array Function

array…​FIXME

map Function

map…​FIXME

when Function

when…​FIXME

monotonically_increasing_id Function

monotonically_increasing_id returns monotonically increasing 64-bit integers. The generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive (unless all rows are in the same single partition which you rarely want due to the amount of the data).

The current implementation uses the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. That assumes that the data set has less than 1 billion partitions, and each partition has less than 8 billion records.

Internally, monotonically_increasing_id creates a Column with a MonotonicallyIncreasingID non-deterministic leaf expression.
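
A small sketch that makes the partition-boundary gaps visible (assumes spark-shell):

  import org.apache.spark.sql.functions.monotonically_increasing_id

  // With 2 partitions the IDs jump at the partition boundary: unique and increasing, but not consecutive
  spark.range(0, 8, 1, numPartitions = 2)
    .withColumn("uid", monotonically_increasing_id())
    .show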

Date and Time Functions



Table 1. (Subset of) Standard Functions for Date and Time
Name Description

current_date

Gives current date as a date column

current_timestamp

date_format

to_date

Converts column to date type (with an optional date format)

to_timestamp

Converts column to timestamp type (with an optional timestamp format)

unix_timestamp

Converts current or specified time to Unix timestamp (in seconds)

window

Generates time windows (i.e. tumbling, sliding and delayed windows)

Current Date As Date Column — current_date Function

current_date function gives the current date as a date column.

Internally, current_date creates a Column with CurrentDate Catalyst leaf expression.

date_format Function

Internally, date_format creates a Column with DateFormatClass binary expression. DateFormatClass takes the expression from dateExpr column and format.

current_timestamp Function

Caution
FIXME
Note
current_timestamp is also now function in SQL.

Converting Current or Specified Time to Unix Timestamp — unix_timestamp Function

  1. With no arguments, gives the current timestamp (in seconds)

  2. With a column argument, converts a time string in the format yyyy-MM-dd HH:mm:ss to a Unix timestamp (in seconds)

unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds).

unix_timestamp supports a column of type Date, Timestamp or String.

unix_timestamp returns null if conversion fails.

Note

unix_timestamp is also supported in SQL mode.

Internally, unix_timestamp creates a Column with UnixTimestamp binary expression (possibly with CurrentTimestamp).
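
A minimal sketch of the variants (made-up time strings; assumes spark-shell):

  import org.apache.spark.sql.functions.unix_timestamp
  import spark.implicits._

  val times = Seq("2019-07-01 12:00:00", "2019-07-01 12:00:30").toDF("time")

  times.select(
    $"time",
    unix_timestamp($"time") as "seconds",                      // default format yyyy-MM-dd HH:mm:ss
    unix_timestamp($"time", "yyyy-MM-dd HH:mm:ss") as "same",  // explicit format
    unix_timestamp() as "now"                                  // current timestamp (in seconds)
  ).show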

Generating Time Windows — window Function

  1. window(timeColumn, windowDuration) creates a tumbling time window (slideDuration equal to windowDuration and 0 seconds for startTime)

  2. window(timeColumn, windowDuration, slideDuration) creates a sliding time window (0 seconds for startTime)

  3. window(timeColumn, windowDuration, slideDuration, startTime) creates a delayed time window

window generates tumbling, sliding or delayed time windows of windowDuration duration, given a timeColumn column with timestamps.

Note

Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals.

Note

Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval.

Tumbling windows discretize a stream into non-overlapping windows.

timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.

Tip
Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.

windowDuration and slideDuration are strings specifying the width of the window and the sliding interval, respectively.

Tip
Use CalendarInterval for valid window identifiers.
Note
window is available as of Spark 2.0.0.

Internally, window creates a Column (with TimeWindow expression) available as window alias.

Example — Traffic Sensor

Note
The example is borrowed from Introducing Stream Windows in Apache Flink.

The example shows how to use window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.
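
The original listing is not reproduced here, so the following is only a sketch of the idea (made-up readings; assumes spark-shell):

  import java.sql.Timestamp
  import org.apache.spark.sql.functions.{count, window}
  import spark.implicits._

  // Hypothetical sensor readings: one timestamp per passing vehicle
  val readings = Seq(
    Timestamp.valueOf("2019-07-01 12:00:03"),
    Timestamp.valueOf("2019-07-01 12:00:11"),
    Timestamp.valueOf("2019-07-01 12:00:17"),
    Timestamp.valueOf("2019-07-01 12:00:29")).toDF("time")

  // Count vehicles per 15-second tumbling window
  readings
    .groupBy(window($"time", "15 seconds"))
    .agg(count("*") as "vehicles")
    .orderBy("window")
    .show(truncate = false)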

Converting Column To DateType — to_date Function

to_date converts the column into DateType (by casting to DateType).

Note
fmt follows the formatting styles.

Internally, to_date creates a Column with ParseToDate expression (and Literal expression for fmt).

Tip
Use ParseToDate expression to use a column for the values of fmt.

Converting Column To TimestampType — to_timestamp Function

to_timestamp converts the column into TimestampType (by casting to TimestampType).

Note
fmt follows the formatting styles.

Internally, to_timestamp creates a Column with ParseToTimestamp expression (and Literal expression for fmt).

Tip
Use ParseToTimestamp expression to use a column for the values of fmt.
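
A minimal sketch of to_date and to_timestamp with an explicit fmt (made-up input; assumes spark-shell):

  import org.apache.spark.sql.functions.{to_date, to_timestamp}
  import spark.implicits._

  val df = Seq("2019/07/01 12:30:00").toDF("s")

  df.select(
    to_date($"s", "yyyy/MM/dd HH:mm:ss") as "date",           // casts down to DateType
    to_timestamp($"s", "yyyy/MM/dd HH:mm:ss") as "timestamp"  // keeps the time part
  ).show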

Collection Functions


Standard Functions for Collections (Collection Functions)

Table 1. (Subset of) Standard Functions for Handling Collections
Name Description

array_contains

explode

explode_outer

Creates a new row for each element in the given array or map column.

If the array/map is null or empty then null is produced.

from_json

Extract data from arbitrary JSON-encoded values into a StructType or ArrayType of StructType elements with the specified schema

map_keys

map_values

posexplode

posexplode_outer

reverse

Returns a reversed string or an array with reverse order of elements

Note
Support for reversing arrays is new in 2.4.0.

size

Returns the size of the given array or map. Returns -1 if null.

reverse Collection Function

reverse…​FIXME

size Collection Function

size returns the size of the given array or map. Returns -1 if null.

Internally, size creates a Column with Size unary expression.
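
A quick sketch (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.size
  import spark.implicits._

  val df = Seq((1, Seq(1, 2, 3)), (2, Seq.empty[Int]), (3, null)).toDF("id", "xs")

  // Number of elements per row; -1 for the null array
  df.select($"id", size($"xs") as "size").show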

posexplode Collection Function

posexplode…​FIXME

posexplode_outer Collection Function

posexplode_outer…​FIXME

explode Collection Function

Caution
FIXME

Note
explode function is an equivalent of flatMap operator for Dataset.

explode_outer Collection Function

explode_outer generates a new row for each element in the e array or map column.

Note
Unlike explode, explode_outer generates null when the array or map is null or empty.

Internally, explode_outer creates a Column with GeneratorOuter and Explode Catalyst expressions.
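
A sketch that contrasts explode and explode_outer (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.{explode, explode_outer}
  import spark.implicits._

  val df = Seq((1, Seq("a", "b")), (2, Seq.empty[String]), (3, null)).toDF("id", "letters")

  // explode drops the rows whose array is null or empty...
  df.select($"id", explode($"letters") as "letter").show
  // ...while explode_outer keeps them and produces null
  df.select($"id", explode_outer($"letters") as "letter").show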

Extracting Data from Arbitrary JSON-Encoded Values — from_json Collection Function

from_json comes in several overloaded variants: the schema can be given as a StructType, as a DataType, or as a string in the JSON or DDL format, and each variant exists with and without parsing options (the variants without options simply relay to the others with empty options).

from_json parses a column with a JSON-encoded value into a StructType or ArrayType of StructType elements with the specified schema.

Note

A schema can be one of the following:

  1. DataType as a Scala object or in the JSON format

  2. StructType in the DDL format

Note
options controls how a JSON is parsed and contains the same options as the json format.

Internally, from_json creates a Column with JsonToStructs unary expression.

Note
from_json (creates a JsonToStructs that) uses a JSON parser in FAILFAST parsing mode that simply fails early when a corrupted/malformed record is found (and hence does not support columnNameOfCorruptRecord JSON option).

Note
from_json corresponds to SQL’s from_json.
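
A minimal sketch with a StructType schema (made-up JSON strings; assumes spark-shell):

  import org.apache.spark.sql.functions.from_json
  import org.apache.spark.sql.types.{LongType, StringType, StructType}
  import spark.implicits._

  val jsons = Seq("""{"id": 0, "name": "zero"}""", """{"id": 1, "name": "one"}""").toDF("json")

  val schema = new StructType()
    .add("id", LongType)
    .add("name", StringType)

  // Parse the JSON-encoded string into a struct and pull out its fields
  jsons
    .select(from_json($"json", schema) as "parsed")
    .select($"parsed.id", $"parsed.name")
    .show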

array_contains Collection Function

array_contains creates a Column that checks whether the given array column contains a value; the value has to be of the same type as the elements of the array.

Internally, array_contains creates a Column with a ArrayContains expression.

array_contains corresponds to SQL’s array_contains.

Tip
Use SQL’s array_contains to use values from columns for the column and value arguments.

map_keys Collection Function

map_keys…​FIXME

map_values Collection Function

map_values…​FIXME

Standard Functions — functions Object



org.apache.spark.sql.functions object defines built-in standard functions to work with (values produced by) columns.

You can access the standard functions using the following import statement in your Scala application:

Table 1. (Subset of) Standard Functions in Spark SQL
Name Description

Aggregate functions

approx_count_distinct

avg

collect_list

collect_set

corr

count

countDistinct

covar_pop

covar_samp

first

Returns the first value in a group. Returns the first non-null value when the ignoreNulls flag is on. If all values are null, then returns null.

grouping

Indicates whether a given column is aggregated or not

grouping_id

Computes the level of grouping

kurtosis

last

max

mean

min

skewness

stddev

stddev_pop

stddev_samp

sum

sumDistinct

variance

var_pop

var_samp

Collection functions

array_contains

array_distinct

(New in 2.4.0)

array_except

(New in 2.4.0)

array_intersect

(New in 2.4.0)

array_join

(New in 2.4.0)

array_max

(New in 2.4.0)

array_min

(New in 2.4.0)

array_position

(New in 2.4.0)

array_remove

(New in 2.4.0)

array_repeat

(New in 2.4.0)

array_sort

(New in 2.4.0)

array_union

(New in 2.4.0)

arrays_zip

(New in 2.4.0)

arrays_overlap

(New in 2.4.0)

element_at

(New in 2.4.0)

explode

explode_outer

Creates a new row for each element in the given array or map column. If the array/map is null or empty then null is produced.

flatten

(New in 2.4.0)

from_json

(New variant in 2.4.0)

Parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.

map_concat

(New in 2.4.0)

map_from_entries

(New in 2.4.0)

map_keys

map_values

posexplode

posexplode_outer

reverse

Returns a reversed string or an array with reverse order of elements

Note
Support for reversing arrays is new in 2.4.0.

schema_of_json

(New in 2.4.0)

sequence

(New in 2.4.0)

shuffle

(New in 2.4.0)

size

Returns the size of the given array or map. Returns -1 if null.

slice

(New in 2.4.0)

Date and time functions

current_date

current_timestamp

from_utc_timestamp

(New variant in 2.4.0)

months_between

(New variant in 2.4.0)

to_date

to_timestamp

to_utc_timestamp

(New variant in 2.4.0)

unix_timestamp

Converts current or specified time to Unix timestamp (in seconds)

window

Generates tumbling time windows

Math functions

bin

Converts the value of a long column to binary format

Regular functions (Non-aggregate functions)

array

broadcast

coalesce

Gives the first non-null value among the given columns or null

col and column

Creating Columns

expr

lit

map

monotonically_increasing_id

Returns monotonically increasing 64-bit integers that are guaranteed to be monotonically increasing and unique, but not consecutive.

struct

typedLit

when

String functions

split

upper

UDF functions

udf

Creating UDFs

callUDF

Executes a UDF by name with a variable-length list of columns

Window functions

cume_dist

Computes the cumulative distribution of records across window partitions

currentRow

dense_rank

Computes the rank of records per window partition

lag

lead

ntile

Computes the ntile group

percent_rank

Computes the rank of records per window partition

rank

Computes the rank of records per window partition

row_number

Computes the sequential numbering per window partition

unboundedFollowing

unboundedPreceding

Tip
The page gives only a brief overview of the many functions available in the functions object, so you should read the official documentation of the functions object.

Executing UDF by Name and Variable-Length Column List — callUDF Function

callUDF executes a UDF by udfName with a variable-length list of columns.

Defining UDFs — udf Function

The udf family of functions allows you to create user-defined functions (UDFs) based on a user-defined function in Scala. It accepts a function f of 0 to 10 arguments, and the input and output types are automatically inferred (from the types of the input parameters and the result type of the function f).

Since Spark 2.0.0, there is another variant of udf function:

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and explicitly declaring the output data type (as dataType).
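
A minimal sketch of both variants (the strLen names are illustrative; assumes spark-shell):

  import org.apache.spark.sql.functions.udf
  import org.apache.spark.sql.types.IntegerType
  import spark.implicits._

  // Input and output types inferred from the Scala function's signature
  val strLen = udf { s: String => s.length }

  // Scala closure with the output data type declared explicitly (the Spark 2.0.0 variant)
  val strLenExplicit = udf((s: String) => s.length, IntegerType)

  val df = Seq("hello", "spark").toDF("word")
  df.select($"word", strLen($"word") as "len", strLenExplicit($"word") as "len2").show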

split Function

split function splits the str column using the pattern regular expression. It returns a new Column.

Note
.$|()[{^?*+\ are RegEx’s meta characters and are considered special.
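
A small sketch where the pattern contains a metacharacter (made-up input; assumes spark-shell):

  import org.apache.spark.sql.functions.split
  import spark.implicits._

  val df = Seq("hello|world|again").toDF("s")

  // The pattern is a regular expression, so the | metacharacter has to be escaped
  df.select(split($"s", "\\|") as "words").show(truncate = false)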

upper Function

upper function converts a string column to upper case. It returns a new Column.

Note
The following example uses two functions that accept a Column and return another to showcase how to chain them.
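
The original listing is not reproduced here, so here is a minimal sketch of such chaining (trim and upper are used purely for illustration; assumes spark-shell):

  import org.apache.spark.sql.functions.{trim, upper}
  import spark.implicits._

  val df = Seq("  Hello  ", " Spark ").toDF("s")

  // trim and upper both accept a Column and return a Column, so they chain naturally
  df.select(upper(trim($"s")) as "cleaned").show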

Converting Long to Binary Format (in String Representation) — bin Function

  1. Calls the first bin with columnName as a Column

bin converts the long value in a column to its binary format (i.e. as an unsigned integer in base 2) with no extra leading 0s.

Internally, bin creates a Column with Bin unary expression.

Note
Bin unary expression uses java.lang.Long.toBinaryString for the conversion.
Note

Bin expression supports code generation (aka CodeGen).

Window Utility Object — Defining Window Specification



Window utility object is a set of static methods to define a window specification.

Table 1. Window API
Method Description

currentRow

Value representing the current row that is used to define frame boundaries.

orderBy

Creates a WindowSpec with the ordering defined.

partitionBy

Creates a WindowSpec with the partitioning defined.

rangeBetween

Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive). Both start and end are relative to the current row based on the actual value of the ORDER BY expression(s).

rowsBetween

Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive). Both start and end are positions relative to the current row based on the position of the row within the partition.

unboundedFollowing

Value representing the last row in a partition (equivalent to “UNBOUNDED FOLLOWING” in SQL) that is used to define frame boundaries.

unboundedPreceding

Value representing the first row in a partition (equivalent to “UNBOUNDED PRECEDING” in SQL) that is used to define frame boundaries.

Creating “Empty” WindowSpec — spec Internal Method

spec creates an “empty” WindowSpec, i.e. with empty partition and ordering specifications, and a UnspecifiedFrame.

Note

spec is used when the Window object’s partitionBy, orderBy, rowsBetween and rangeBetween methods create the initial WindowSpec.

WindowSpec — Window Specification



WindowSpec is a window specification that defines which rows are included in a window (frame), i.e. the set of rows that are associated with the current row by some relation.

WindowSpec takes the following when created:

  • Partition specification (Seq[Expression]) which defines which records are in the same partition. With no partition defined, all records belong to a single partition

  • Ordering Specification (Seq[SortOrder]) which defines how records in a partition are ordered that in turn defines the position of a record in a partition. The ordering could be ascending (ASC in SQL or asc in Scala) or descending (DESC or desc).

  • Frame Specification (WindowFrame) which defines the rows to be included in the frame for the current row, based on their relative position to the current row. For example, “the three rows preceding the current row to the current row” describes a frame including the current input row and three rows appearing before the current row.

You use Window object to create a WindowSpec.

Once the initial version of a WindowSpec is created, you use the methods to further configure the window specification.

Table 1. WindowSpec API
Method Description

orderBy

partitionBy

rangeBetween

rowsBetween

With a window specification fully defined, you use Column.over operator that associates the WindowSpec with an aggregate or window function.
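
A sketch that builds a WindowSpec with all three parts and applies it with over (made-up data; assumes spark-shell):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  val sales = Seq(
    ("cell phone", 6000), ("cell phone", 5000),
    ("tablet", 5500), ("tablet", 2500)).toDF("category", "revenue")

  // Build the window specification step by step...
  val spec = Window
    .partitionBy("category")                                    // partition specification
    .orderBy($"revenue".desc)                                   // ordering specification
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)  // frame specification

  // ...and associate it with an aggregate function using Column.over
  sales.withColumn("running_category_total", sum("revenue") over spec).show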

withAggregate Internal Method

withAggregate…​FIXME

Note
withAggregate is used exclusively when Column.over operator is used.
