Dataset Join Operators
From PostgreSQL’s 2.6. Joins Between Tables:

> Queries can access multiple tables at once, or access the same table in such a way that multiple rows of the table are being processed at the same time. A query that accesses multiple rows of the same or different tables at one time is called a join query.
You can join two datasets using the join operators with an optional join condition.
Operator | Return Type | Description |
---|---|---|
`join` | `DataFrame` | Untyped, `Row`-based join |
`crossJoin` | `DataFrame` | Untyped, `Row`-based cross join (cartesian product) |
`joinWith` | `Dataset` | Used for a type-preserving join with two output columns for records for which a join condition holds |
You can also use SQL mode to join datasets using good ol’ SQL.
```scala
val spark: SparkSession = ...
spark.sql("select * from t1, t2 where t1.id = t2.id")
```
You can specify a join condition (aka join expression) as part of join operators or using where or filter operators.
```scala
df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")
df1.join(df2).filter($"df1Key" === $"df2Key")
```
You can specify the join type as part of join operators (using the optional `joinType` parameter).
```scala
df1.join(df2, $"df1Key" === $"df2Key", "inner")
```
SQL | Name (joinType) | JoinType |
---|---|---|
CROSS | `cross` | `Cross` |
INNER | `inner` | `Inner` |
FULL OUTER | `outer`, `full`, `fullouter` | `FullOuter` |
LEFT ANTI | `leftanti` | `LeftAnti` |
LEFT OUTER | `leftouter`, `left` | `LeftOuter` |
LEFT SEMI | `leftsemi` | `LeftSemi` |
RIGHT OUTER | `rightouter`, `right` | `RightOuter` |
NATURAL | Special case for `inner`, `leftouter`, `rightouter`, `fullouter` | `NaturalJoin` |
USING | Special case for `inner`, `leftouter`, `leftsemi`, `rightouter`, `fullouter`, `leftanti` | `UsingJoin` |
`ExistenceJoin` is an artificial join type used to express an existential sub-query, often referred to as an existential join.
Note: `LeftAnti` and `ExistenceJoin` are special cases of `LeftOuter`.
You can also find that Spark SQL uses the following two families of joins:

- `InnerLike` with `Inner` and `Cross`
- `LeftExistence` with `LeftSemi`, `LeftAnti` and `ExistenceJoin`
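As a quick illustration of the `LeftExistence` family, here is a minimal sketch (with made-up tiny datasets, assuming `spark.implicits._` is in scope as in `spark-shell`) contrasting `leftsemi` and `leftanti`:

```scala
import spark.implicits._

val l = Seq((0, "zero"), (1, "one")).toDF("id", "v")
val r = Seq((0, "x")).toDF("id", "w")

l.join(r, Seq("id"), "leftsemi").show  // keeps rows of l that have a match in r
l.join(r, Seq("id"), "leftanti").show  // keeps rows of l that have no match in r
```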
Tip: Join names are case-insensitive and can use the underscore (`_`) at any position, i.e. `left_anti` and `LEFT_ANTI` are equivalent.
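For example, both spellings below resolve to the same `LeftAnti` join type (reusing the hypothetical `l` and `r` datasets above):

```scala
// Equivalent ways to name the same join type
l.join(r, Seq("id"), "left_anti").show
l.join(r, Seq("id"), "LEFT_ANTI").show
```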
Note: Spark SQL offers different join strategies, with Broadcast Joins (aka Map-Side Joins) among them, that are supposed to optimize your join queries over large distributed datasets.
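A broadcast join can be requested explicitly with the `broadcast` standard function. A minimal sketch, assuming generic `df1` and `df2` datasets with `df2` small enough to broadcast:

```scala
import org.apache.spark.sql.functions.broadcast

// Mark the small right-hand side for broadcasting so Spark can use a map-side join
df1.join(broadcast(df2), Seq("id")).explain
```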
join Operators
```scala
join(right: Dataset[_]): DataFrame  (1)
join(right: Dataset[_], usingColumn: String): DataFrame  (2)
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame  (3)
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame  (4)
join(right: Dataset[_], joinExprs: Column): DataFrame  (5)
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame  (6)
```
1. Condition-less inner join
2. Inner join with a single column that exists on both sides
3. Inner join with columns that exist on both sides
4. Equi-join with explicit join type
5. Inner join
6. Join with explicit join type. Self-joins are acceptable.
`join` joins two `Dataset`s.
```scala
val left = Seq((0, "zero"), (1, "one")).toDF("id", "left")
val right = Seq((0, "zero"), (2, "two"), (3, "three")).toDF("id", "right")

// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, "id").explain
== Physical Plan ==
*Project [id#50, left#51, right#61]
+- *BroadcastHashJoin [id#50], [id#60], Inner, BuildRight
   :- LocalTableScan [id#50, left#51]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#60, right#61]

// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
|  1| one| null|
|  3|null|three|
|  2|null|  two|
|  0|zero| zero|
+---+----+-----+

scala> left.join(right, Seq("id"), "fullouter").explain
== Physical Plan ==
*Project [coalesce(id#50, id#60) AS id#85, left#51, right#61]
+- SortMergeJoin [id#50], [id#60], FullOuter
   :- *Sort [id#50 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#50, 200)
   :     +- LocalTableScan [id#50, left#51]
   +- *Sort [id#60 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#60, 200)
         +- LocalTableScan [id#60, right#61]

// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
|  1| one|
+---+----+

scala> left.join(right, Seq("id"), "leftanti").explain
== Physical Plan ==
*BroadcastHashJoin [id#50], [id#60], LeftAnti, BuildRight
:- LocalTableScan [id#50, left#51]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#60]
```
Internally, `join(right: Dataset[_])` creates a `DataFrame` with a condition-less `Join` logical operator (in the current `SparkSession`).
Note: `join(right: Dataset[_])` creates a logical plan with a condition-less `Join` operator with two child logical plans for both sides of the join.
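You can inspect that logical operator with an extended `explain`. A sketch reusing the `left` and `right` datasets from the example above:

```scala
// A condition-less join is a cartesian product, which Spark 2.x refuses by default
spark.conf.set("spark.sql.crossJoin.enabled", true)

// extended = true prints the logical plans (with the condition-less Join operator)
// in addition to the physical plan
left.join(right).explain(extended = true)
```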
Note: `join(right: Dataset[_], usingColumns: Seq[String], joinType: String)` creates a logical plan with a condition-less `Join` operator with `UsingJoin` join type.
Note: Joining a `Dataset` with itself on equal column references is usually considered a trivially true condition and rejected. With the `spark.sql.selfJoinAutoResolveAmbiguity` option enabled (which it is by default), such ambiguous self-join conditions are resolved automatically.
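A common way to keep a self-join unambiguous is to alias both sides. A minimal sketch with a made-up `people` dataset:

```scala
val people = Seq((0, "Agata"), (1, "Iweta")).toDF("id", "name")

// Alias both sides so columns can be referenced unambiguously after the join
val lhs = people.as("lhs")
val rhs = people.as("rhs")
lhs.join(rhs, $"lhs.id" === $"rhs.id").show
```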
crossJoin Method
```scala
crossJoin(right: Dataset[_]): DataFrame
```
Note: `crossJoin` creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
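A minimal sketch of the cartesian product (with made-up datasets):

```scala
val letters = Seq("a", "b").toDF("letter")
val numbers = Seq(1, 2).toDF("number")

// Every letter paired with every number: 2 x 2 = 4 rows
letters.crossJoin(numbers).show
```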
Type-Preserving Joins — joinWith Operators
```scala
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]  (1)
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
```
1. Inner equi-join
`joinWith` creates a `Dataset` with two columns `_1` and `_2` that each contain records for which `condition` holds.
```scala
case class Person(id: Long, name: String, cityId: Long)
case class City(id: Long, name: String)

val family = Seq(
  Person(0, "Agata", 0),
  Person(1, "Iweta", 0),
  Person(2, "Patryk", 2),
  Person(3, "Maksym", 0)).toDS
val cities = Seq(
  City(0, "Warsaw"),
  City(1, "Washington"),
  City(2, "Sopot")).toDS

val joined = family.joinWith(cities, family("cityId") === cities("id"))

scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- cityId: long (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- id: long (nullable = false)
 |    |-- name: string (nullable = true)

scala> joined.show
+------------+----------+
|          _1|        _2|
+------------+----------+
| [0,Agata,0]|[0,Warsaw]|
| [1,Iweta,0]|[0,Warsaw]|
|[2,Patryk,2]| [2,Sopot]|
|[3,Maksym,0]|[0,Warsaw]|
+------------+----------+
```
Note: `joinWith` preserves type-safety with the original object types.
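Since the result above is a `Dataset[(Person, City)]`, you can pattern-match on the pairs directly, e.g.:

```scala
// Typed access to both sides of the join; no Row-based column lookups needed
// (assumes spark.implicits._ is in scope for the String encoder)
joined.map { case (person, city) => s"${person.name} lives in ${city.name}" }.show(truncate = false)
```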
Note: `joinWith` creates a `Dataset` with a `Join` logical plan.