关注 spark技术分享,
撸spark源码 玩spark最佳实践

彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载


很久之前的  这篇文章  里面就应该写上了,但是觉得这部分实在太有趣,就单开一篇。

因为 UnsafeShuffleWriter 涉及到统一内存管理,对这块不甚了解的可以参考之前的 这篇文章

整体流程

UnsafeShuffleWriter 里面维护着一个 ShuffleExternalSorter, 用来做外部排序, 我在上一篇文章里面已经讲过什么是外部排序了,  外部排序就是要先部分排序数据并把数据输出到磁盘,然后最后再进行merge 全局排序, 既然这里也是外部排序,跟 SortShuffleWriter 有什么区别呢, 这里只根据 record 的 partition id 先在内存 ShuffleInMemorySorter 中进行排序, 排好序的数据经过序列化压缩输出到换一个临时文件的一段,并且记录每个分区段的seek位置,方便后续可以单独读取每个分区的数据,读取流经过解压反序列化,就可以正常读取了。

整个过程就是不断地在 ShuffleInMemorySorter 插入数据,如果没有内存就申请内存,如果申请不到内存就 spill 到文件中,最终合并成一个 依据 partition id 全局有序 的大文件。

SortShuffleWriter 和  UnsafeShuffleWriter 对比

区别 UnsafeShuffleWriter SortShuffleWriter
排序方式 最终只是 partition 级别的排序 先 partition 排序,相同分区 key有序
aggregation 没有饭序列化,没有aggregation 支持 aggregation

使用 UnsafeShuffleWriter 的条件

  • 没有指定 aggregation 或者key排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序

  • 原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。 KryoSerializer 和 spark sql 自定义的序列化器 支持这个特性。

  • 分区数目必须小于 16777216 ,因为 partition number 使用24bit 表示的。

  • 以为每个分区使用 27 位来表示 record offset, 所以一个 record 不能大于这个值。

内存排序并输出文件


彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)

我们不妨看向对记录排序的例子。一个标准的排序步骤需要为记录储存一组的指针,并使用quicksort 来互换指针直到所有记录被排序。基于顺序扫描的特性,排序通常能获得一个不错的缓存命中率。然而,排序一组指针的缓存命中率却很低,因为每个比较运算都需要对两个指针解引用,而这两个指针对应的却是内存中两个随机位置的数据。

那么,我们该如何提高排序中的缓存本地性?其中一个方法就是通过指针顺序地储存每个记录的sort key。我们使用 8个字节(partition id 作为 key, 和数据真正的指针)来代表一条数据,放在一个 sort array 中,每次对比排序的操作只需要线性的查找每对pointer-key,从而不会产生任何的随机扫描。 这样如果对所有记录的 partion 进行排序的时候, 直接对这个数据里面的进行排序,就好了,极大的提高了性能。

当然 这里对数据排序, UnsafeShuffleWriter 使用的是 RadixSort, 这个很简单,我就不介绍了, 不同清楚的可以参考下 这个文档 http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/

彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)


上面是申请内存的过程,申请到的内存作为 一个 page 记录在  allocatedPages 中,spill的时候进行 free 这些内存, 有一个当前使用的 currentPage, 如果不够用了,就继续去申请。


彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)


大家可以看下上面的图, 每次插入一条 record 到page 中, 就把 partionId + pageNumber + offset in page, 作为一个元素插入到 LongArray 中, 最终读取数据的时候, 对LongArray 进行 RadixSort 排序,  排序后依次根据指针元素索引原始数据,就做到 partition 级别有序了。

彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)


spill 文件的时候, UnsafeShuffleInMemorySorter 生成一个数据迭代器, 会返回一个根据partition id 排过序迭代器,该迭代器粒度每个元素就是一个指针,对应 PackedRecordPointer 这个数据结构, 这个 PackedRecordPointer 定义的数据结构就是  [24 bit partition number][13 bit memory page number][27 bit offset in page]  然后到根据该指针可以拿到真实的record, 在一开始进入UnsafeShuffleExternalSorter 就已经被序列化了,所以在这里就纯粹变成写字节数组了。一个文件里不同的partiton的数据用fileSegment来表示,对应的信息存在 SpillInfo 数据结构中。

合并文件


彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)


每个spill 文件的分区索引都保存在 SpillInfo 数据结构中, Task结束前,我们要做一次mergeSpills操作, 如果 fastMergeEnabled  并且压缩方式支持 concatenation of compressed data, 就可以直接 简单地连接相同分区的压缩数据到一起,而且不用解压反序列化。使用一种高效的数据拷贝技术,比如  NIO’s transferTo 就可以避免解压和 buffer 拷贝。

欢迎关注 spark技术分享            

                       彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)



原文始发于微信公众号(spark技术分享):彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)

赞(0) 打赏
未经允许不得转载:spark技术分享 » 彻底搞懂spark的shuffle过程(shuffle writer 的 UnsafeShuffleWriter)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注公众号:spark技术分享

联系我们联系我们

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏