# Catalyst DSL — Implicit Conversions for Catalyst Data Structures

Catalyst DSL is a collection of Scala implicit conversions for constructing Catalyst data structures (i.e. expressions and logical plans) more easily. The goal of Catalyst DSL is to make working with Spark SQL's building blocks easier, e.g. for testing or exploring Spark SQL internals.
Name | Description |
---|---|
`ExpressionConversions` | Creates expressions |
`ImplicitOperators` | Adds operators to expressions for complex expressions |
`plans` | Creates logical plans |
Catalyst DSL is part of the `org.apache.spark.sql.catalyst.dsl` package object.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

scala> :type $"hello"
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
```
> **Important**: Some implicit conversions from the Catalyst DSL interfere with the implicit conversions that `spark-shell` imports automatically (e.g. `StringToColumn` from `SQLImplicits`, which also handles `$"..."` strings). You can disable an implicit conversion using the trick described in the StackOverflow question "How can an implicit be unimported from the Scala repl?"
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// ExpressionConversions

import org.apache.spark.sql.catalyst.expressions.Literal
scala> val trueLit: Literal = true
trueLit: org.apache.spark.sql.catalyst.expressions.Literal = true

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
scala> val name: UnresolvedAttribute = 'name
name: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'name

// NOTE: This conversion may not work, e.g. in spark-shell
// There is another implicit conversion StringToColumn in SQLImplicits
// It is automatically imported in spark-shell
// See :imports
val id: UnresolvedAttribute = $"id"

import org.apache.spark.sql.catalyst.expressions.Expression
scala> val expr: Expression = sum('id)
expr: org.apache.spark.sql.catalyst.expressions.Expression = sum('id)

// implicit class DslSymbol
scala> 'hello.s
res2: String = hello

scala> 'hello.attr
res4: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'hello

// implicit class DslString
scala> "helo".expr
res0: org.apache.spark.sql.catalyst.expressions.Expression = helo

scala> "helo".attr
res1: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'helo

// logical plans

scala> val t1 = table("t1")
t1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 'UnresolvedRelation `t1`

scala> val p = t1.select('*).serialize[String].where('id % 2 == 0)
p: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Filter false
+- 'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#1]
   +- 'Project ['*]
      +- 'UnresolvedRelation `t1`

// FIXME Does not work because SimpleAnalyzer's catalog is empty
// the p plan references a t1 table
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
scala> p.analyze
```
## ImplicitOperators Implicit Conversions

Operators for expressions, i.e. `in`.
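As a sketch of the `in` operator (assuming the Catalyst classes are on the classpath, e.g. in `spark-shell`), note how the operands go through the other implicit conversions: the `Symbol` becomes an attribute and the integers become `Literal` expressions.

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.In

// 'a is converted to an attribute; 1, 2 and 3 to Literal expressions
val predicate = 'a in (1, 2, 3)
assert(predicate.isInstanceOf[In])
```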
## ExpressionConversions Implicit Conversions

`ExpressionConversions` implicit conversions add ImplicitOperators operators to Catalyst expressions.
### Type Conversions to Literal Expressions

`ExpressionConversions` adds conversions of Scala native types (e.g. `Boolean`, `Long`, `String`, `Date`, `Timestamp`) and Spark SQL types (i.e. `Decimal`) to Literal expressions.
```scala
// DEMO FIXME
```
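Until the demo is filled in, a minimal sketch of the type conversions (assuming a REPL with the Catalyst classes on the classpath); the comments show what the implicit conversions produce:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal

// Scala native types are implicitly lifted to Literal expressions
val bool: Literal = true     // Literal of BooleanType
val long: Literal = 5L       // Literal of LongType
val str: Literal = "name"    // Literal of StringType

// Spark SQL's Decimal type is lifted, too
import org.apache.spark.sql.types.Decimal
val dec: Literal = Decimal(10)
```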
### Converting Symbols to UnresolvedAttribute and AttributeReference Expressions

`ExpressionConversions` adds conversions of Scala's `Symbol` to UnresolvedAttribute and `AttributeReference` expressions.
```scala
// DEMO FIXME
```
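Until the demo is filled in, a sketch of both conversions (note that in `spark-shell` the competing `symbolToColumn` implicit from `SQLImplicits` may kick in instead):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Symbol to UnresolvedAttribute (no data type yet)
val name: UnresolvedAttribute = 'name

// Symbol plus a data type to AttributeReference, e.g. id: bigint
val id: AttributeReference = 'id.long
```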
### Converting $-Prefixed String Literals to UnresolvedAttribute Expressions

`ExpressionConversions` adds conversions of `$"col name"` to an UnresolvedAttribute expression.
```scala
// DEMO FIXME
```
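Until the demo is filled in, a sketch (keep in mind that `spark-shell` auto-imports `StringToColumn` from `SQLImplicits`, which also handles `$"..."` strings and so may shadow this conversion):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

scala> :type $"id"
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
```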
### Adding Aggregate And Non-Aggregate Functions to Expressions

```scala
star(names: String*): Expression
```
`ExpressionConversions` adds the aggregate and non-aggregate functions to Catalyst expressions (e.g. `sum`, `count`, `upper`, `star`, `callFunction`, `windowSpec`, `windowExpr`).
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

val s = star()
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
assert(s.isInstanceOf[UnresolvedStar])

val s = star("a", "b")
scala> println(s)
WrappedArray(a, b).*
```
### Creating UnresolvedFunction Expressions — function and distinctFunction Methods

`ExpressionConversions` allows creating UnresolvedFunction expressions with the `function` and `distinctFunction` operators.
```scala
function(exprs: Expression*): UnresolvedFunction
distinctFunction(exprs: Expression*): UnresolvedFunction
```
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Works with Scala Symbols only
val f = 'f.function()
scala> :type f
org.apache.spark.sql.catalyst.analysis.UnresolvedFunction

scala> f.isDistinct
res0: Boolean = false

val g = 'g.distinctFunction()
scala> g.isDistinct
res1: Boolean = true
```
### Creating AttributeReference Expressions With nullability On or Off — notNull and canBeNull Methods

`ExpressionConversions` adds the `canBeNull` and `notNull` operators to create an `AttributeReference` with `nullability` turned on or off, respectively.
```scala
notNull: AttributeReference
canBeNull: AttributeReference
```
```scala
// DEMO FIXME
```
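Until the demo is filled in, a sketch (assuming a REPL with the Catalyst classes on the classpath); `'id.long` first builds an `AttributeReference`, which is nullable by default:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// 'id.long creates an AttributeReference (nullable by default)
val ref = 'id.long
assert(ref.nullable)

// notNull and canBeNull copy the attribute with nullability off and on
assert(!ref.notNull.nullable)
assert(ref.canBeNull.nullable)
```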
### Creating BoundReference — at Method

```scala
at(ordinal: Int): BoundReference
```
`ExpressionConversions` adds the `at` method to `AttributeReference`s to create BoundReference expressions.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

val boundRef = 'hello.string.at(4)
scala> println(boundRef)
input[4, string, true]
```
## plans Implicit Conversions for Logical Plans
### Creating UnresolvedHint Logical Operator — hint Method

`plans` adds the `hint` method to create an UnresolvedHint logical operator.
```scala
hint(name: String, parameters: Any*): LogicalPlan
```
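For example, a sketch that wraps an unresolved relation in a broadcast hint (the hint name `broadcast` is just an illustration):

```scala
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.analysis.UnresolvedHint

// hint wraps the plan in an UnresolvedHint logical operator
val plan = table("t1").hint("broadcast")
assert(plan.isInstanceOf[UnresolvedHint])
```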
### Creating Join Logical Operator — join Method

`join` creates a Join logical operator.
```scala
join(
  otherPlan: LogicalPlan,
  joinType: JoinType = Inner,
  condition: Option[Expression] = None): LogicalPlan
```
### Creating UnresolvedRelation Logical Operator — table Method

`table` creates an UnresolvedRelation logical operator.
```scala
table(ref: String): LogicalPlan
table(db: String, ref: String): LogicalPlan
```
```scala
import org.apache.spark.sql.catalyst.dsl.plans._

val t1 = table("t1")
scala> println(t1.treeString)
'UnresolvedRelation `t1`
```
### DslLogicalPlan Implicit Class

```scala
implicit class DslLogicalPlan(val logicalPlan: LogicalPlan)
```
The `DslLogicalPlan` implicit class is part of the plans implicit conversions and adds extension methods (for logical operators) to build entire logical plans.
```scala
select(exprs: Expression*): LogicalPlan
where(condition: Expression): LogicalPlan
filter[T: Encoder](func: T => Boolean): LogicalPlan
filter[T: Encoder](func: FilterFunction[T]): LogicalPlan
serialize[T: Encoder]: LogicalPlan
deserialize[T: Encoder]: LogicalPlan
limit(limitExpr: Expression): LogicalPlan
join(
  otherPlan: LogicalPlan,
  joinType: JoinType = Inner,
  condition: Option[Expression] = None): LogicalPlan
cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
  otherPlan: LogicalPlan,
  func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
  leftGroup: Seq[Attribute],
  rightGroup: Seq[Attribute],
  leftAttr: Seq[Attribute],
  rightAttr: Seq[Attribute]): LogicalPlan
orderBy(sortExprs: SortOrder*): LogicalPlan
sortBy(sortExprs: SortOrder*): LogicalPlan
groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): LogicalPlan
window(
  windowExpressions: Seq[NamedExpression],
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder]): LogicalPlan
subquery(alias: Symbol): LogicalPlan
except(otherPlan: LogicalPlan): LogicalPlan
intersect(otherPlan: LogicalPlan): LogicalPlan
union(otherPlan: LogicalPlan): LogicalPlan
generate(
  generator: Generator,
  unrequiredChildIndex: Seq[Int] = Nil,
  outer: Boolean = false,
  alias: Option[String] = None,
  outputNames: Seq[String] = Nil): LogicalPlan
insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan
as(alias: String): LogicalPlan
coalesce(num: Integer): LogicalPlan
repartition(num: Integer): LogicalPlan
distribute(exprs: Expression*)(n: Int): LogicalPlan
hint(name: String, parameters: Any*): LogicalPlan
```
```scala
// Import plans object
// That loads implicit class DslLogicalPlan
// And so every LogicalPlan is the "target" of the DslLogicalPlan methods
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table(ref = "t1")

// HACK: Disable symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

import org.apache.spark.sql.catalyst.dsl.expressions._
val id = 'id.long
val logicalPlan = t1.select(id)
scala> println(logicalPlan.numberedTreeString)
00 'Project [id#1L]
01 +- 'UnresolvedRelation `t1`

val t2 = table("t2")
import org.apache.spark.sql.catalyst.plans.LeftSemi
val logicalPlan = t1.join(t2, joinType = LeftSemi, condition = Some(id))
scala> println(logicalPlan.numberedTreeString)
00 'Join LeftSemi, id#1: bigint
01 :- 'UnresolvedRelation `t1`
02 +- 'UnresolvedRelation `t2`
```
### Analyzing Logical Plan — analyze Method

```scala
analyze: LogicalPlan
```
`analyze` resolves attribute references in the logical plan.

The `analyze` method is part of the DslLogicalPlan implicit class.

Internally, `analyze` uses the EliminateSubqueryAliases logical optimization and the `SimpleAnalyzer` logical analyzer.
```scala
// DEMO FIXME
```
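Until the demo is filled in, a sketch that sidesteps the empty-catalog issue noted earlier by starting from a `LocalRelation`, which carries its own schema and so needs no catalog lookup for `SimpleAnalyzer` to resolve it (the column names `id` and `name` are just an illustration):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// LocalRelation carries its own schema, so no catalog is needed
val relation = LocalRelation('id.long, 'name.string)

// analyze resolves 'id against the relation's output attributes
val plan = relation.select('id).where('id > 0).analyze
assert(plan.resolved)
```

This is the same pattern Catalyst's own optimizer test suites use to build analyzed plans without a `SparkSession`.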