
Aggregator — Contract for User-Defined Typed Aggregate Functions (UDAFs)

Aggregator is the contract for user-defined typed aggregate functions (aka user-defined typed aggregations or UDAFs in short).

After you create a custom Aggregator, you should use toColumn method to convert it to a TypedColumn that can be used with Dataset.select and KeyValueGroupedDataset.agg typed operators.
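For example, a minimal custom Aggregator that sums the lengths of input strings could look as follows (a sketch: the SumOfLengths name is made up, and a SparkSession named spark is assumed to be in scope):

  import org.apache.spark.sql.{Encoder, Encoders}
  import org.apache.spark.sql.expressions.Aggregator

  // IN = String, BUF = Long, OUT = Long
  object SumOfLengths extends Aggregator[String, Long, Long] {
    def zero: Long = 0L                                        // initial buffer value
    def reduce(buffer: Long, input: String): Long = buffer + input.length
    def merge(b1: Long, b2: Long): Long = b1 + b2              // combine partial buffers
    def finish(reduction: Long): Long = reduction              // final output value
    def bufferEncoder: Encoder[Long] = Encoders.scalaLong
    def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }

  import spark.implicits._
  val ds = Seq("hello", "spark").toDS
  // toColumn turns the Aggregator into a TypedColumn for Dataset.select
  ds.select(SumOfLengths.toColumn.name("total_length")).show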

Note

Use org.apache.spark.sql.expressions.scalalang.typed object to access the type-safe aggregate functions, i.e. avg, count, sum and sumLong.

Note

Aggregator is an Experimental and Evolving contract, i.e. it is evolving towards becoming a stable API but is not stable yet and can change from one feature release to another.

In other words, using the contract is like treading on thin ice.

Aggregator is used when…​FIXME

Table 1. Aggregator Contract
Method Description

bufferEncoder

Encoder of the intermediate aggregation buffer (of type BUF)

finish

Transforms the final aggregation buffer into the output value (of type OUT)

merge

Merges two intermediate aggregation buffers into one

outputEncoder

Encoder of the output value (of type OUT)

reduce

Aggregates (reduces) an input value (of type IN) into the current aggregation buffer

zero

The initial ("zero") value of the aggregation buffer

Table 2. Aggregators
Aggregator Description

ParameterizedTypeSum

ReduceAggregator

TopByKeyAggregator

Used exclusively in Spark MLlib

TypedAverage

TypedCount

TypedSumDouble

TypedSumLong

Converting Aggregator to TypedColumn — toColumn Method

toColumn…​FIXME

Note
toColumn is used when…​FIXME

UserDefinedAggregateFunction — Contract for User-Defined Untyped Aggregate Functions (UDAFs)

UserDefinedAggregateFunction is the contract to define user-defined aggregate functions (UDAFs).

UserDefinedAggregateFunction is created using apply or distinct factory methods.

The lifecycle of UserDefinedAggregateFunction is entirely managed using ScalaUDAF expression container.

Figure 1. UserDefinedAggregateFunction and ScalaUDAF Expression Container
Note

Use UDFRegistration to register a (temporary) UserDefinedAggregateFunction and use it in SQL mode.
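The following is a minimal sketch of an untyped UDAF that counts rows, registered under a made-up name mycount (the MyCount class and the nums view are assumptions of this example):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  class MyCount extends UserDefinedAggregateFunction {
    def inputSchema: StructType = new StructType().add("value", LongType)
    def bufferSchema: StructType = new StructType().add("count", LongType)
    def dataType: DataType = LongType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
    def update(buffer: MutableAggregationBuffer, input: Row): Unit =
      buffer(0) = buffer.getLong(0) + 1
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    def evaluate(buffer: Row): Any = buffer.getLong(0)
  }

  // Register the UDAF (temporarily) and use it in SQL mode
  spark.udf.register("mycount", new MyCount)
  spark.range(5).createOrReplaceTempView("nums")
  spark.sql("SELECT mycount(id) AS cnt FROM nums").show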

UserDefinedAggregateFunction Contract

Table 1. (Subset of) UserDefinedAggregateFunction Contract
Method Description

bufferSchema

Schema (StructType) of the aggregation buffer

dataType

DataType of the final result

deterministic

Flag that says whether the function always returns the same result for the same input

evaluate

Computes the final result from the aggregation buffer

initialize

Initializes the aggregation buffer

inputSchema

Schema (StructType) of the input arguments

merge

Merges two aggregation buffers

update

Updates the aggregation buffer with a new input row

Creating Column for UDAF — apply Method

apply creates a Column with ScalaUDAF (inside AggregateExpression).

Note
AggregateExpression uses Complete mode and isDistinct flag is disabled.

Creating Column for UDAF with Distinct Values — distinct Method

distinct creates a Column with ScalaUDAF (inside AggregateExpression).

Note
AggregateExpression uses Complete mode and isDistinct flag is enabled.
Note
distinct is like apply but has isDistinct flag enabled.
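A minimal usage sketch, assuming the MyCount UDAF from the example above:

  import org.apache.spark.sql.functions.col

  val myCount = new MyCount
  val withDuplicates = spark.range(5).union(spark.range(5))

  withDuplicates.select(
    myCount(col("id")) as "count_all",               // apply: isDistinct disabled
    myCount.distinct(col("id")) as "count_distinct"  // distinct: isDistinct enabled
  ).show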

Dataset Checkpointing

Dataset checkpointing is a feature of Spark SQL that truncates the logical query plan of a query, which is particularly useful for highly iterative data algorithms (e.g. Spark MLlib, which uses Spark SQL’s Dataset API for data manipulation).

Note

Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with previously computed state of a distributed computation described as an RDD. That has been successfully used in Spark Streaming – the now-obsolete Spark module for stream processing based on RDD API.

Checkpointing truncates the lineage of a RDD to be checkpointed. That has been successfully used in Spark MLlib in iterative machine learning algorithms like ALS.

Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of a Dataset being checkpointed.

Checkpointing can be eager or lazy per eager flag of checkpoint operator. Eager checkpointing is the default checkpointing and happens immediately when requested. Lazy checkpointing does not and will only happen when an action is executed.

Using Dataset checkpointing requires that you specify the checkpoint directory. The directory stores the checkpoint files for RDDs to be checkpointed. Use SparkContext.setCheckpointDir to set the path to a checkpoint directory.

Checkpointing can be local or reliable which defines how reliable the checkpoint directory is. Local checkpointing uses executor storage to write checkpoint files to and due to the executor lifecycle is considered unreliable. Reliable checkpointing uses a reliable data storage like Hadoop HDFS.

Table 1. Dataset Checkpointing Types

              Eager                      Lazy
Reliable      checkpoint                 checkpoint(eager = false)
Local         localCheckpoint            localCheckpoint(eager = false)
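A short sketch of the four combinations (the checkpoint directory path is an example):

  // The checkpoint directory has to be set before (reliable) checkpointing
  spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

  val nums = spark.range(5)
  nums.checkpoint()                      // reliable and eager (default)
  nums.checkpoint(eager = false)         // reliable and lazy
  nums.localCheckpoint()                 // local and eager
  nums.localCheckpoint(eager = false)    // local and lazy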

An RDD can be recovered from checkpoint files using SparkContext.checkpointFile. You can use the SparkSession.internalCreateDataFrame method to (re)create the DataFrame from the RDD of internal binary rows.

Tip

Enable INFO logging level for org.apache.spark.rdd.ReliableRDDCheckpointData logger to see what happens while an RDD is checkpointed.

Add the following line to conf/log4j.properties:
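  log4j.logger.org.apache.spark.rdd.ReliableRDDCheckpointData=INFO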

Refer to Logging.

Specifying Checkpoint Directory — SparkContext.setCheckpointDir Method

setCheckpointDir sets the checkpoint directory.

Internally, setCheckpointDir…​FIXME

Recovering RDD From Checkpoint Files — SparkContext.checkpointFile Method

checkpointFile reads (recovers) an RDD from a checkpoint directory.

Note
SparkContext.checkpointFile is a protected[spark] method so the code to access it has to be in org.apache.spark package.

Internally, checkpointFile creates a ReliableCheckpointRDD in a scope.
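A sketch of how the access restriction can be worked around (RecoverCheckpoint is a hypothetical helper):

  package org.apache.spark

  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  object RecoverCheckpoint {
    // Has to live in org.apache.spark because checkpointFile is protected[spark]
    def recover[T: ClassTag](sc: SparkContext, checkpointPath: String): RDD[T] =
      sc.checkpointFile[T](checkpointPath)
  }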

User-Friendly Names Of Cached Queries in web UI’s Storage Tab

As you may have noticed, web UI’s Storage tab displays some cached queries with user-friendly RDD names (e.g. “In-memory table [name]”) while others are not user-friendly (e.g. “Scan JDBCRelation…​”).

Figure 1. Cached Queries in web UI (Storage Tab)

“In-memory table [name]” RDD names are the result of SQL’s CACHE TABLE or when Catalog is requested to cache a table.

The other RDD names are due to caching a Dataset.

Dataset Caching and Persistence

Table 1. Caching Operators (Basic Actions)
Operator Description

cache

Basic action to cache a Dataset

persist

Basic action to persist a Dataset

unpersist

Basic action to unpersist a cached Dataset

Note

You can also use SQL’s CACHE TABLE [tableName] to cache tableName table in memory. Unlike the cache and persist operators, CACHE TABLE is eager and caches the table as soon as the statement is executed.

You can however use the LAZY keyword to make the caching lazy.

Use SQL’s REFRESH TABLE [tableName] to refresh a cached table.

Use SQL’s UNCACHE TABLE (IF EXISTS)? [tableName] to remove a table from cache.

Use SQL’s CLEAR CACHE to remove all tables from cache.
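A sketch of the statements above issued through spark.sql (t1 is an example table name):

  spark.sql("CACHE TABLE t1")             // eager caching
  spark.sql("CACHE LAZY TABLE t1")        // lazy caching
  spark.sql("REFRESH TABLE t1")
  spark.sql("UNCACHE TABLE IF EXISTS t1")
  spark.sql("CLEAR CACHE")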

Note

Be careful what you cache, i.e. which Dataset is cached, as different Datasets give different cached query plans.

Tip

You can check whether a Dataset was cached or not using the following code:
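A sketch using the session’s internal CacheManager to look up the Dataset’s logical plan (q is an example query):

  val q = spark.range(5)
  val cache = spark.sharedState.cacheManager
  cache.lookupCachedData(q.queryExecution.logical).isDefined  // false until q is cached

  q.cache
  cache.lookupCachedData(q.queryExecution.logical).isDefined  // true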

SQL’s CACHE TABLE

SQL’s CACHE TABLE corresponds to requesting the session-specific Catalog to cache the table.

Internally, CACHE TABLE becomes CacheTableCommand runnable command that…​FIXME

Multi-Dimensional Aggregation

Multi-dimensional aggregate operators are enhanced variants of groupBy operator that allow you to create queries for subtotals, grand totals and superset of subtotals in one go.

Multi-dimensional aggregate operators are semantically equivalent to using the union operator (or SQL’s UNION ALL) to combine single grouping queries.

Note

It is assumed that using one of the operators is usually more efficient (than union and groupBy) as it gives more freedom for query optimization.

Table 1. Multi-dimensional Aggregate Operators
Operator Return Type Description

cube

RelationalGroupedDataset

Calculates subtotals and a grand total for every permutation of the columns specified.

rollup

RelationalGroupedDataset

Calculates subtotals and a grand total over (ordered) combination of groups.

Besides the cube and rollup multi-dimensional aggregate operators, Spark SQL supports the GROUPING SETS clause (in SQL mode only).

Note
SQL’s GROUPING SETS is the most general aggregate “operator” and can generate the same dataset as using a simple groupBy, cube and rollup operators.

Tip
Review the examples per operator in the following sections.
Note
Support for multi-dimensional aggregate operators was added in [SPARK-6356] Support the ROLLUP/CUBE/GROUPING SETS/grouping() in SQLContext.

rollup Operator

rollup multi-dimensional aggregate operator is an extension of the groupBy operator that calculates subtotals and a grand total across n + 1 levels of grouping (with n being the number of columns given as col1 and cols, plus 1 for the grand total where all grouping column values become null, i.e. undefined).

Note

rollup operator is commonly used for analysis over hierarchical data; e.g. total salary by department, division, and company-wide total.

Note
rollup operator is equivalent to GROUP BY ... WITH ROLLUP in SQL (which in turn is equivalent to GROUP BY ... GROUPING SETS ((a,b,c),(a,b),(a),()) when used with 3 columns: a, b, and c).

The ROLLUP, CUBE, and GROUPING SETS operators are extensions of the GROUP BY clause. The ROLLUP, CUBE, or GROUPING SETS operators can generate the same result set as when you use UNION ALL to combine single grouping queries; however, using one of the GROUP BY operators is usually more efficient.

References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.

From Summarizing Data Using ROLLUP in Microsoft’s TechNet:

The ROLLUP operator is useful in generating reports that contain subtotals and totals. (…​)
ROLLUP generates a result set that shows aggregates for a hierarchy of values in the selected columns.

From Hive’s Cubes and Rollups:

WITH ROLLUP is used with the GROUP BY only. ROLLUP clause is used with GROUP BY to compute the aggregate at the hierarchy levels of a dimension.

GROUP BY a, b, c with ROLLUP assumes that the hierarchy is “a” drilling down to “b” drilling down to “c”.

GROUP BY a, b, c, WITH ROLLUP is equivalent to GROUP BY a, b, c GROUPING SETS ( (a, b, c), (a, b), (a), ( )).

Note
Read up on ROLLUP in Hive’s LanguageManual in Grouping Sets, Cubes, Rollups, and the GROUPING__ID Function.

The individual elements of a CUBE or ROLLUP clause may be either individual expressions, or sublists of elements in parentheses. In the latter case, the sublists are treated as single units for the purposes of generating the individual grouping sets.

Internally, rollup converts the Dataset into a DataFrame (i.e. uses RowEncoder as the encoder) and then creates a RelationalGroupedDataset (with RollupType group type).

Note
Rollup expression represents GROUP BY ... WITH ROLLUP in SQL in Spark’s Catalyst Expression tree (after AstBuilder parses a structured query with aggregation).
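A sketch over a small made-up sales dataset:

  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  val sales = Seq(
    ("Warsaw", 2016, 100),
    ("Warsaw", 2017, 200),
    ("Boston", 2017, 150)
  ).toDF("city", "year", "amount")

  // Subtotals per (city, year) and per city, plus a grand total (city and year are null)
  sales
    .rollup("city", "year")
    .agg(sum("amount") as "amount")
    .sort($"city".asc_nulls_last, $"year".asc_nulls_last)
    .show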

cube Operator

cube multi-dimensional aggregate operator is an extension of the groupBy operator that calculates subtotals and a grand total across all combinations of the specified group of columns (given as col1 and cols), with grouping column values becoming null, i.e. undefined, for the levels they are not part of.

cube returns RelationalGroupedDataset that you can use to execute aggregate function or operator.

Note
cube is more than rollup operator, i.e. cube does rollup with aggregation over all the missing combinations given the columns.
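Compared to rollup, a cube over the same (hypothetical) sales dataset from the rollup example also produces the year-only subtotals:

  sales
    .cube("city", "year")
    .agg(sum("amount") as "amount")
    .sort($"city".asc_nulls_last, $"year".asc_nulls_last)
    .show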

GROUPING SETS SQL Clause

GROUPING SETS clause generates a dataset that is equivalent to union operator of multiple groupBy operators.
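A sketch in SQL mode, using the sales dataset from the rollup example registered as a temporary view:

  sales.createOrReplaceTempView("sales")
  spark.sql("""
    SELECT city, year, sum(amount) AS amount
    FROM sales
    GROUP BY city, year
    GROUPING SETS ((city, year), (city), ())
    ORDER BY city NULLS LAST, year NULLS LAST
  """).show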

Internally, the GROUPING SETS clause is parsed in the withAggregation parsing handler (in AstBuilder) and becomes a GroupingSets logical operator.

Rollup GroupingSet with CodegenFallback Expression (for rollup Operator)

Rollup expression represents rollup operator in Spark’s Catalyst Expression tree (after AstBuilder parses a structured query with aggregation).

Note
GroupingSet is an Expression with CodegenFallback support.

Data Types

DataType abstract class is the base type of all built-in data types in Spark SQL, e.g. strings, longs.

DataType has two main type families:

  • Atomic Types as an internal type to represent types that are not null, UDTs, arrays, structs, and maps

  • Numeric Types with fractional and integral types

Table 1. Standard Data Types
Type Family / Data Types (Scala Type)

  • Atomic Types (except fractional and integral types): BinaryType, BooleanType, DateType, StringType, TimestampType (java.sql.Timestamp)

  • Fractional Types (concrete NumericType): DecimalType, DoubleType, FloatType

  • Integral Types (concrete NumericType): ByteType, IntegerType, LongType, ShortType

  • ArrayType, CalendarIntervalType, MapType, NullType, ObjectType, StructType, UserDefinedType

  • AnyDataType: matches any concrete data type

Caution
FIXME What about AbstractDataType?

You can extend the type system and create your own user-defined types (UDTs).

The DataType Contract defines methods to build SQL, JSON and string representations.

Note
DataType (and the concrete Spark SQL types) live in org.apache.spark.sql.types package.

You should use DataTypes object in your code to create complex Spark SQL types, i.e. arrays or maps.

DataType has support for Scala’s pattern matching using unapply method.

DataType Contract

Any type in Spark SQL follows the DataType contract which means that the types define the following methods:

  • json and prettyJson to build JSON representations of a data type

  • defaultSize to know the default size of values of a type

  • simpleString and catalogString to build user-friendly string representations (with the latter for external catalogs)

  • sql to build SQL representation
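For example (a quick sketch in the Scala REPL):

  import org.apache.spark.sql.types._

  IntegerType.json           // "integer"
  IntegerType.prettyJson     // "integer"
  IntegerType.defaultSize    // 4
  IntegerType.simpleString   // int
  IntegerType.catalogString  // int
  IntegerType.sql            // INT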

DataTypes — Factory Methods for Data Types

DataTypes is a Java class with methods to access simple or create complex DataType types in Spark SQL, i.e. arrays and maps.

Tip
It is recommended to use DataTypes class to define DataType types in a schema.

DataTypes lives in org.apache.spark.sql.types package.

Note

Simple DataType types themselves, i.e. StringType or CalendarIntervalType, come with their own Scala’s case objects alongside their definitions.

You may also import the types package and have access to the types.
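A short sketch of both ways:

  // Using the DataTypes factory (simple types as constants, complex types via create* methods)
  import org.apache.spark.sql.types.DataTypes
  val arrayOfStrings = DataTypes.createArrayType(DataTypes.StringType)
  val mapOfStringToLong = DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType)

  // Or importing the types package and using the case objects and case classes directly
  import org.apache.spark.sql.types._
  val sameArray = ArrayType(StringType)
  val sameMap = MapType(StringType, LongType)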

UDTs — User-Defined Types

Caution
FIXME

StructField — Single Field in StructType

StructField describes a single field in a StructType with the following:

  • Name

  • DataType

  • nullable flag (enabled by default)

  • Metadata (empty by default)

A comment is part of metadata under comment key and is used to build a Hive column or when describing a table.
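A sketch of a StructField with a comment in its metadata (the field name and comment text are examples):

  import org.apache.spark.sql.types._

  val idField = StructField(
    name = "id",
    dataType = LongType,
    nullable = false,
    metadata = new MetadataBuilder().putString("comment", "unique identifier").build())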

As of Spark 2.4.0, StructField can be converted to DDL format using toDDL method.

Converting to DDL Format — toDDL Method

toDDL gives a text in the format:
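  `name` DATATYPE [COMMENT 'comment']

i.e. the quoted field name followed by the SQL representation of the data type and an optional comment, e.g. `id` BIGINT COMMENT 'unique identifier'.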

Note

toDDL is used when StructType is requested to convert itself to DDL format (see StructType’s toDDL).

StructType — Data Type for Schema Definition

StructType is a built-in data type that is a collection of StructFields.

StructType is used to define a schema or its part.

You can compare two StructType instances to see whether they are equal.

StructType presents itself as <struct> or STRUCT in query plans or SQL.

Note

StructType is a Seq[StructField] and therefore all things Seq apply equally here.

Read the official documentation of Scala’s scala.collection.Seq.

As of Spark 2.4.0, StructType can be converted to DDL format using toDDL method.

fromAttributes Method

fromAttributes…​FIXME

Note
fromAttributes is used when…​FIXME

toAttributes Method

toAttributes…​FIXME

Note
toAttributes is used when…​FIXME

Adding Fields to Schema — add Method

You can add a new StructField to your StructType. There are different variants of add method that all make for a new StructType with the field added.
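A quick sketch of a few add variants:

  import org.apache.spark.sql.types._

  // Every add call returns a new StructType with the field appended
  val schema = new StructType()
    .add("id", LongType, nullable = false)
    .add("name", StringType)
    .add(StructField("city", StringType))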

DataType Name Conversions

StructType as a custom DataType is used in query plans or SQL. It can present itself using simpleString, catalogString or sql (see DataType Contract).

Accessing StructField — apply Method

StructType defines its own apply method that gives you an easy access to a StructField by name.

Creating StructType from Existing StructType — apply Method

This variant of apply lets you create a StructType out of an existing StructType with the names only.

It will throw an IllegalArgumentException exception when a field could not be found.
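A sketch of both apply variants, using the schema built in the add example above:

  // Access a single StructField by name
  val nameField = schema("name")

  // Create a new StructType with the selected fields only
  val idAndName = schema(Set("id", "name"))

  // Unknown names throw an IllegalArgumentException
  // schema(Set("id", "unknown"))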

Displaying Schema As Tree — printTreeString Method

printTreeString prints out the schema to standard output.

Internally, it uses treeString method to build the tree and then println it.
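For the schema from the add example above, printTreeString gives:

  scala> schema.printTreeString
  root
   |-- id: long (nullable = false)
   |-- name: string (nullable = true)
   |-- city: string (nullable = true)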

Creating StructType For DDL-Formatted Text — fromDDL Object Method

fromDDL…​FIXME

Note
fromDDL is used when…​FIXME
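A sketch of creating a schema from DDL-formatted text:

  import org.apache.spark.sql.types.StructType

  val schemaFromDDL = StructType.fromDDL("id BIGINT, name STRING")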

Converting to DDL Format — toDDL Method

toDDL converts all the fields to DDL format and concatenates them using the comma (,).
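For example (a sketch; the exact output may differ slightly across Spark versions):

  scala> StructType.fromDDL("id BIGINT, name STRING").toDDL
  res0: String = `id` BIGINT,`name` STRING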

Schema — Structure of Data

A schema is the description of the structure of your data (which together create a Dataset in Spark SQL). It can be implicit (and inferred at runtime) or explicit (and known at compile time).

A schema is described using StructType which is a collection of StructField objects (that in turn are tuples of names, types, and nullability classifier).

StructType and StructField belong to the org.apache.spark.sql.types package.

You can use the canonical string representation of SQL types to describe the types in a schema (that is inherently untyped at compile time) or use type-safe types from the org.apache.spark.sql.types package.

Tip
Read up on CatalystSqlParser that is responsible for parsing data types.

It is however recommended to use the singleton DataTypes class with static methods to create schema types.

StructType offers printTreeString that makes presenting the schema more user-friendly.

As of Spark 2.0, you can describe the schema of your strongly-typed datasets using encoders.

Implicit Schema
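A minimal sketch (assuming a SparkSession named spark):

  import spark.implicits._

  // The schema is inferred implicitly from the Scala types of the values
  val df = Seq((0, "hello"), (1, "world")).toDF("id", "token")
  df.printSchema
  // root
  //  |-- id: integer (nullable = false)
  //  |-- token: string (nullable = true)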
