ExpressionEncoder — Expression-Based Encoder
ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.

ExpressionEncoder[T] uses expressions for a serializer and a deserializer.
Note: ExpressionEncoder is the only supported implementation of Encoder, which is explicitly enforced when a Dataset is created (even though the Dataset data structure accepts a bare Encoder[T]).
```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val stringEncoder = ExpressionEncoder[String]
scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
```
ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).

Note: It is assumed that all serializer expressions contain at least one and the same BoundReference.
ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from the internal binary row format.

ExpressionEncoder is flat when the serializer uses a single expression (which also means that the objects of a type T are not created using constructor parameters only, as Product or DefinedByConstructorParams types are).
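As a quick illustration of the difference, the following spark-shell sketch (assuming Spark 2.x, where the serializer field is accessible, and using a made-up Person case class) compares the number of serializer expressions of a flat encoder and of an encoder for a Product type:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// A flat encoder: the serializer is a single expression
val longEncoder = ExpressionEncoder[Long]
assert(longEncoder.serializer.size == 1)

// An encoder for a Product type: one serializer expression per constructor parameter
case class Person(id: Long, name: String)  // hypothetical example type
val personEncoder = ExpressionEncoder[Person]
assert(personEncoder.serializer.size == 2)
```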
Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
| Property | Description |
|---|---|
| constructProjection | Safe Projection for the deserializer. Used exclusively when fromRow decodes a JVM object from an internal binary row. |
| extractProjection | UnsafeProjection for the serializer. Used exclusively when toRow encodes a JVM object as an internal binary row. |
| inputRow | InternalRow of size 1 that holds the input JVM object. Used…FIXME |
Note: The Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].
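A quick sketch of inspecting the default encoders in spark-shell (the exact schema string may differ slightly across Spark versions):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// The default encoders are ExpressionEncoders under the covers
assert(Encoders.STRING.isInstanceOf[ExpressionEncoder[_]])
assert(Encoders.scalaLong.isInstanceOf[ExpressionEncoder[_]])

// A primitive encoder is flat, i.e. has a single-field schema
println(Encoders.scalaLong.schema.simpleString)  // e.g. struct<value:bigint>
```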
Creating ExpressionEncoder — apply Method

```scala
apply[T : TypeTag](): ExpressionEncoder[T]
```
Caution: FIXME
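Until the FIXME is resolved, here is a short spark-shell sketch of apply in action (the Person case class is made up for the example; the schema output, shown as comments, may differ slightly). apply derives the schema and the serializer and deserializer expressions for T via ScalaReflection.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)  // hypothetical example type

// apply[Person]() builds the encoder from the type alone
val personEncoder = ExpressionEncoder[Person]
personEncoder.schema.printTreeString
// root
//  |-- id: long (nullable = false)
//  |-- name: string (nullable = true)
```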
Creating ExpressionEncoder Instance
ExpressionEncoder takes the following when created:

- Serializer expressions (to convert objects of type T to internal rows)
- Deserializer expression (to convert internal rows to objects of type T)
- Scala's ClassTag for the JVM type T
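A sketch of inspecting these three parts on an already-created encoder (assuming Spark 2.x and reusing the hypothetical Person type):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)  // hypothetical example type
val personEncoder = ExpressionEncoder[Person]

// Serializer expressions: one per constructor parameter of Person
personEncoder.serializer.foreach(expr => println(expr.simpleString))

// Deserializer expression: a single expression that re-creates a Person from an internal row
println(personEncoder.deserializer.simpleString)

// ClassTag for the JVM type T
println(personEncoder.clsTag)
```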
Creating Deserialize Expression — ScalaReflection.deserializerFor Method

```scala
deserializerFor[T: TypeTag]: Expression
```
deserializerFor creates an expression to deserialize from the internal binary row format to a Scala object of type T.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor

val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)

val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)
```
Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path with - root class: "[clsName]".

Tip: Read up on Scala's TypeTags in TypeTags and Manifests.
Note: deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal deserializerFor Method

```scala
deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
```
The recursive deserializerFor creates a deserialize expression specific to the JVM type (Scala or Java) at hand, User Defined Types (UDTs) among them.
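As a sketch of this per-type handling (output omitted; the exact expressions depend on the Spark version), collection and Option types get their own deserialize expressions:

```scala
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor

// Seq[Int] is deserialized with a collection-producing expression (e.g. mapobjects)
val seqDeExpr = deserializerFor[Seq[Int]]
println(seqDeExpr.numberedTreeString)

// Option[Double] is deserialized by wrapping the value (e.g. wrapoption)
val optDeExpr = deserializerFor[Option[Double]]
println(optDeExpr.numberedTreeString)
```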
Creating Serialize Expression — ScalaReflection.serializerFor Method

```scala
serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct
```
serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to the internal binary row format.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor

import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType
val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)

val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]
```
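For a Product type the input expression is a reference to the JVM object itself, and the resulting named_struct has one field per constructor parameter. A sketch (the Person case class is made up for the example; output omitted):

```scala
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.ObjectType

case class Person(id: Long, name: String)  // hypothetical example type

// The input is the Person object itself (not one of its fields)
val personRef = BoundReference(ordinal = 0, dataType = ObjectType(classOf[Person]), nullable = true)
val personSerExpr = serializerFor[Person](personRef)

// Expect a named_struct with the id and name fields
println(personSerExpr.numberedTreeString)
```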
Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path with - root class: "[clsName]" and pattern matches on the result expression.

Caution: FIXME the pattern match part
Tip: Read up on Scala's TypeTags in TypeTags and Manifests.
Note: serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal serializerFor Method

```scala
serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression
```
serializerFor creates an expression for serializing an object of type T to an internal row.

Caution: FIXME
Encoding JVM Object to Internal Binary Row Format — toRow Method

```scala
toRow(t: T): InternalRow
```
toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow sets the only JVM object to be t in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).

In case of any exception while serializing, toRow reports a RuntimeException:

```text
Error while encoding: [initial exception]
[multi-line serializer]
```
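A sketch of toRow for a Product type (the Person case class is made up for the example; assuming Spark 2.x where toRow is available):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)  // hypothetical example type
val personEncoder = ExpressionEncoder[Person]

// toRow puts Person(0, "Jacek") into inputRow and runs extractProjection over it
val row = personEncoder.toRow(Person(0, "Jacek"))
assert(row.numFields == 2)  // one field per constructor parameter
```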
Decoding JVM Object From Internal Binary Row Format — fromRow Method

```scala
fromRow(row: InternalRow): T
```
fromRow decodes (aka deserializes) a JVM object from a row, i.e. an InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.

In case of any exception while deserializing, fromRow reports a RuntimeException:

```text
Error while decoding: [initial exception]
[deserializer]
```
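A round-trip sketch (assuming Spark 2.x): the encoder is first resolved and bound (see resolveAndBind below) so that the deserializer knows the input attributes, and only then is fromRow used.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val stringEncoder = ExpressionEncoder[String]
val row = stringEncoder.toRow("hello world")

// Resolve and bind the deserializer before decoding
val boundEncoder = stringEncoder.resolveAndBind()
val decoded = boundEncoder.fromRow(row)
assert(decoded == "hello world")
```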
Creating ExpressionEncoder For Tuple — tuple Method

```scala
tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
tuple[T1, T2, T3](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
tuple[T1, T2, T3, T4](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
tuple[T1, T2, T3, T4, T5](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4],
  e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]
```
tuple…FIXME

Note: tuple is used when…FIXME
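A sketch of composing two encoders into an encoder for a pair (the schema output, shown as comments, may differ slightly across Spark versions):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val longEncoder = ExpressionEncoder[Long]
val stringEncoder = ExpressionEncoder[String]

// An encoder for (Long, String) built out of the two encoders
val pairEncoder = ExpressionEncoder.tuple(longEncoder, stringEncoder)
pairEncoder.schema.printTreeString
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
```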
resolveAndBind Method

```scala
resolveAndBind(
  attrs: Seq[Attribute] = schema.toAttributes,
  analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]
```
resolveAndBind…FIXME
```scala
// A very common use case
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema

import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
import org.apache.spark.sql.Row
val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

import org.apache.spark.sql.catalyst.InternalRow
val row = InternalRow(1, "Jacek")

val deserializer = encoder.deserializer

scala> deserializer.eval(row)
java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
  at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
  ... 54 elided

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val code = deserializer.genCode(ctx).code
```