
Broadcast Joins (aka Map-Side Joins)

Spark SQL uses a broadcast join (aka broadcast hash join) instead of a shuffle-based join to optimize join queries when the size of the data on one side is below spark.sql.autoBroadcastJoinThreshold.

Broadcast join can be very efficient for joins between a large table (fact) and relatively small tables (dimensions), as used in a star-schema join. It avoids sending all the data of the large table over the network.

You can use the broadcast function or SQL's broadcast hints to mark a dataset to be broadcast when used in a join query.
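As a minimal sketch (assuming a spark-shell session where spark and its implicits are available; the events and lookup names are made up for illustration):

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  val events = Seq((1, "click"), (2, "view"), (1, "view")).toDF("id", "action")  // the "large" side
  val lookup = Seq((1, "alice"), (2, "bob")).toDF("id", "name")                  // the "small" side

  // Dataset API: explicitly mark the small side to be broadcast
  events.join(broadcast(lookup), Seq("id"))

  // SQL: the BROADCAST (aka BROADCASTJOIN or MAPJOIN) hint achieves the same
  events.createOrReplaceTempView("events")
  lookup.createOrReplaceTempView("lookup")
  spark.sql("SELECT /*+ BROADCAST(lookup) */ * FROM events JOIN lookup ON events.id = lookup.id")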

Note
According to the article Map-Side Join in Spark, broadcast join is also called a replicated join (in the distributed system community) or a map-side join (in the Hadoop community).

The CanBroadcast object matches a LogicalPlan whose output is small enough for a broadcast join.

Note
Currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE [tableName] COMPUTE STATISTICS noscan has been run.

JoinSelection execution planning strategy uses the spark.sql.autoBroadcastJoinThreshold property (default: 10MB) to control the maximum size of a dataset that is broadcast to all worker nodes when performing a join.
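For reference, a minimal sketch of tuning the threshold at runtime (the value is in bytes; -1 disables automatic broadcast joins):

  // Raise the auto-broadcast threshold to 100 MB
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

  // Disable automatic broadcast joins altogether
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)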

Dataset Join Operators

From PostgreSQL’s 2.6. Joins Between Tables:

Queries can access multiple tables at once, or access the same table in such a way that multiple rows of the table are being processed at the same time. A query that accesses multiple rows of the same or different tables at one time is called a join query.

You can join two datasets using the join operators with an optional join condition.

Table 1. Join Operators
Operator | Return Type | Description
crossJoin | DataFrame | Untyped Row-based cross join
join | DataFrame | Untyped Row-based join
joinWith | Dataset | Used for a type-preserving join with two output columns for records for which a join condition holds

You can also use SQL mode to join datasets using good ol’ SQL.

You can specify a join condition (aka join expression) as part of join operators or using where or filter operators.

You can specify the join type as part of join operators (using joinType optional parameter).
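A small sketch of both styles (the left and right DataFrames are made up; assumes spark-shell):

  import spark.implicits._
  val left  = Seq((0, "zero"), (1, "one")).toDF("id", "leftValue")
  val right = Seq((0, "zero"), (2, "two")).toDF("id", "rightValue")

  // Join condition and join type given to the join operator itself
  left.join(right, left("id") === right("id"), "left_outer")

  // Join condition expressed with a follow-up where (pushed into the join by the optimizer)
  left.join(right).where(left("id") === right("id"))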

Table 2. Join Types
SQL | Name (joinType) | JoinType
CROSS | cross | Cross
INNER | inner | Inner
FULL OUTER | outer, full, fullouter | FullOuter
LEFT ANTI | leftanti | LeftAnti
LEFT OUTER | leftouter, left | LeftOuter
LEFT SEMI | leftsemi | LeftSemi
RIGHT OUTER | rightouter, right | RightOuter
NATURAL | (special case for Inner, LeftOuter, RightOuter, FullOuter) | NaturalJoin
USING | (special case for Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti) | UsingJoin

ExistenceJoin is an artificial join type used to express an existential sub-query, which is often referred to as an existential join.

Note
LeftAnti and ExistenceJoin are special cases of LeftOuter.

You can also find that Spark SQL uses two families of joins internally: InnerLike (with Inner and Cross) and LeftExistence (with LeftSemi, LeftAnti and ExistenceJoin).

Tip
Names are case-insensitive and can use the underscore (_) at any position, i.e. left_anti and LEFT_ANTI are equivalent.
Note
Spark SQL offers different join strategies with Broadcast Joins (aka Map-Side Joins) among them that are supposed to optimize your join queries over large distributed datasets.

join Operators

The join operator comes in several variants:

  1. Condition-less inner join

  2. Inner join with a single column that exists on both sides

  3. Inner join with columns that exist on both sides

  4. Equi-join with explicit join type

  5. Inner join

  6. Join with explicit join type. Self-joins are acceptable.

join joins two Datasets.
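A sketch of a few of those variants (Person and City are hypothetical case classes; assumes spark-shell):

  import spark.implicits._
  case class Person(id: Long, name: String, cityId: Long)
  case class City(id: Long, name: String)

  val people = Seq(Person(0, "Agata", 0), Person(1, "Iweta", 1)).toDS
  val cities = Seq(City(0, "Warsaw"), City(1, "Washington")).toDS

  // Inner join with a single column that exists on both sides
  people.join(cities, "id")

  // Equi-join with an explicit join type
  people.join(cities, Seq("id"), "left_outer")

  // Join condition as a Column expression with an explicit join type
  people.join(cities, people("cityId") === cities("id"), "inner")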

Internally, join(right: Dataset[_]) creates a DataFrame with a condition-less Join logical operator (in the current SparkSession).

Note
join(right: Dataset[_]) creates a logical plan with a condition-less Join operator with two child logical plans, one for each side of the join.
Note
join(right: Dataset[_], usingColumns: Seq[String], joinType: String) creates a logical plan with a condition-less Join operator with UsingJoin join type.
Note

join(right: Dataset[_], joinExprs: Column, joinType: String) accepts self-joins where joinExprs compares a column of a Dataset with the very same column (e.g. df("id") === df("id")).

Such a condition is usually considered trivially true and would be refused.

With the spark.sql.selfJoinAutoResolveAmbiguity option enabled (which it is by default), join automatically resolves such ambiguous join conditions into ones that make sense.

crossJoin Method

crossJoin joins two Datasets using Cross join type with no condition.

Note
crossJoin creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
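A deliberately tiny sketch (made-up data; assumes spark-shell):

  import spark.implicits._
  val letters = Seq("a", "b").toDF("letter")
  val numbers = Seq(1, 2).toDF("number")

  // Explicit cartesian product: 2 x 2 = 4 rows
  letters.crossJoin(numbers).show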

Type-Preserving Joins — joinWith Operators

joinWith comes in two variants: one that takes an explicit join type and one that defaults to an inner equi-join.

joinWith creates a Dataset with two columns _1 and _2 that each contain records for which the join condition holds.

Note
joinWith preserves type-safety with the original object types.
Note
joinWith creates a Dataset with Join logical plan.
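A minimal sketch (Order and Customer are made-up case classes; assumes spark-shell):

  import org.apache.spark.sql.Dataset
  import spark.implicits._
  case class Order(id: Long, customerId: Long)
  case class Customer(id: Long, name: String)

  val orders = Seq(Order(10, 1), Order(11, 2)).toDS
  val customers = Seq(Customer(1, "Ala"), Customer(2, "Ola")).toDS

  // The result is a Dataset[(Order, Customer)], not an untyped DataFrame
  val joined: Dataset[(Order, Customer)] =
    orders.joinWith(customers, orders("customerId") === customers("id"), "inner")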

KeyValueGroupedDataset — Typed Grouping

KeyValueGroupedDataset is an experimental interface to calculate aggregates over groups of objects in a typed Dataset.

Note
RelationalGroupedDataset is used for untyped Row-based aggregates.

KeyValueGroupedDataset is created using Dataset.groupByKey operator.
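A minimal sketch of creating and using a KeyValueGroupedDataset (made-up data; assumes spark-shell):

  import spark.implicits._
  val words = Seq("hello", "world", "hello", "spark").toDS

  // groupByKey gives a KeyValueGroupedDataset[String, String]
  val grouped = words.groupByKey(identity)

  // Typed aggregations over the groups
  grouped.count().show                                                // Dataset[(String, Long)]
  grouped.mapGroups { (word, occurrences) => (word, occurrences.size) }.show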

Table 1. KeyValueGroupedDataset’s Aggregate Operators (KeyValueGroupedDataset API)
Operator Description

agg

cogroup

count

flatMapGroups

flatMapGroupsWithState

keys

keyAs

mapGroups

mapGroupsWithState

mapValues

reduceGroups

KeyValueGroupedDataset holds the keys that were used to group the objects.

aggUntyped Internal Method

aggUntyped…​FIXME

Note
aggUntyped is used exclusively when KeyValueGroupedDataset.agg typed operator is used.

logicalPlan Internal Method

logicalPlan…​FIXME

Note
logicalPlan is used when…​FIXME

RelationalGroupedDataset — Untyped Row-based Grouping

RelationalGroupedDataset is an interface to calculate aggregates over groups of rows in a DataFrame.

Note
KeyValueGroupedDataset is used for typed aggregates over groups of custom Scala objects (not Rows).

RelationalGroupedDataset is the result of executing the following grouping operators: groupBy, rollup, cube and pivot.

Table 1. RelationalGroupedDataset’s Aggregate Operators
Operator Description

agg

avg

count

max

mean

min

pivot

Pivots on a column, with new columns per distinct value in the pivot column (one of its variants is new in 2.4.0)

sum

Note

spark.sql.retainGroupColumns configuration property controls whether to retain the columns used for grouping in the result (in RelationalGroupedDataset operators).

spark.sql.retainGroupColumns is enabled by default.

Computing Aggregates Using Aggregate Column Expressions or Function Names — agg Operator

agg creates a DataFrame with the rows being the result of executing grouping expressions (specified using columns or names) over row groups.

Note
You can use untyped or typed column expressions.

Internally, agg creates a DataFrame with Aggregate or Pivot logical operators.
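A small sketch of both flavours (made-up sales data; assumes spark-shell):

  import org.apache.spark.sql.functions.{avg, max}
  import spark.implicits._

  val sales = Seq(("books", 10.0), ("books", 20.0), ("toys", 5.0)).toDF("category", "amount")

  // Aggregate column expressions...
  sales.groupBy("category").agg(max("amount"), avg("amount"))

  // ...or aggregation functions specified by name
  sales.groupBy("category").agg("amount" -> "max", "amount" -> "avg")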

Creating DataFrame from Aggregate Expressions — toDF Internal Method

Caution
FIXME

Internally, toDF branches off per group type.

Caution
FIXME

For PivotType, toDF creates a DataFrame with Pivot unary logical operator.

Note

toDF is used when the following RelationalGroupedDataset operators are used:

aggregateNumericColumns Internal Method

aggregateNumericColumns…​FIXME

Note
aggregateNumericColumns is used when the following RelationalGroupedDataset operators are used: mean, max, avg, min and sum.

Creating RelationalGroupedDataset Instance

RelationalGroupedDataset takes the following when created:

  • DataFrame

  • Grouping expressions

  • Group type (to indicate the “source” operator)

    • GroupByType for groupBy

    • CubeType

    • RollupType

    • PivotType

pivot Operator

  1. Selects distinct and sorted values on pivotColumn and calls the other pivot (that results in 3 extra “scanning” jobs)

  2. Preferred as more efficient because the unique values are already provided

  3. New in 2.4.0

pivot pivots on a pivotColumn column, i.e. adds new columns per distinct value in pivotColumn.

Note
pivot is only supported after groupBy operation.
Note
Only one pivot operation is supported on a RelationalGroupedDataset.

Important
Use pivot with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra “scanning” jobs).
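A minimal sketch of both approaches (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  val sales = Seq(("2018", "books", 10), ("2018", "toys", 5), ("2019", "books", 7))
    .toDF("year", "category", "amount")

  // Spark computes the distinct pivot values itself (extra "scanning" jobs)
  sales.groupBy("year").pivot("category").agg(sum("amount"))

  // Distinct pivot values provided up front (preferred)
  sales.groupBy("year").pivot("category", Seq("books", "toys")).agg(sum("amount"))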
Figure 1. pivot in web UI (Distinct Values Defined Explicitly)
Figure 2. pivot in web UI — Three Extra Scanning Jobs Due to Unspecified Distinct Values
Note
spark.sql.pivotMaxValues (default: 10000) controls the maximum number of (distinct) values that will be collected without error (when doing pivot without specifying the values for the pivot column).

Internally, pivot creates a RelationalGroupedDataset with PivotType group type and pivotColumn resolved using the DataFrame’s columns with values as Literal expressions.

Note

toDF internal method maps PivotType group type to a DataFrame with Pivot unary logical operator.

strToExpr Internal Method

strToExpr…​FIXME

Note
strToExpr is used exclusively when RelationalGroupedDataset is requested to agg with aggregation functions specified by name

alias Method

alias…​FIXME

Note
alias is used exclusively when RelationalGroupedDataset is requested to create a DataFrame from aggregate expressions.

Basic Aggregation — Typed and Untyped Grouping Operators

You can calculate aggregates over a group of rows in a Dataset using aggregate operators (possibly with aggregate functions).

Table 1. Aggregate Operators
Operator | Return Type | Description
agg | RelationalGroupedDataset | Aggregates with or without grouping (i.e. over an entire Dataset)
groupBy | RelationalGroupedDataset | Used for untyped aggregates using DataFrames; grouping is described using column expressions or column names
groupByKey | KeyValueGroupedDataset | Used for typed aggregates using Datasets with records grouped by a key-defining discriminator function

Note
Aggregate functions without aggregate operators return a single value. If you want to find the aggregate values for each unique value (in a column), you should groupBy first (over this column) to build the groups.
Note

You can also use SparkSession to execute good ol’ SQL with GROUP BY should you prefer.

SQL and Dataset API operators go through the same query planning and optimizations, and have the same performance characteristics in the end.

Aggregates Over Subset Of or Whole Dataset — agg Operator

agg applies an aggregate function on a subset or the entire Dataset (i.e. considering the entire data set as one group).

Note
agg on a Dataset is simply a shortcut for groupBy().agg(…​).

agg can compute aggregate expressions on all the records in a Dataset.
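A minimal sketch (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.{count, sum}
  import spark.implicits._

  val nums = Seq(1, 2, 3, 4).toDF("n")

  // Aggregate over the entire Dataset (a single group)
  nums.agg(sum("n"), count("n"))

  // Equivalent to grouping with no columns
  nums.groupBy().agg(sum("n"), count("n"))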

Untyped Grouping — groupBy Operator

groupBy operator groups the rows in a Dataset by columns (as Column expressions or names).

groupBy gives a RelationalGroupedDataset to execute aggregate functions or operators.

Internally, groupBy resolves column names (possibly quoted) and creates a RelationalGroupedDataset (with groupType being GroupByType).
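A small sketch of grouping by a column name and by a Column expression (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.avg
  import spark.implicits._

  val scores = Seq(("alice", 10), ("alice", 20), ("bob", 5)).toDF("name", "score")

  // Grouping by column name...
  scores.groupBy("name").avg("score")

  // ...or by a Column expression
  scores.groupBy($"name").agg(avg("score"))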

Note
The Test Setup section below describes a dataset you can use to experiment with groupBy and the other grouping operators.

Typed Grouping — groupByKey Operator

groupByKey groups records (of type T) by the input func and in the end returns a KeyValueGroupedDataset to apply aggregation to.

Note
groupByKey is Dataset‘s experimental API.

Test Setup

This is a setup for learning GroupedData. Paste it into Spark Shell using :paste.
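The original snippet is not reproduced here; a minimal sketch of an equivalent setup (the Token case class and its values are made up) could look like this:

  import spark.implicits._

  case class Token(name: String, productId: Int, score: Double)
  val data = Seq(
    Token("aaa", 100, 0.12),
    Token("aaa", 200, 0.29),
    Token("bbb", 200, 0.53),
    Token("bbb", 300, 0.42))
  val dataset = data.toDS.cache   // (1)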

  1. Cache the dataset so the following queries won’t load/recompute data over and over again.

TypedColumn

TypedColumn is a Column with the ExpressionEncoder for the types of the input and the output.

TypedColumn is created using as operator on a Column.
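A minimal sketch (Item is a made-up case class; assumes spark-shell):

  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  case class Item(name: String, price: Long)
  val items = Seq(Item("pen", 2), Item("pen", 3), Item("book", 10)).toDS

  // as[Long] turns the untyped sum Column into a TypedColumn
  val typedSum = sum($"price").as[Long]

  // TypedColumn works with typed operators, e.g. KeyValueGroupedDataset.agg
  items.groupByKey(_.name).agg(typedSum).show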

name Operator

Note
name is part of Column Contract to…​FIXME.

name…​FIXME

Note
name is used when…​FIXME

Creating TypedColumn — withInputType Internal Method

withInputType…​FIXME

Note

withInputType is used when the following typed operators are used:

Creating TypedColumn Instance

TypedColumn takes the following when created: a Catalyst expression and an ExpressionEncoder for the type of the output value.

TypedColumn initializes the internal registries and counters.

Column API — Column Operators

Column API is a set of operators to work with values in a column (of a Dataset).

Table 1. Column Operators
Operator Description

asc

asc_nulls_first

asc_nulls_last

desc

desc_nulls_first

desc_nulls_last

isin

isInCollection

(New in 2.4.0) An expression operator that is true if the value of the column is in the given values collection

isInCollection is simply a synonym of isin operator.

isin Operator

Internally, isin creates a Column with In predicate expression.
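A small sketch (made-up data; assumes spark-shell):

  import spark.implicits._
  val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "letter")

  // Keep rows whose id is one of the given values
  df.where($"id".isin(1, 3))

  // isInCollection (2.4.0+) takes a collection instead of varargs
  df.where($"id".isInCollection(Seq(1, 3)))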

Column

Column represents a column in a Dataset that holds a Catalyst Expression that produces a value per row.

Note
A Column is a value generator for every row in a Dataset.

A special column * references all columns in a Dataset.

With the implicit conversions imported, you can create “free” column references using Scala’s symbols.

Note
“Free” column references are Columns with no association to a Dataset.

You can also create free column references from $-prefixed strings.

Besides the implicit conversions, you can create columns using the col and column functions.

Finally, you can create a bound Column using the Dataset the column is supposed to be part of, with the Dataset.apply factory method or the Dataset.col operator.

Note
You can use bound Column references only with the Datasets they have been created from.

You can reference nested columns using . (dot).
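A minimal sketch of the different ways to create column references (made-up data; assumes spark-shell with the implicit conversions imported):

  import org.apache.spark.sql.functions.{col, column}
  import spark.implicits._

  val people = Seq(("Ala", 10)).toDF("name", "age")

  // "Free" column references: col/column functions, Scala symbols, $-prefixed strings
  people.select(col("name"), column("age"))
  people.select('name, $"age")

  // Bound column references created from the Dataset itself
  people.select(people("name"), people.col("age"))

  // Nested fields use a dot, e.g. $"address.city" for a struct column named address (hypothetical)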

Table 1. Column Operators
Operator Description

as

Specifying type hint about the expected return value of the column

name

Note

Column has a reference to Catalyst’s Expression it was created for using expr method.

Tip
Read about typed column references in TypedColumn Expressions.

Specifying Type Hint — as Operator

as creates a TypedColumn (that gives a type hint about the expected return value of the column).

name Operator

name…​FIXME

Note
name is used when…​FIXME

Adding Column to Dataset — withColumn Method

withColumn method returns a new DataFrame with a new column added, named colName and defined by the col expression.

Note
withColumn can replace an existing colName column.

You can add new columns to a Dataset using withColumn method.
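A minimal sketch (made-up data; assumes spark-shell):

  import org.apache.spark.sql.functions.lit
  import spark.implicits._

  val people = Seq(("Ala", 10)).toDF("name", "age")

  // Add a new column...
  people.withColumn("country", lit("PL"))

  // ...or replace an existing one with the same name
  people.withColumn("age", $"age" + 1)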

Creating Column Instance For Catalyst Expression — apply Factory Method

like Operator

Caution
FIXME

Symbols As Column Names

Defining Windowing Column (Analytic Clause) — over Operator

over creates a windowing column (aka analytic clause) that allows you to execute an aggregate function over a window (i.e. a group of records that are in some relation to the current record).

Tip
Read up on windowed aggregation in Spark SQL in Window Aggregate Functions.
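A minimal sketch of a windowing column (made-up data; assumes spark-shell):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.rank
  import spark.implicits._

  val salaries = Seq(("eng", "Ala", 100), ("eng", "Ola", 90), ("hr", "Ula", 80))
    .toDF("dept", "name", "salary")

  // A window of rows sharing the same dept, ordered by salary (descending)
  val byDept = Window.partitionBy("dept").orderBy($"salary".desc)

  // rank().over(byDept) is a windowing column
  salaries.withColumn("rank", rank().over(byDept)).show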

cast Operator

cast method casts a column to a data type. It makes for type-safe maps with Row objects of the proper type (not Any).

cast uses CatalystSqlParser to parse the data type from its canonical string representation.

cast Example
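The original example is not included here; a minimal sketch (made-up data; assumes spark-shell) could be:

  import org.apache.spark.sql.types.IntegerType
  import spark.implicits._

  val df = Seq("1", "2", "three").toDF("id")

  // Cast using a DataType...
  df.select($"id".cast(IntegerType))

  // ...or its canonical string representation (unparseable values become null)
  df.select($"id".cast("int")).show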

generateAlias Method

generateAlias…​FIXME

Note

generateAlias is used when:

  • Column is requested to named

  • RelationalGroupedDataset is requested to alias

named Method

named…​FIXME

Note

named is used when the following operators are used:

DataFrameStatFunctions — Working With Statistic Functions

DataFrameStatFunctions is used to work with statistic functions in a structured query (a DataFrame).

Table 1. DataFrameStatFunctions API
Method Description

approxQuantile

bloomFilter

corr

countMinSketch

cov

crosstab

freqItems

sampleBy

DataFrameStatFunctions is available using stat untyped transformation.
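A small sketch of accessing DataFrameStatFunctions through stat (made-up data; assumes spark-shell):

  import spark.implicits._

  val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDF("value")

  // Approximate median (0.5 quantile) with 1% relative error
  val Array(median) = df.stat.approxQuantile("value", Array(0.5), 0.01)

  // Pearson correlation between two columns
  val df2 = df.withColumn("twice", $"value" * 2)
  df2.stat.corr("value", "twice")   // 1.0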

approxQuantile Method

approxQuantile…​FIXME

bloomFilter Method

bloomFilter…​FIXME

buildBloomFilter Internal Method

buildBloomFilter…​FIXME

Note
convertToDouble is used when…​FIXME

corr Method

corr…​FIXME

countMinSketch Method

countMinSketch…​FIXME

cov Method

cov…​FIXME

crosstab Method

crosstab…​FIXME

freqItems Method

freqItems…​FIXME

sampleBy Method

sampleBy…​FIXME
