首发个人公众号 spark技术分享 , 同步个人网站 coolplayer.net ,未经本人同意,禁止一切转载
很久之前的 这篇文章 里面就应该写上了,但是觉得这部分实在太有趣,就单开一篇。
因为 UnsafeShuffleWriter 涉及到统一内存管理,对这块不甚了解的可以参考之前的 这篇文章
整体流程
UnsafeShuffleWriter 里面维护着一个 ShuffleExternalSorter, 用来做外部排序, 我在上一篇文章里面已经讲过什么是外部排序了, 外部排序就是要先部分排序数据并把数据输出到磁盘,然后最后再进行merge 全局排序, 既然这里也是外部排序,跟 SortShuffleWriter 有什么区别呢, 这里只根据 record 的 partition id 先在内存 ShuffleInMemorySorter 中进行排序, 排好序的数据经过序列化压缩输出到换一个临时文件的一段,并且记录每个分区段的seek位置,方便后续可以单独读取每个分区的数据,读取流经过解压反序列化,就可以正常读取了。
整个过程就是不断地在 ShuffleInMemorySorter 插入数据,如果没有内存就申请内存,如果申请不到内存就 spill 到文件中,最终合并成一个 依据 partition id 全局有序 的大文件。
SortShuffleWriter 和 UnsafeShuffleWriter 对比
区别 | UnsafeShuffleWriter | SortShuffleWriter |
---|---|---|
排序方式 | 最终只是 partition 级别的排序 | 先 partition 排序,相同分区 key有序 |
aggregation | 没有饭序列化,没有aggregation | 支持 aggregation |
使用 UnsafeShuffleWriter 的条件
-
没有指定 aggregation 或者key排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序
-
原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。 KryoSerializer 和 spark sql 自定义的序列化器 支持这个特性。
-
分区数目必须小于 16777216 ,因为 partition number 使用24bit 表示的。
-
以为每个分区使用 27 位来表示 record offset, 所以一个 record 不能大于这个值。
内存排序并输出文件
我们不妨看向对记录排序的例子。一个标准的排序步骤需要为记录储存一组的指针,并使用quicksort 来互换指针直到所有记录被排序。基于顺序扫描的特性,排序通常能获得一个不错的缓存命中率。然而,排序一组指针的缓存命中率却很低,因为每个比较运算都需要对两个指针解引用,而这两个指针对应的却是内存中两个随机位置的数据。
那么,我们该如何提高排序中的缓存本地性?其中一个方法就是通过指针顺序地储存每个记录的sort key。我们使用 8个字节(partition id 作为 key, 和数据真正的指针)来代表一条数据,放在一个 sort array 中,每次对比排序的操作只需要线性的查找每对pointer-key,从而不会产生任何的随机扫描。 这样如果对所有记录的 partion 进行排序的时候, 直接对这个数据里面的进行排序,就好了,极大的提高了性能。
当然 这里对数据排序, UnsafeShuffleWriter 使用的是 RadixSort, 这个很简单,我就不介绍了, 不同清楚的可以参考下 这个文档 http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/
上面是申请内存的过程,申请到的内存作为 一个 page 记录在 allocatedPages 中,spill的时候进行 free 这些内存, 有一个当前使用的 currentPage, 如果不够用了,就继续去申请。
大家可以看下上面的图, 每次插入一条 record 到page 中, 就把 partionId + pageNumber + offset in page, 作为一个元素插入到 LongArray 中, 最终读取数据的时候, 对LongArray 进行 RadixSort 排序, 排序后依次根据指针元素索引原始数据,就做到 partition 级别有序了。
spill 文件的时候, UnsafeShuffleInMemorySorter 生成一个数据迭代器, 会返回一个根据partition id 排过序迭代器,该迭代器粒度每个元素就是一个指针,对应 PackedRecordPointer 这个数据结构, 这个 PackedRecordPointer 定义的数据结构就是 [24 bit partition number][13 bit memory page number][27 bit offset in page]
然后到根据该指针可以拿到真实的record, 在一开始进入UnsafeShuffleExternalSorter 就已经被序列化了,所以在这里就纯粹变成写字节数组了。一个文件里不同的partiton的数据用fileSegment来表示,对应的信息存在 SpillInfo 数据结构中。
合并文件
每个spill 文件的分区索引都保存在 SpillInfo 数据结构中, Task结束前,我们要做一次mergeSpills操作, 如果 fastMergeEnabled 并且压缩方式支持 concatenation of compressed data, 就可以直接 简单地连接相同分区的压缩数据到一起,而且不用解压反序列化。使用一种高效的数据拷贝技术,比如 NIO’s transferTo 就可以避免解压和 buffer 拷贝。
欢迎关注 spark技术分享
原文始发于微信公众号(spark技术分享):彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)