scala> spark.range(4).map(n => (n, n % 2)).groupByKey(_._2).agg(typed.sum(_._2)).explain(true)
== Parsed Logical Plan ==
'Aggregate [value#454L], [value#454L, unresolvedalias(typedsumdouble(org.apache.spark.sql.execution.aggregate.TypedSumDouble@4fcb0de4, Some(unresolveddeserializer(newInstance(class scala.Tuple2), _1#450L, _2#451L)), Some(class scala.Tuple2), Some(StructType(StructField(_1,LongType,true), StructField(_2,LongType,false))), input[0, double, true] AS value#457, unresolveddeserializer(upcast(getcolumnbyordinal(0, DoubleType), DoubleType, - root class: "scala.Double"), value#457), input[0, double, true] AS value#456, DoubleType, DoubleType, false), Some(<function1>))]
+- AppendColumns <function1>, class scala.Tuple2, [StructField(_1,LongType,true), StructField(_2,LongType,false)], newInstance(class scala.Tuple2), [input[0, bigint, true] AS value#454L]
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1.longValue AS _1#450L, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#451L]
      +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#449: scala.Tuple2
         +- DeserializeToObject newInstance(class java.lang.Long), obj#448: java.lang.Long
            +- Range (0, 4, step=1, splits=Some(8))
== Analyzed Logical Plan ==
value: bigint, TypedSumDouble(scala.Tuple2): double
Aggregate [value#454L], [value#454L, typedsumdouble(org.apache.spark.sql.execution.aggregate.TypedSumDouble@4fcb0de4, Some(newInstance(class scala.Tuple2)), Some(class scala.Tuple2), Some(StructType(StructField(_1,LongType,true), StructField(_2,LongType,false))), input[0, double, true] AS value#457, cast(value#457 as double), input[0, double, true] AS value#456, DoubleType, DoubleType, false) AS TypedSumDouble(scala.Tuple2)#462]
+- AppendColumns <function1>, class scala.Tuple2, [StructField(_1,LongType,true), StructField(_2,LongType,false)], newInstance(class scala.Tuple2), [input[0, bigint, true] AS value#454L]
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1.longValue AS _1#450L, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#451L]
      +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#449: scala.Tuple2
         +- DeserializeToObject newInstance(class java.lang.Long), obj#448: java.lang.Long
            +- Range (0, 4, step=1, splits=Some(8))
== Optimized Logical Plan ==
Aggregate [value#454L], [value#454L, typedsumdouble(org.apache.spark.sql.execution.aggregate.TypedSumDouble@4fcb0de4, Some(newInstance(class scala.Tuple2)), Some(class scala.Tuple2), Some(StructType(StructField(_1,LongType,true), StructField(_2,LongType,false))), input[0, double, true] AS value#457, value#457, input[0, double, true] AS value#456, DoubleType, DoubleType, false) AS TypedSumDouble(scala.Tuple2)#462]
+- AppendColumnsWithObject <function1>, [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1.longValue AS _1#450L, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#451L], [input[0, bigint, true] AS value#454L]
   +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#449: scala.Tuple2
      +- DeserializeToObject newInstance(class java.lang.Long), obj#448: java.lang.Long
         +- Range (0, 4, step=1, splits=Some(8))
== Physical Plan ==
*HashAggregate(keys=[value#454L], functions=[typedsumdouble(org.apache.spark.sql.execution.aggregate.TypedSumDouble@4fcb0de4, Some(newInstance(class scala.Tuple2)), Some(class scala.Tuple2), Some(StructType(StructField(_1,LongType,true), StructField(_2,LongType,false))), input[0, double, true] AS value#457, value#457, input[0, double, true] AS value#456, DoubleType, DoubleType, false)], output=[value#454L, TypedSumDouble(scala.Tuple2)#462])
+- Exchange hashpartitioning(value#454L, 200)
   +- *HashAggregate(keys=[value#454L], functions=[partial_typedsumdouble(org.apache.spark.sql.execution.aggregate.TypedSumDouble@4fcb0de4, Some(newInstance(class scala.Tuple2)), Some(class scala.Tuple2), Some(StructType(StructField(_1,LongType,true), StructField(_2,LongType,false))), input[0, double, true] AS value#457, value#457, input[0, double, true] AS value#456, DoubleType, DoubleType, false)], output=[value#454L, value#463])
      +- AppendColumnsWithObject <function1>, [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1.longValue AS _1#450L, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#451L], [input[0, bigint, true] AS value#454L]
         +- MapElements <function1>, obj#449: scala.Tuple2
            +- DeserializeToObject newInstance(class java.lang.Long), obj#448: java.lang.Long
               +- *Range (0, 4, step=1, splits=Some(8))