Encoder — Internal Row Converter
Encoder is the fundamental concept in the serialization and deserialization (SerDe) framework in Spark SQL 2.0. Spark SQL uses the SerDe framework for IO to make it efficient time- and space-wise.
Tip
Spark has borrowed the idea from the Hive SerDe library, so it might be worthwhile to get familiar with Hive a little bit, too.
Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait.
```scala
import scala.reflect.ClassTag
import org.apache.spark.sql.types.StructType

trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}
```
The type T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL’s InternalRow, which is the internal binary row format representation (using Catalyst expressions and code generation).
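To make the encode/decode idea concrete without any Spark machinery, here is a deliberately simplified sketch. ToyEncoder and PersonToyEncoder are hypothetical names and the "row" is just an Array[Any]; the real Encoder[T] relies on Catalyst expressions, code generation and the binary InternalRow format instead.

```scala
// Hypothetical, Spark-free toy that mimics the idea behind Encoder[T]:
// a schema plus a two-way conversion between T and a generic row.
trait ToyEncoder[T] {
  def schema: Seq[(String, String)] // (field name, type name) pairs
  def toRow(t: T): Array[Any]       // encode: JVM object -> row
  def fromRow(row: Array[Any]): T   // decode: row -> JVM object
}

case class Person(id: Long, name: String)

// Hand-written toy encoder for Person (Spark derives the real one for you)
object PersonToyEncoder extends ToyEncoder[Person] {
  val schema = Seq("id" -> "bigint", "name" -> "string")
  def toRow(p: Person): Array[Any] = Array(p.id, p.name)
  def fromRow(row: Array[Any]): Person =
    Person(row(0).asInstanceOf[Long], row(1).asInstanceOf[String])
}

val jacek = Person(0, "Jacek")
val row = PersonToyEncoder.toRow(jacek)
val jacekReborn = PersonToyEncoder.fromRow(row)
println(jacekReborn == jacek) // true
```

The real ExpressionEncoder plays the same two roles, but its serializer and deserializer are Catalyst expressions rather than hand-written methods.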
Note
Encoder is also called “a container of serde expressions in Dataset”.
Note
The one and only implementation of the Encoder trait in Spark SQL 2 is ExpressionEncoder.
Encoders are an integral (and internal) part of any Dataset[T] (of records of type T): every Dataset[T] comes with an Encoder[T] that is used to serialize and deserialize the records of this dataset.
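The pairing of a Dataset[T] with its Encoder[T] shows up in the SparkSession.createDataset signature, which takes the encoder as an implicit parameter that can also be passed explicitly. A minimal sketch, assuming spark-sql is on the classpath (e.g. pasted into spark-shell, where getOrCreate() just returns the running session); the second record is made-up sample data:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

case class Person(id: Long, name: String)

// Reuses the running session in spark-shell; otherwise creates a local one
val spark = SparkSession.builder.master("local[*]").appName("encoder-demo").getOrCreate()

// createDataset[T : Encoder] takes its Encoder[T] implicitly,
// so the encoder can also be handed over explicitly as a second argument list
val personEncoder = Encoders.product[Person]
val people = Seq(Person(0, "Jacek"), Person(1, "Agata"))
val ds: Dataset[Person] = spark.createDataset(people)(personEncoder)

// The columns of the Dataset are the ones described by the encoder's schema
ds.printSchema()
ds.collect().foreach(println)
```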
Note
Dataset[T] is a Scala type constructor with the type parameter T. So is Encoder[T], which handles serialization and deserialization of T to and from the internal representation.
Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization compared to the default Java or Kryo serializers.
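One way to see what knowing the schema means is to compare the schema of a case-class encoder with that of a Kryo-based encoder for the same class. A minimal sketch, assuming spark-sql is on the classpath; no SparkSession is needed just to inspect an encoder's schema:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{BinaryType, LongType, StringType}

case class Person(id: Long, name: String)

// A schema-aware encoder exposes one typed column per field...
val productSchema = Encoders.product[Person].schema
// ...whereas a Kryo-based encoder stores the whole object as one binary column
val kryoSchema = Encoders.kryo[Person].schema

productSchema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
kryoSchema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
```

With the product encoder Spark SQL sees id and name as typed columns; with the Kryo encoder it only sees an opaque value blob.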
```scala
// The domain object for your records in a large dataset
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders

scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]

scala> personEncoder.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true))

scala> personEncoder.clsTag
res1: scala.reflect.ClassTag[Person] = Person

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val personExprEncoder = personEncoder.asInstanceOf[ExpressionEncoder[Person]]
personExprEncoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]

// ExpressionEncoders may or may not be flat
scala> personExprEncoder.flat
res2: Boolean = false

// The Serializer part of the encoder
scala> personExprEncoder.serializer
res3: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#0L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#1)

// The Deserializer part of the encoder
scala> personExprEncoder.deserializer
res4: org.apache.spark.sql.catalyst.expressions.Expression = newInstance(class Person)

scala> personExprEncoder.namedExpressions
res5: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#2L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#3)

// A record in a Dataset[Person]
// A mere instance of Person case class
// There could be thousands of Persons in a large dataset
val jacek = Person(0, "Jacek")

// Serialize a record to the internal representation, i.e. InternalRow
scala> val row = personExprEncoder.toRow(jacek)
row: org.apache.spark.sql.catalyst.InternalRow = [0,0,1800000005,6b6563614a]

// Spark uses InternalRows internally for IO
// Let's deserialize it to a JVM object, i.e. a Scala object
import org.apache.spark.sql.catalyst.dsl.expressions._

// In spark-shell there are competing implicits
// That's why DslSymbol is used explicitly in the following line
scala> val attrs = Seq(DslSymbol('id).long, DslSymbol('name).string)
attrs: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference] = List(id#8L, name#9)

scala> val jacekReborn = personExprEncoder.resolveAndBind(attrs).fromRow(row)
jacekReborn: Person = Person(0,Jacek)

// Are the two jacek instances the same?
scala> jacek == jacekReborn
res6: Boolean = true
```
You can create custom encoders using the static methods of the Encoders object. Note, however, that encoders for common Scala types and their product types are already available in the implicits object.
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
```
Tip
The default encoders are already imported in spark-shell.
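With spark.implicits._ in scope the encoders are resolved implicitly, so no Encoders call is needed to turn local collections into typed Datasets. A minimal sketch, assuming spark-sql on the classpath and a local session (spark-shell reuses its own):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class Person(id: Long, name: String)

val spark = SparkSession.builder.master("local[*]").appName("implicits-demo").getOrCreate()
// Implicit Encoders for primitives, tuples and case classes (Products)
import spark.implicits._

// No Encoder is passed explicitly: toDS() finds one through the implicits
val numbers: Dataset[Int] = Seq(1, 2, 3).toDS()
val people: Dataset[Person] = Seq(Person(0, "Jacek")).toDS()
val pairs: Dataset[(String, Long)] = Seq(("a", 1L)).toDS()

people.show()
```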
Encoders map columns (of your dataset) to fields (of your JVM object) by name. Encoders are what bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, MemSQL) and vice versa.
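The by-name mapping can be observed with Dataset.as[U], which resolves the fields of a case class against the columns of the same name. A sketch, assuming a local session, in which the columns are deliberately declared in the opposite order of Person's fields:

```scala
import org.apache.spark.sql.SparkSession

case class Person(id: Long, name: String)

val spark = SparkSession.builder.master("local[*]").appName("by-name-demo").getOrCreate()
import spark.implicits._

// Columns come in the order (name, id), the reverse of Person(id, name)
val df = Seq(("Jacek", 0L)).toDF("name", "id")

// as[Person] matches fields to columns by name, not by position
val people = df.as[Person].collect()
people.foreach(println)
```

Because the match is by name, a missing column (say, no name column at all) makes as[Person] fail analysis instead of filling the field from some other column.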
Note
In Spark SQL 2.0, DataFrame is a mere type alias for Dataset[Row], with RowEncoder being the encoder.
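Since DataFrame is only a type alias, a DataFrame can be used wherever a Dataset[Row] is expected without any conversion. A small sketch, assuming a local session:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("dataframe-demo").getOrCreate()

// Assigning a DataFrame to a Dataset[Row] compiles as-is: they are the same type
val df: DataFrame = spark.range(3).toDF("id")
val rows: Dataset[Row] = df

// Every record is a generic Row, read here by position and type
val ids = rows.collect().map(_.getLong(0)).toSeq
println(ids)
```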
Creating Custom Encoders (Encoders object)
The Encoders factory object defines methods to create Encoder instances.
Import Encoders from the org.apache.spark.sql package to get access to the factory object.
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]
```
You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that can be composed to create more advanced encoders for Java bean classes (using the bean method).
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]
```
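For the bean method, the class has to follow the Java bean conventions: a public no-argument constructor plus getters and setters. A sketch in Scala, assuming spark-sql on the classpath; PersonBean is a hypothetical class whose accessors (getId/setId, getName/setName) are generated by @BeanProperty:

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// Hypothetical Java-bean-style class: no-arg constructor, getters and setters
class PersonBean extends Serializable {
  @BeanProperty var id: Long = 0L
  @BeanProperty var name: String = _
}

// The schema is inferred from the bean's properties
val beanEncoder = Encoders.bean(classOf[PersonBean])
beanEncoder.schema.printTreeString()
```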
You can also create encoders based on Kryo or Java serializers.
```scala
import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]

scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
```
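A Kryo-encoded Dataset still round-trips its objects, but the whole record lives in the single binary value column shown above. A sketch, assuming a local session and the same Person shape as in the transcript:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

case class Person(id: Int, name: String, speaksPolish: Boolean)

val spark = SparkSession.builder.master("local[*]").appName("kryo-demo").getOrCreate()

// Pass the Kryo-based encoder explicitly to createDataset
val kryoEncoder = Encoders.kryo[Person]
val people = Seq(Person(0, "Jacek", speaksPolish = true))
val ds = spark.createDataset(people)(kryoEncoder)

// One binary column named "value" holds the serialized object...
ds.printSchema()
// ...and collect() deserializes it back into Person instances
val back = ds.collect().toSeq
```

The trade-off is that Spark SQL cannot see id or name inside the binary blob, so it cannot select or filter on those fields the way it can with a product encoder.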
You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.
```scala
import org.apache.spark.sql.Encoders

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
```
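A composed tuple encoder can be handed to createDataset like any other encoder. A sketch, assuming a local session; the record is sample data:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("tuple-demo").getOrCreate()

// Compose an Encoder[(Long, String, Boolean)] out of three single-value encoders
val tupleEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
val rows = Seq((0L, "Jacek", true))
val ds = spark.createDataset(rows)(tupleEncoder)

// Tuple elements become the columns _1, _2 and _3
ds.printSchema()
```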
Further Reading and Watching
- (video) Modern Spark DataFrame and Dataset (Intermediate Tutorial) by Adam Breindel from Databricks.