概要
Spark 内存管理之Tungsten介绍了Tungsten项目中内存管理相关的部分,最后讲到使用sun.misc.Unsafe申请内存,返回的内存地址和大小封装为MemoryBlock,这篇关注Spark如何使用申请到的大块内存,也是就是BytesToBytesMap。
BytesToBytesMap简介
BytesToBytesMap是Spark实现的HashMap,降低了JVM的对象和GC开销。JDK中HashMap是基于数组加链表(JDK1.8使用红黑树优化)实现的,熟悉JDK中的HashMap,再理解BytesToBytesMap很容易。
RDD[(K, V)]相关的操作,例如aggregate、shuffle、sort等中间值的处理,底层就是基于BytesToBytesMap,或者类似的结构完成的,BytesToBytesMap对于Spark是非常重要的数据结构。
BytesToBytesMap实现
BytesToBytesMap的实现原理如下
LongArray
LongArray是long型数组,如下
每条记录,LongArray会占用两个long分别存放这条记录的fullKeyAddress和hashcode信息,其中fullKeyAddress又包含两部分,如下
fullKeyAddress | pageNum | Record所在的page信息,仅对on-heap有用,off-heap不需要关心page,只需要内存地址即可获得记录 |
OffsetAddress | Record的内存地址 | |
hashcode | Record对应的hashcode,使用key相关信息生成(keyBase, keyOffset, keyLength) |
dataPages
dataPages是一个数组,维护Spark 内存管理之Tungsten中提到的MemoryBlock,如下
每个MemoryBlock被称为一个Page。
Record
如原理图中所示,每条记录包含了5部分信息,以及其占用空间大小,其中len(k)和len(v)是8 bytes的整数倍。最后一部分pointer to next pair是spark 2.1版本中添加的,为了支持一个K对应多个V(参考[SPARK-14052] [SQL] build a BytesToBytesMap directly in HashedRelation)。
插入及读取数据
和JDK的HashMap实现原理几乎一致,读取数据流程如下
- 根据Key相关信息计算hashcode,hashcode & 容量 得到存储Record的地址i。
- 如LongArray中介绍,2 * i存放的是fullKeyAddress。
- 判断fullKeyAddress处的值,若为0则表示i这个位置没有值,返回Location对象,Location包含了读取及插入i处数据的操作。
- 若2 * i处不为0,表示i位置已插入值,接下来判断2 * i + 1处的存储的hashcode和生成的hashcode是否相等,以及key相关信息是否相等(相当于JDK HashMap调用equals方法)。
- 若4中判断相等,找到对应位置返回Location对象,可以插入及读取数据,BytesToBytesMap不支持删除数据。
- 若4中判断不相等,说明发生了hash冲突,和JDK HashMap使用链表解决hash冲突不同,BytesToBytesMap使用开放定址法,即i + 1探测下一个位置,重复步骤2。
- 最后,若是首次插入数据,插入当前的Page,根据pageNum和内存地址OffsetAddress生成fullKeyAddress。
hash冲突的处理
BytesToBytesMap使用开放地址法解决hash冲突,即探测当前位置的下一个位置。
MapIterator
和HashMap相似,BytesToBytesMap提供了迭代器MapIterator,遍历其所有元素。
最大容量
BytesToBytesMap支持最多2^29个key,2^30是小于Integer.MAX_VALUE的2的最大指数,同时每条记录LongArray需要两个位置存储对应信息,所以最多支持2^29,如果key的数量大于2^29,应考虑UnsafeExternalSorter,效率可能会更优。
总结
介绍Spark中集合BytesToBytesMap的实现原理,BytesToBytesMap是Spark中处理K-V数据的重要集合,此外,Spark中另一个集合AppendOnlyMap,实现原理和BytesToBytesMap完全一致,不同的是AppendOnlyMap使用JDK的Array数组存储数据。