ExpressionEncoder — Expression-Based Encoder
ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.
ExpressionEncoder[T] uses expressions for a serializer and a deserializer.
Note: ExpressionEncoder is the only supported implementation of Encoder. This is explicitly enforced when a Dataset is created (even though the Dataset data structure accepts a bare Encoder[T]).
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    val stringEncoder = ExpressionEncoder[String]
    scala> val row = stringEncoder.toRow("hello world")
    row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
    unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).
Note: It is assumed that all serializer expressions contain at least one BoundReference and that it is the same BoundReference in every expression.
ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from internal binary row format.
ExpressionEncoder is flat when the serializer consists of a single expression (which also means that T is not a type whose objects are created from constructor parameters only, like Product or DefinedByConstructorParams types).
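The difference between flat and non-flat encoders can be checked in spark-shell. This is a minimal sketch, assuming Spark 2.x, where the encoder exposes a `flat` flag; the two types are chosen only for illustration.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// String is not defined by constructor parameters, so its encoder is flat
val stringEnc = ExpressionEncoder[String]
// A tuple is a Product, so its encoder has one serializer expression per field
val pairEnc = ExpressionEncoder[(Long, String)]

println(s"String: flat=${stringEnc.flat}, serializers=${stringEnc.serializer.size}")
println(s"Tuple2: flat=${pairEnc.flat}, serializers=${pairEnc.serializer.size}")

// The schemas show the single "value" column versus the "_1"/"_2" columns
println(stringEnc.schema.treeString)
println(pairEnc.schema.treeString)
```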
Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). All three are internal lazy attributes of the encoder.
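| Property | Description |
|---|---|
| constructProjection | Safe Projection for the input deserializer. Used exclusively when fromRow decodes a JVM object from an internal row. |
| extractProjection | UnsafeProjection for the input serializer. Used exclusively when toRow encodes a JVM object to an internal row. |
| inputRow | InternalRow of size 1. Used…FIXME |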
Note: The Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].
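A few of these default encoders can be inspected in spark-shell. This is a small sketch; the selection of encoders is arbitrary, and the `isInstanceOf` check merely illustrates that they are ExpressionEncoders underneath.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Default encoders for a Scala primitive, String, Timestamp and Array[Byte]
val defaults: Seq[Encoder[_]] = Seq(
  Encoders.scalaLong,
  Encoders.STRING,
  Encoders.TIMESTAMP,
  Encoders.BINARY)

// Every Encoder exposes the JVM class tag and the schema of its rows
defaults.foreach { e =>
  println(s"${e.clsTag} -> ${e.schema.simpleString}")
}

val stringIsExpressionEncoder = Encoders.STRING.isInstanceOf[ExpressionEncoder[_]]
```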
Creating ExpressionEncoder — apply Method
    apply[T : TypeTag](): ExpressionEncoder[T]
Caution: FIXME
Creating ExpressionEncoder Instance
ExpressionEncoder takes the following when created:
- Serializer expressions (to convert objects of type T to internal rows)
- Deserializer expression (to convert internal rows to objects of type T)
- Scala’s ClassTag for the JVM type T
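The three parts listed above can be read back from an existing encoder. This is a sketch for spark-shell; the tuple type is just an example.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val enc = ExpressionEncoder[(Long, String)]

// Serializer expressions: object of type T -> columns of an internal row
enc.serializer.foreach(e => println(e.numberedTreeString))

// Deserializer expression: internal row -> object of type T
println(enc.deserializer.numberedTreeString)

// ClassTag of the JVM type T
val runtimeClass = enc.clsTag.runtimeClass
println(runtimeClass.getName)
```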
Creating Deserialize Expression — ScalaReflection.deserializerFor Method
    deserializerFor[T: TypeTag]: Expression
deserializerFor creates an expression to deserialize from internal binary row format to a Scala object of type T.
    import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor
    val timestampDeExpr = deserializerFor[java.sql.Timestamp]
    scala> println(timestampDeExpr.numberedTreeString)
    00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
    01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
    02    +- getcolumnbyordinal(0, TimestampType)

    val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
    scala> println(tuple2DeExpr.numberedTreeString)
    00 newInstance(class scala.Tuple2)
    01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
    02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
    03 :     +- getcolumnbyordinal(0, TimestampType)
    04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
    05    +- getcolumnbyordinal(1, DoubleType)
Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path: - root class: "[clsName]".
Tip: Read up on Scala’s TypeTags in TypeTags and Manifests.
Note: deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal deserializerFor Method
    deserializerFor(
      tpe: `Type`,
      path: Option[Expression],
      walkedTypePath: Seq[String]): Expression
| JVM Type (Scala or Java) | Deserialize Expressions |
|---|---|
| User Defined Types (UDTs) | |
Creating Serialize Expression — ScalaReflection.serializerFor Method
    serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct
serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to internal binary row format.
    import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
    import org.apache.spark.sql.catalyst.expressions.BoundReference
    import org.apache.spark.sql.types.TimestampType
    val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)
    val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
    scala> println(timestampSerExpr.numberedTreeString)
    00 named_struct(value, input[0, timestamp, true])
    01 :- value
    02 +- input[0, timestamp, true]
Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path (- root class: "[clsName]") and pattern matches on the resulting expression.
Caution: FIXME the pattern match part
Tip: Read up on Scala’s TypeTags in TypeTags and Manifests.
Note: serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
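For comparison with a single-value type, serializerFor can also be tried on a tuple type. This is a sketch only, assuming Spark 2.x (where serializerFor has the signature shown in this section) and an input BoundReference of ObjectType, which is an assumption made for this example. The tree is printed without an expected output, since it varies between versions.

```scala
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.ObjectType

// The input object is the tuple itself, referenced as column 0 of the input row
val tupleRef = BoundReference(
  ordinal = 0,
  dataType = ObjectType(classOf[(Long, String)]),
  nullable = true)

val tupleSerExpr = serializerFor[(Long, String)](tupleRef)
println(tupleSerExpr.numberedTreeString)

// The struct created by the serializer has one field per tuple element
val fieldNames = tupleSerExpr.dataType.fieldNames.toSeq
println(fieldNames)
```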
Recursive Internal serializerFor Method
    serializerFor(
      inputObject: Expression,
      tpe: `Type`,
      walkedTypePath: Seq[String],
      seenTypeSet: Set[`Type`] = Set.empty): Expression
serializerFor creates an expression for serializing an object of type T to an internal row.
Caution: FIXME
Encoding JVM Object to Internal Binary Row Format — toRow Method
    toRow(t: T): InternalRow
toRow encodes (aka serializes) a JVM object t as an internal binary row.
Internally, toRow sets t as the only JVM object in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).
In case of any exception while serializing, toRow reports a RuntimeException:
    Error while encoding: [initial exception]
    [multi-line serializer]
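The two internal steps of toRow can be approximated by hand. This is a sketch under assumptions, not the encoder's actual code: it targets Spark 2.x (where toRow exists), and the local names `inputRow` and `extractProjection` only mirror the internal attributes mentioned above.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}

val enc = ExpressionEncoder[String]

// A one-slot row that holds the JVM object to be serialized
val inputRow = new GenericInternalRow(1)
// An unsafe projection built from the encoder's serializer expressions
val extractProjection = UnsafeProjection.create(enc.serializer)

// Step 1: put the object into the input row; step 2: project it to an UnsafeRow
inputRow(0) = "hello world"
val manualRow = extractProjection(inputRow)

// The same value encoded through the public API, for comparison
val viaToRow = enc.toRow("hello world")
println(manualRow.getUTF8String(0))
println(viaToRow.getUTF8String(0))
```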
Decoding JVM Object From Internal Binary Row Format — fromRow Method
    fromRow(row: InternalRow): T
fromRow decodes (aka deserializes) a JVM object from an InternalRow row (with the required values only).
Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.
In case of any exception while deserializing, fromRow reports a RuntimeException:
    Error while decoding: [initial exception]
    [deserializer]
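A round trip through toRow and fromRow can be tried in spark-shell. This is a minimal sketch, assuming Spark 2.x; the tuple type and values are illustrative. resolveAndBind is called first because the deserializer has to be resolved and bound to the encoder's schema before fromRow can evaluate it.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Resolve and bind the encoder to its own schema before decoding
val enc = ExpressionEncoder[(Long, String)].resolveAndBind()

// Encode; toRow is allowed to reuse the returned row, so copy it if it is kept around
val row = enc.toRow((1L, "Jacek")).copy()

// Decode back to a JVM object
val decoded = enc.fromRow(row)
println(decoded)
```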
Creating ExpressionEncoder For Tuple — tuple Method
    tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
    tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
    tuple[T1, T2](
      e1: ExpressionEncoder[T1],
      e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
    tuple[T1, T2, T3](
      e1: ExpressionEncoder[T1],
      e2: ExpressionEncoder[T2],
      e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
    tuple[T1, T2, T3, T4](
      e1: ExpressionEncoder[T1],
      e2: ExpressionEncoder[T2],
      e3: ExpressionEncoder[T3],
      e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
    tuple[T1, T2, T3, T4, T5](
      e1: ExpressionEncoder[T1],
      e2: ExpressionEncoder[T2],
      e3: ExpressionEncoder[T3],
      e4: ExpressionEncoder[T4],
      e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]
tuple…FIXME
Note: tuple is used when…FIXME
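The two-argument variant of tuple can be exercised as follows. This is a sketch for spark-shell, assuming Spark 2.x for the toRow and fromRow calls; the element encoders and the sample value are chosen only for illustration.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Combine two single-value encoders into an encoder for pairs
val pairEnc: ExpressionEncoder[(Long, String)] =
  ExpressionEncoder.tuple(ExpressionEncoder[Long], ExpressionEncoder[String])

// The combined schema has one column per element encoder
println(pairEnc.schema.treeString)

// Round trip a pair through the combined encoder
val bound = pairEnc.resolveAndBind()
val roundTripped = bound.fromRow(bound.toRow((42L, "answer")))
println(roundTripped)
```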
resolveAndBind Method
    resolveAndBind(
      attrs: Seq[Attribute] = schema.toAttributes,
      analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]
resolveAndBind…FIXME
    // A very common use case
    case class Person(id: Long, name: String)
    import org.apache.spark.sql.Encoders
    val schema = Encoders.product[Person].schema

    import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
    import org.apache.spark.sql.Row
    val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

    import org.apache.spark.sql.catalyst.InternalRow
    val row = InternalRow(1, "Jacek")

    val deserializer = encoder.deserializer
    scala> deserializer.eval(row)
    java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
      at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
      ... 54 elided

    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
    val ctx = new CodegenContext
    val code = deserializer.genCode(ctx).code