ScalaUDF — Catalyst Expression to Manage Lifecycle of User-Defined Function
ScalaUDF is a Catalyst expression to manage the lifecycle of a user-defined function (and hook it into Spark SQL's Catalyst execution path).
ScalaUDF is an ImplicitCastInputTypes and a UserDefinedExpression.
ScalaUDF has no representation in SQL.
ScalaUDF is created when:
- UserDefinedFunction is executed
- UDFRegistration is requested to register a Scala function as a user-defined function (in FunctionRegistry)
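The two creation paths above can be sketched as follows. This is a minimal illustration and not taken from the Spark sources: it assumes a Spark 2.3 to 3.x classpath (where Column exposes expr), and the names strlen, viaApply and viaRegistry are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}

// Assumption: a local session; in spark-shell this simply returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("scalaudf-creation").getOrCreate()

// Path 1: executing (applying) a UserDefinedFunction to columns
val strlen = udf((s: String) => s.length)
val viaApply = strlen(col("name")).expr
println(viaApply.getClass.getSimpleName)

// Path 2: registering a Scala function by name through UDFRegistration
// (the name "strlen" is hypothetical)
spark.udf.register("strlen", (s: String) => s.length)
val analyzed = spark.range(1).selectExpr("strlen('abc') AS n").queryExecution.analyzed

// Collect every ScalaUDF that the analyzer placed in the logical plan
val viaRegistry = analyzed.expressions.flatMap(_.collect { case u: ScalaUDF => u })
println(viaRegistry)
```

In both cases the expression that ends up in the plan is a ScalaUDF; the registered variant is only created once a query refers to the function by name.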
    val lengthUDF = udf { s: String => s.length }.withName("lengthUDF")
    val c = lengthUDF($"name")
    scala> println(c.expr.treeString)
    UDF:lengthUDF('name)
    +- 'name

    import org.apache.spark.sql.catalyst.expressions.ScalaUDF
    val scalaUDF = c.expr.asInstanceOf[ScalaUDF]
Note
Spark SQL Analyzer uses the HandleNullInputsForUDF logical evaluation rule to…FIXME
    // Defining a zero-argument UDF
    val myUDF = udf { () => "Hello World" }

    // "Execute" the UDF
    // Attach it to an "execution environment", i.e. a Dataset
    // by specifying zero columns to execute on (since the UDF is no-arg)
    import org.apache.spark.sql.catalyst.expressions.ScalaUDF
    val scalaUDF = myUDF().expr.asInstanceOf[ScalaUDF]

    scala> scalaUDF.resolved
    res1: Boolean = true

    // Execute the UDF (on every row in a Dataset)
    // We simulate it relying on the EmptyRow that is the default InternalRow of eval
    scala> scalaUDF.eval()
    res2: Any = Hello World

    // Defining a UDF of one input parameter
    val hello = udf { s: String => s"Hello $s" }

    // Binding the hello UDF to a column name
    import org.apache.spark.sql.catalyst.expressions.ScalaUDF
    val helloScalaUDF = hello($"name").expr.asInstanceOf[ScalaUDF]

    scala> helloScalaUDF.resolved
    res3: Boolean = false

    // Resolve helloScalaUDF, i.e. the only `name` column reference

    scala> helloScalaUDF.children
    res4: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = ArrayBuffer('name)

    // The column is free (i.e. not bound to a Dataset)
    // Define a Dataset that becomes the rows for the UDF
    val names = Seq("Jacek", "Agata").toDF("name")
    scala> println(names.queryExecution.analyzed.numberedTreeString)
    00 Project [value#1 AS name#3]
    01 +- LocalRelation [value#1]

    // Resolve the references using the Dataset
    val plan = names.queryExecution.analyzed
    val resolver = spark.sessionState.analyzer.resolver

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    val resolvedUDF = helloScalaUDF.transformUp { case a @ UnresolvedAttribute(names) =>
      // we're in controlled environment
      // so get is safe
      plan.resolve(names, resolver).get
    }

    scala> resolvedUDF.resolved
    res6: Boolean = true

    scala> println(resolvedUDF.numberedTreeString)
    00 UDF(name#3)
    01 +- name#3: string

    import org.apache.spark.sql.catalyst.expressions.BindReferences
    val attrs = names.queryExecution.sparkPlan.output
    val boundUDF = BindReferences.bindReference(resolvedUDF, attrs)

    // Create an internal binary row, i.e. InternalRow
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    val stringEncoder = ExpressionEncoder[String]
    val row = stringEncoder.toRow("world")

    // YAY! It works!
    scala> boundUDF.eval(row)
    res8: Any = Hello world

    // Just to show the regular execution path
    // i.e. how to execute a UDF in a context of a Dataset
    val q = names.select(hello($"name"))
    scala> q.show
    +-----------+
    |  UDF(name)|
    +-----------+
    |Hello Jacek|
    |Hello Agata|
    +-----------+
Generating Java Source Code (ExprCode) For Code-Generated Expression Evaluation — doGenCode Method
    doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode
Note
doGenCode is part of the Expression Contract to generate Java source code (ExprCode) for code-generated expression evaluation.
doGenCode…FIXME
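While the internals of doGenCode are not described here, the Java source it produces can be inspected from the outside. The sketch below goes through Expression.genCode, the public entry point that delegates to doGenCode, with a fresh CodegenContext. It assumes a Spark 2.3 to 3.x classpath; the names greet and javaSource are made up for the example, and the exact generated text differs between Spark versions, so no output is shown.

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.functions.udf

// A zero-argument UDF has no child expressions, so no binding is needed
val greet = udf(() => "Hello World")
val expr = greet().expr

// genCode asks the expression (here a ScalaUDF) to emit its Java source
val ctx = new CodegenContext
val ev = expr.genCode(ctx)

// ExprCode.code is a String in Spark 2.x and a Block in 3.x; toString covers both
val javaSource = ev.code.toString
println(javaSource)
```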
Evaluating Expression — eval Method
    eval(input: InternalRow): Any
Note
eval is part of the Expression Contract for interpreted (non-code-generated) expression evaluation, i.e. evaluating a Catalyst expression to a JVM object for a given internal binary row.
eval executes the Scala function on the input internal row.
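A short way to watch eval run the Scala function, without resolving or binding any column reference, is to give the UDF a literal as its only child. This is a sketch under the assumption of a Spark 2.3 to 3.x classpath; the names hello and result are made up for the example.

```scala
import org.apache.spark.sql.functions.{lit, udf}

val hello = udf((s: String) => s"Hello $s")

// The only child is a Literal, so the expression is already resolved
val expr = hello(lit("world")).expr

// eval defaults to an empty input row; the literal child ignores it anyway
val result = expr.eval()

// The result is Catalyst's internal string type, hence toString for display
println(result.toString)
```

Because the child is a literal rather than a column reference, the input row is never read, which keeps the interpreted path easy to try in isolation.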
Creating ScalaUDF Instance
ScalaUDF takes the following when created:
- Output data type
- Child Catalyst expressions
- Input data types (if available)
ScalaUDF initializes the internal registries and counters.
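The constructor arguments listed above can be read back from an existing instance. The following is a minimal sketch, assuming a Spark 2.3 to 3.x classpath. It avoids calling the ScalaUDF constructor directly because its parameter list changes between Spark versions. The name strlen is made up for the example.

```scala
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.IntegerType

val strlen = udf((s: String) => s.length)
val scalaUDF = strlen(col("name")).expr.asInstanceOf[ScalaUDF]

// Output data type, derived from the function's return type (Int)
println(scalaUDF.dataType)

// Child Catalyst expressions, here the single unresolved `name` reference
println(scalaUDF.children)

// Input data types (if available), used for implicit casts of the children
println(scalaUDF.inputTypes)
```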