Catalyst DSL — Implicit Conversions for Catalyst Data Structures
Catalyst DSL is a collection of Scala implicit conversions for constructing Catalyst data structures, i.e. expressions and logical plans, more easily.
The goal of Catalyst DSL is to make working with Spark SQL’s building blocks easier (e.g. for testing or Spark SQL internals exploration).
| Name | Description |
|---|---|
| `expressions` | Creates expressions |
| `ImplicitOperators` | Adds operators to expressions for building complex expressions |
| `plans` | Creates logical plans |
Catalyst DSL is part of the `org.apache.spark.sql.catalyst.dsl` package object.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

scala> :type $"hello"
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
```
Important

Some implicit conversions from the Catalyst DSL interfere with the implicit conversions from `SQLImplicits` that are imported automatically in `spark-shell` (via `spark.implicits._`). You can disable a conflicting implicit conversion using a trick described in *How can an implicit be unimported from the Scala repl?*
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// ExpressionConversions

import org.apache.spark.sql.catalyst.expressions.Literal
scala> val trueLit: Literal = true
trueLit: org.apache.spark.sql.catalyst.expressions.Literal = true

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
scala> val name: UnresolvedAttribute = 'name
name: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'name

// NOTE: This conversion may not work, e.g. in spark-shell
// There is another implicit conversion StringToColumn in SQLImplicits
// It is automatically imported in spark-shell
// See :imports
val id: UnresolvedAttribute = $"id"

import org.apache.spark.sql.catalyst.expressions.Expression
scala> val expr: Expression = sum('id)
expr: org.apache.spark.sql.catalyst.expressions.Expression = sum('id)

// implicit class DslSymbol
scala> 'hello.s
res2: String = hello

scala> 'hello.attr
res4: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'hello

// implicit class DslString
scala> "helo".expr
res0: org.apache.spark.sql.catalyst.expressions.Expression = helo

scala> "helo".attr
res1: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'helo

// logical plans

scala> val t1 = table("t1")
t1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 'UnresolvedRelation `t1`

scala> val p = t1.select('*).serialize[String].where('id % 2 == 0)
p: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Filter false
+- 'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#1]
   +- 'Project ['*]
      +- 'UnresolvedRelation `t1`

// FIXME Does not work because SimpleAnalyzer's catalog is empty
// the p plan references a t1 table
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
scala> p.analyze
```
ImplicitOperators Implicit Conversions
`ImplicitOperators` adds operators (e.g. `in`) to expressions for building more complex expressions.
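A quick sketch of these operators in action (in a Scala REPL with Spark on the classpath; the attribute names `a`, `b` and `c` are arbitrary):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Symbols are converted to attributes and the DSL operators
// combine them into a tree of Catalyst expressions
val e = ('a + 1 === 'b) && 'c.in(1, 2)
```

The result is an ordinary Catalyst `Expression` tree that can be used wherever Spark SQL internals expect an expression.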
ExpressionConversions Implicit Conversions
ExpressionConversions implicit conversions add ImplicitOperators operators to Catalyst expressions.
Type Conversions to Literal Expressions
ExpressionConversions adds conversions of Scala native types (e.g. Boolean, Long, String, Date, Timestamp) and Spark SQL types (i.e. Decimal) to Literal expressions.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal

// Scala native values are converted to Literal expressions implicitly
val boolLit: Literal = true
val longLit: Literal = 42L
val strLit: Literal = "hello"
```
Converting Symbols to UnresolvedAttribute and AttributeReference Expressions
ExpressionConversions adds conversions of Scala’s Symbol to UnresolvedAttribute and AttributeReference expressions.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Symbol to UnresolvedAttribute
scala> :type 'name.attr
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Symbol with a data type to AttributeReference
scala> :type 'id.long
org.apache.spark.sql.catalyst.expressions.AttributeReference
```
Converting $-Prefixed String Literals to UnresolvedAttribute Expressions
ExpressionConversions adds conversions of $"col name" to an UnresolvedAttribute expression.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

scala> :type $"id"
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
```
Adding Aggregate And Non-Aggregate Functions to Expressions
```scala
star(names: String*): Expression
```
ExpressionConversions adds aggregate and non-aggregate functions to Catalyst expressions (e.g. `sum`, `count`, `upper`, `star`, `callFunction`, `windowSpec`, `windowExpr`).
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

val s = star()
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
assert(s.isInstanceOf[UnresolvedStar])

val s = star("a", "b")
scala> println(s)
WrappedArray(a, b).*
```
Creating UnresolvedFunction Expressions — function and distinctFunction Methods
ExpressionConversions allows creating UnresolvedFunction expressions with function and distinctFunction operators.
```scala
function(exprs: Expression*): UnresolvedFunction
distinctFunction(exprs: Expression*): UnresolvedFunction
```
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Works with Scala Symbols only
val f = 'f.function()
scala> :type f
org.apache.spark.sql.catalyst.analysis.UnresolvedFunction

scala> f.isDistinct
res0: Boolean = false

val g = 'g.distinctFunction()
scala> g.isDistinct
res1: Boolean = true
```
Creating AttributeReference Expressions With nullability On or Off — notNull and canBeNull Methods
ExpressionConversions adds canBeNull and notNull operators to create an AttributeReference with nullability turned on or off, respectively.
```scala
notNull: AttributeReference
canBeNull: AttributeReference
```
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

val id = 'id.long
scala> id.nullable
res0: Boolean = true

scala> id.notNull.nullable
res1: Boolean = false
```
Creating BoundReference — at Method
```scala
at(ordinal: Int): BoundReference
```
ExpressionConversions adds the `at` method to `AttributeReference` expressions to create BoundReference expressions.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

val boundRef = 'hello.string.at(4)
scala> println(boundRef)
input[4, string, true]
```
plans Implicit Conversions for Logical Plans
Creating UnresolvedHint Logical Operator — hint Method
plans adds the hint method to create an UnresolvedHint logical operator.
```scala
hint(name: String, parameters: Any*): LogicalPlan
```
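A minimal sketch (the table name `t1` and the `broadcast` hint name are arbitrary examples):

```scala
import org.apache.spark.sql.catalyst.dsl.plans._

// Wraps the child plan in an UnresolvedHint logical operator
val plan = table("t1").hint("broadcast")
scala> println(plan.treeString)
```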
Creating Join Logical Operator — join Method
join creates a Join logical operator.
```scala
join(
  otherPlan: LogicalPlan,
  joinType: JoinType = Inner,
  condition: Option[Expression] = None): LogicalPlan
```
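For example (a sketch; the table names and the join condition are hypothetical):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftOuter

// Join two unresolved relations with an optional join condition
val plan = table("t1").join(
  table("t2"),
  joinType = LeftOuter,
  condition = Some('id === 'other_id))
```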
Creating UnresolvedRelation Logical Operator — table Method
table creates an UnresolvedRelation logical operator.
```scala
table(ref: String): LogicalPlan
table(db: String, ref: String): LogicalPlan
```
```scala
import org.apache.spark.sql.catalyst.dsl.plans._

val t1 = table("t1")
scala> println(t1.treeString)
'UnresolvedRelation `t1`
```
DslLogicalPlan Implicit Class
```scala
implicit class DslLogicalPlan(val logicalPlan: LogicalPlan)
```
The DslLogicalPlan implicit class is part of the plans implicit conversions and adds extension methods to logical operators to build entire logical plans.
```scala
select(exprs: Expression*): LogicalPlan
where(condition: Expression): LogicalPlan
filter[T: Encoder](func: T => Boolean): LogicalPlan
filter[T: Encoder](func: FilterFunction[T]): LogicalPlan
serialize[T: Encoder]: LogicalPlan
deserialize[T: Encoder]: LogicalPlan
limit(limitExpr: Expression): LogicalPlan
join(
  otherPlan: LogicalPlan,
  joinType: JoinType = Inner,
  condition: Option[Expression] = None): LogicalPlan
cogroup[Key: Encoder, Left: Encoder, Right: Encoder, Result: Encoder](
  otherPlan: LogicalPlan,
  func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
  leftGroup: Seq[Attribute],
  rightGroup: Seq[Attribute],
  leftAttr: Seq[Attribute],
  rightAttr: Seq[Attribute]): LogicalPlan
orderBy(sortExprs: SortOrder*): LogicalPlan
sortBy(sortExprs: SortOrder*): LogicalPlan
groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): LogicalPlan
window(
  windowExpressions: Seq[NamedExpression],
  partitionSpec: Seq[Expression],
  orderSpec: Seq[SortOrder]): LogicalPlan
subquery(alias: Symbol): LogicalPlan
except(otherPlan: LogicalPlan): LogicalPlan
intersect(otherPlan: LogicalPlan): LogicalPlan
union(otherPlan: LogicalPlan): LogicalPlan
generate(
  generator: Generator,
  unrequiredChildIndex: Seq[Int] = Nil,
  outer: Boolean = false,
  alias: Option[String] = None,
  outputNames: Seq[String] = Nil): LogicalPlan
insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan
as(alias: String): LogicalPlan
coalesce(num: Integer): LogicalPlan
repartition(num: Integer): LogicalPlan
distribute(exprs: Expression*)(n: Int): LogicalPlan
hint(name: String, parameters: Any*): LogicalPlan
```
```scala
// Import plans object
// That loads implicit class DslLogicalPlan
// And so every LogicalPlan is the "target" of the DslLogicalPlan methods
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table(ref = "t1")

// HACK: Disable symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

import org.apache.spark.sql.catalyst.dsl.expressions._
val id = 'id.long
val logicalPlan = t1.select(id)
scala> println(logicalPlan.numberedTreeString)
00 'Project [id#1L]
01 +- 'UnresolvedRelation `t1`

val t2 = table("t2")
import org.apache.spark.sql.catalyst.plans.LeftSemi
val logicalPlan = t1.join(t2, joinType = LeftSemi, condition = Some(id))
scala> println(logicalPlan.numberedTreeString)
00 'Join LeftSemi, id#1: bigint
01 :- 'UnresolvedRelation `t1`
02 +- 'UnresolvedRelation `t2`
```
Analyzing Logical Plan — analyze Method
```scala
analyze: LogicalPlan
```
analyze resolves attribute references in a logical plan.
analyze is part of the DslLogicalPlan implicit class.
Internally, analyze uses the EliminateSubqueryAliases logical optimization and the SimpleAnalyzer logical analyzer.
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// NOTE: Use a LocalRelation (not an UnresolvedRelation) so that
// SimpleAnalyzer can resolve the attribute references without a catalog
val plan = LocalRelation('id.long, 'name.string).select('id)
scala> println(plan.analyze.numberedTreeString)
```