第32课:彻底解密Spark 2.1.X中Shuffle 下Task视角内存分配管理
本文根据家林大神系列课程编写 http://weibo.com/ilovepains
Spark 2.1.X内存管理包含2种类型:统一内存管理 UnifiedMemoryManager、静态内存StaticMemoryManager。这两种内存的管理方式最终要落实到Task的运行。我们先从源码角度对Spark内存管理进行回顾,从Spark Task的视角解析Task运行内存管理源码。在Spark 2.1.X中默认使用的UnifiedMemoryManager的统一内存管理的方式,从Task运行的角度来讲是Execution级别的,也就是UnifiedMemoryManager的核心是Execution Memory。Execution Memory主要做的事情是Shuffle、Sort、Aggregate等。我们为什么将Execution Memory的内存调大,理论上讲,Execution Memory的内存越大,IO就越来越少。Execution Memory相对于内存的占用比较强势。当Execution Memory空间不足而且Storage Memory空间也不足的时候,Storage Memory空间会被强制drop掉一部分数据来解决Execution 的空间不足问题。
Spark2.1.X内存管理UnifiedMemoryManager的管理方式从源码的角度看有2个核心方法:
- acquireExecutionMemory
- acquireStorageMemory
我们看一下acquireExecutionMemory的源代码:
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
onHeapStorageRegionSize,
maxHeapMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
offHeapStorageMemory,
maxOffHeapMemory)
}
acquireExecutionMemory根据ON_HEAP、OFF_HEAP两种不同的方式进行模式匹配,匹配到2种不同方式的时候,有自己的执行方式。内部onHeapExecutionMemoryPool, onHeapStorageMemoryPool无论是哪种方式都会调maybeGrowExecutionPool。
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
每一个Task能够使用的内存理论上是多大呢?这是很重要的一件事情,因为能推测每一个Task能使用的大小,计算公式为poolSize/(2*numActiveTasks)到 maxPollSize/numActiveTasks。
在maybeGrowExecutionPool方法中通过比较 storagePool.memoryFree和storagePool.poolSize -storageRegionSize的大小,计算一个最大值:storage Memory剩余的内存和storagememory从Executionmemory借来的内存哪个大。Storage和Execution在适当的时候可以借用彼此的memory,Storage Memory在Execution执行的时候,Execution memory有空间,Storage Memory缓存的时候向Execution memory借用一些空间。如果Execution memory内存不够,这个时候进行一个比较,整个Execution memory能借到的最大内存是Storage Memory曾经找Execution memory借的内存+Storage Memory空闲内存;根据math.min(extraMemoryNeeded,memoryReclaimableFromStorage)如果Executionmemory需要的大小小于能借到的最大内存,那以实际需要的内存为准。
我们可以看一下freeSpaceToShrinkPool方法的实现:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
在acquireStorageMemory方法分别调用executionPool.decrementPoolSize方法减少内存,及storagePool.incrementPoolSize方法增加内容:
l executionPool.decrementPoolSize(memoryBorrowedFromExecution)
l storagePool.incrementPoolSize(memoryBorrowedFromExecution)
/**
* Expands the pool by `delta` bytes.
*/
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
_poolSize += delta
}
/**
* Shrinks the pool by `delta` bytes.
*/
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
}
接下来我们阐述Storage Memory。Storage Memory只有一种情况:在Execution memory空闲的时候,StorageMemory能借走Execution memory的空闲内存。
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
无论是storagePool还是executionPool,都要和memoryPoll打交道,我们看一下memoryPoll。
/**
* Manages bookkeeping for an adjustable-sized region of memory. This class is internal to
* the [[MemoryManager]]. See subclasses for more details.
*
* @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type
* to `Object` to avoid programming errors, since this object should only be used for
* synchronization purposes.
*/
private[memory] abstract class MemoryPool(lock: Object) {
@GuardedBy("lock")
private[this] var _poolSize: Long = 0
/**
* Returns the current size of the pool, in bytes.
*/
final def poolSize: Long = lock.synchronized {
_poolSize
}
/**
* Returns the amount of free memory in the pool, in bytes.
*/
final def memoryFree: Long = lock.synchronized {
_poolSize - memoryUsed
}
/**
* Expands the pool by `delta` bytes.
*/
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
_poolSize += delta
}
/**
* Shrinks the pool by `delta` bytes.
*/
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
require(delta >= 0)
require(delta <= _poolSize)
require(_poolSize - delta >= memoryUsed)
_poolSize -= delta
}
/**
* Returns the amount of used memory in this pool (in bytes).
*/
def memoryUsed: Long
}
我们看一下StorageMemoryPool,其对于内存的记录和管理,一方面是内存使用的记录,一方面是可调整大小内存(要么借进新的内存,要么借出内存)的管理。StorageMemoryPool有acquireMemory,releaseMemory等方法。我们在shuffle的时候,如果我们使用了UnifiedMemoryManager的新型内存管理方式,同时开启了OFF_HEAP,对我们内存的使用有很大的影响,如果有offHeapExecutionMemoryPool,这个时候还存不存在一个概念,从StorageMemory中获取内存?实际上不需要找StorageMemory要内存。
每个Task能够分配的内存大小:poolSize/(2*numActiveTasks) 到 maxPollSize/numActiveTasks。
l val maxMemoryPerTask = maxPoolSize / numActiveTasks
l val minMemoryPerTask = poolSize / (2 * numActiveTasks)
如果ShuffleTask计算比较复杂的情况,业务逻辑比较复杂,使用新型的UnifiedMemoryManager内存管理方式能取得较好的效果;但是如果计算的业务逻辑不复杂,shuffle计算比较简单,这个时候可以回退到旧的内存管理方式,使用StaticMemory Management效果会更好,因为缓存空间更大。
在这个基础之上,从Task的角度来看,Task有个TaskMemoryManager。TaskMemoryManager管理单个任务分配的内存。 TaskMemoryManager类中的大多数复杂情况涉及到64位长的off-heap 非堆地址编码。在off-heap 非堆模式下,内存可以直接寻址64位。在堆模式下,内存是对象内的基本对象引用和64位偏移的组合寻址。这是一个问题,当我们想存储指针的数据结构内的其他结构,如在hashmaps或排序缓冲区的记录内的指针。即使我们决定使用128位来解决内存问题,我们不能只存储基本对象的地址,由于堆内存存在GC,因此它不能保证保持稳定。相反,我们使用64位地址方式来编码记录指针:off-heap 非堆模式,只需存储原始地址,并在堆模式上使用的地址的上13位存储“页码”和较低的51位来存储此页内的偏移量。这些页数字是用来索引为“页表”阵内以MemoryManager来检索基本对象。 这使我们能够解决8192页。在堆模式下,最大页大小受最大长度的数组限制,我们能够解决8192 2^32 字节的地址,大约35百万兆字节内存。
TaskMemoryManager 肯定要申请内存,管理内存以及释放内存,需要一个MemoryBlock[]类型的内存的pageTable,对堆的内存的分配、释放,这个都是JVM进行管理的,我们new出一个MemoryBlock,只是对象的引用,不是具体内存空间的地址。堆内存有GC的问题,是JVM的死穴,那我们做堆外内存,JAVA提供了ByteBufferBlock的工具类。
接下来我们看一下MemoryLocation,MemoryLocation是内存位置:跟踪内存地址(off-heap 非堆地址分配);或者JVM对象的偏移量(in-heap堆分配)。
public class MemoryLocation {
@Nullable
Object obj;
long offset;
public MemoryLocation(@Nullable Object obj, long offset) {
this.obj = obj;
this.offset = offset;
}
public MemoryLocation() {
this(null, 0);
}
public void setObjAndOffset(Object newObj, long newOffset) {
this.obj = newObj;
this.offset = newOffset;
}
public final Object getBaseObject() {
return obj;
}
public final long getBaseOffset() {
return offset;
}
}
我们看一下MemoryBlock,MemoryBlock是一个连续的内存块,从{@link MemoryLocation} 开始具有固定大小的存储单元。pageNumber变量是可选的页码;当TaskMemoryManager分配内存使用MemoryBlock来表示页,pageNumber是公共变量,在不同的包里面可以通过TaskMemoryManager修改。
ublic class MemoryBlock extends MemoryLocation {
private final long length;
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
* TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
* which lives in a different package.
*/
public int pageNumber = -1;
public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
this.length = length;
}
/**
* Returns the size of the memory block.
*/
public long size() {
return length;
}
/**
* Creates a memory block pointing to the memory used by the long array.
*/
public static MemoryBlock fromLongArray(final long[] array) {
return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
/**
* Fills the memory block with the specified byte value.
*/
public void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
}
接下来我们我们看一下UnsafeMemoryAllocator,一个简单的 {@linkMemoryAllocator} 使用 {@code Unsafe} 分配off-heap非堆内存分配。
public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
MemoryBlock memory = new MemoryBlock(null, address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
@Override
public void free(MemoryBlock memory) {
assert (memory.obj == null) :
"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory";
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
Platform.freeMemory(memory.offset);
}
}
再回到TaskMemoryManager,关注一下关键点:
1,pageTable:与操作系统的页表类似,此数组将页编号映射为基准对象指针,允许我们把哈希表的内部64位地址表示、以及baseObject+offset偏移表示,这样可以支持
堆地址和非堆地址。当使用“非堆地址分配”时,此映射中的每个条目将为“null”。使用堆分配器时,此映射中的项将指向页的基准对象。当新数据页被分配时,条目将被添加到该map。
2,PAGE_TABLE_SIZE是页表中的条目数。
3,PAGE_NUMBER_BITS:用于处理页表的位数。
OFFSET_BITS:用于在数据页中编码偏移量的位数。实际为51
public class TaskMemoryManager {
private static final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);
/** The number of bits used to address the page table. */
private static final int PAGE_NUMBER_BITS = 13;
/** The number of bits used to encode offsets in data pages. */
@VisibleForTesting
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
/**
* Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
* (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
* maximum page size is limited by the maximum amount of data that can be stored in a long[]
* array, which is (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;