Lesson 32: Demystifying Task-Level Memory Allocation and Management in Spark 2.1.X Shuffle
Copyright notice: Wang Jialin's 2018 book 《SPARK大数据商业实战三部曲》 is published by Tsinghua University Press. WeChat official account: 从零起步学习人工智能. https://blog.csdn.net/duan_zhihua/article/details/71438570


This article is based on Wang Jialin's lecture series: http://weibo.com/ilovepains

Spark 2.1.X ships with two kinds of memory management: unified memory management (UnifiedMemoryManager) and static memory management (StaticMemoryManager). Whichever is used, memory management ultimately takes effect when Tasks run, so we first revisit the memory-management source code and then read it from the perspective of a running Task. Spark 2.1.X uses UnifiedMemoryManager by default, and from the Task's point of view its core is Execution Memory: the memory that backs Shuffle, Sort, Aggregate and similar operations. Why enlarge Execution Memory? In theory, the larger the Execution Memory, the less data has to spill and the less IO is incurred. Execution Memory is also the more aggressive of the two regions: when Execution Memory runs short and Storage Memory has no free space left to lend, Storage Memory is forced to drop part of its cached data to relieve Execution's shortage.
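
The size of the unified region and of the storage region inside it is governed by configuration. As a minimal sketch, assuming a standard Spark 2.1 setup (the values shown are the defaults, listed here only for illustration), the relevant settings are spark.memory.fraction and spark.memory.storageFraction:

import org.apache.spark.SparkConf

// Illustrative sketch: spark.memory.fraction sets the share of (heap - reserved memory)
// used for the unified execution + storage region; spark.memory.storageFraction sets the
// storage region inside it (the onHeapStorageRegionSize referenced in the source below).
val conf = new SparkConf()
  .setAppName("unified-memory-demo")
  .set("spark.memory.fraction", "0.6")        // Spark 2.1 default
  .set("spark.memory.storageFraction", "0.5") // Spark 2.1 default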

From the source-code point of view, UnifiedMemoryManager in Spark 2.1.X has two core methods:

  • acquireExecutionMemory
  • acquireStorageMemory

Let's look at the source code of acquireExecutionMemory:

override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

acquireExecutionMemory pattern-matches on the MemoryMode, ON_HEAP or OFF_HEAP, and selects the corresponding execution pool, storage pool, storage region size and maximum memory. Whichever pair of pools is chosen (onHeapExecutionMemoryPool and onHeapStorageMemoryPool, or their off-heap counterparts), the method calls maybeGrowExecutionPool when the execution pool does not have enough free space.


def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) {
    // There is not enough free memory in the execution pool, so try to reclaim memory from
    // storage. We can reclaim any free memory from the storage pool. If the storage pool
    // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
    // the memory that storage has borrowed from execution.
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
      // Only reclaim as much space as is necessary and available:
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      storagePool.decrementPoolSize(spaceToReclaim)
      executionPool.incrementPoolSize(spaceToReclaim)
    }
  }
}


How much memory can a single Task use in theory? Being able to estimate this is important: each active task is guaranteed at least poolSize / (2 * numActiveTasks) and can obtain at most maxPoolSize / numActiveTasks.

maybeGrowExecutionPool compares storagePool.memoryFree with storagePool.poolSize - storageRegionSize and takes the larger value, i.e. the larger of the storage pool's free memory and the memory that storage has borrowed from execution. Storage and Execution can borrow from each other when appropriate: while execution has spare capacity, Storage Memory may borrow it for caching. When Execution Memory later runs short, the most it can reclaim is the memory that Storage once borrowed from execution plus Storage's currently free memory; math.min(extraMemoryNeeded, memoryReclaimableFromStorage) then caps the amount actually reclaimed at what execution really needs.
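
A minimal sketch of this arithmetic with invented figures (the sizes below are hypothetical and only trace the calculation):

// Hypothetical sizes in MB, tracing maybeGrowExecutionPool's calculation.
val storageRegionSize = 500L  // storage's own region
val storagePoolSize   = 700L  // storage has borrowed 200 from execution
val storageMemoryFree = 50L   // unused memory inside the storage pool
val extraMemoryNeeded = 120L  // how much the execution pool is short by

// Reclaimable = max(free storage memory, memory storage borrowed from execution)
val memoryReclaimableFromStorage =
  math.max(storageMemoryFree, storagePoolSize - storageRegionSize) // max(50, 200) = 200

// Only reclaim as much as is actually needed and available.
val spaceToReclaim =
  math.min(extraMemoryNeeded, memoryReclaimableFromStorage)        // min(120, 200) = 120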

Let's look at the implementation of freeSpaceToShrinkPool:

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
    val spaceFreedByEviction =
      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}



When the storage pool borrows from execution, acquireStorageMemory calls executionPool.decrementPoolSize to shrink the execution pool and storagePool.incrementPoolSize to grow the storage pool:

  • executionPool.decrementPoolSize(memoryBorrowedFromExecution)
  • storagePool.incrementPoolSize(memoryBorrowedFromExecution)

/**
 * Expands the pool by `delta` bytes.
 */
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  _poolSize += delta
}

/**
 * Shrinks the pool by `delta` bytes.
 */
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  require(delta <= _poolSize)
  require(_poolSize - delta >= memoryUsed)
  _poolSize -= delta
}

Next let's turn to Storage Memory. Storage borrows in only one situation: when Execution Memory has idle space, Storage Memory can borrow Execution Memory's free memory.



override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}

Both storagePool and executionPool are built on top of MemoryPool, so let's look at MemoryPool:



/**
 * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to
 * the [[MemoryManager]]. See subclasses for more details.
 *
 * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type
 *             to `Object` to avoid programming errors, since this object should only be used for
 *             synchronization purposes.
 */
private[memory] abstract class MemoryPool(lock: Object) {

  @GuardedBy("lock")
  private[this] var _poolSize: Long = 0

  /**
   * Returns the current size of the pool, in bytes.
   */
  final def poolSize: Long = lock.synchronized {
    _poolSize
  }

  /**
   * Returns the amount of free memory in the pool, in bytes.
   */
  final def memoryFree: Long = lock.synchronized {
    _poolSize - memoryUsed
  }

  /**
   * Expands the pool by `delta` bytes.
   */
  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  /**
   * Shrinks the pool by `delta` bytes.
   */
  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    require(delta <= _poolSize)
    require(_poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }

  /**
   * Returns the amount of used memory in this pool (in bytes).
   */
  def memoryUsed: Long
}


Now look at StorageMemoryPool. Its bookkeeping has two sides: recording how much memory is in use, and managing an adjustable pool size (memory borrowed in or lent out). StorageMemoryPool provides methods such as acquireMemory and releaseMemory. During shuffle, using UnifiedMemoryManager together with OFF_HEAP changes memory usage considerably: with offHeapExecutionMemoryPool in play, is there still a notion of fetching memory from Storage Memory? In that case execution does not need to ask Storage Memory for memory.
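
As a minimal sketch of enabling that off-heap path (the values are illustrative, not recommendations): Spark 2.1 only gives the off-heap pools a non-zero budget when spark.memory.offHeap.enabled is set and spark.memory.offHeap.size is positive:

import org.apache.spark.SparkConf

// Illustrative sketch: turning on off-heap memory gives maxOffHeapMemory a non-zero
// value, so OFF_HEAP requests are served by the off-heap pools seen above.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", (2L * 1024 * 1024 * 1024).toString) // 2 GB, example only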



The memory each Task can be allocated ranges from poolSize / (2 * numActiveTasks) to maxPoolSize / numActiveTasks, as computed in ExecutionMemoryPool (a numeric sketch follows these two lines):

  • val maxMemoryPerTask = maxPoolSize / numActiveTasks
  • val minMemoryPerTask = poolSize / (2 * numActiveTasks)
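
A minimal numeric sketch, with invented figures:

// Hypothetical figures: a 2 GB execution pool shared by 4 active tasks.
val poolSize       = 2048L // current execution pool size, in MB
val maxPoolSize    = 3072L // pool size after reclaiming everything storage could give back
val numActiveTasks = 4

val maxMemoryPerTask = maxPoolSize / numActiveTasks    // 768 MB upper bound per task
val minMemoryPerTask = poolSize / (2 * numActiveTasks) // 256 MB guaranteed before a task must wait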


If the ShuffleTask computation is complex, with heavy business logic, the newer UnifiedMemoryManager generally gives better results; if the business logic is simple and the shuffle computation is light, it can be better to fall back to the old memory management, StaticMemoryManager, because it leaves a larger dedicated cache space.
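
As a minimal sketch of that fallback (the fraction values shown are the legacy defaults, given only for illustration), Spark 2.1 switches back to StaticMemoryManager via spark.memory.useLegacyMode:

import org.apache.spark.SparkConf

// Illustrative sketch: useLegacyMode=true selects StaticMemoryManager; the legacy
// fractions below only take effect in that mode.
val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")
  .set("spark.storage.memoryFraction", "0.6") // legacy storage fraction (default)
  .set("spark.shuffle.memoryFraction", "0.2") // legacy shuffle fraction (default)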


On top of this, each Task has its own TaskMemoryManager, which manages the memory allocated to that single task. Most of the complexity in TaskMemoryManager deals with encoding addresses into 64-bit longs. In off-heap mode, memory can be addressed directly with 64-bit raw addresses. In on-heap mode, memory is addressed by the combination of a base object reference and a 64-bit offset within that object. This becomes a problem when we want to store pointers to records inside other data structures, such as record pointers in hashmaps or sort buffers: even if we used 128 bits, we could not simply store the base object's address, because the heap is reorganized by GC and the address is not guaranteed to stay stable. Instead, record pointers are encoded into 64-bit longs as follows: in off-heap mode, just store the raw address; in on-heap mode, use the upper 13 bits of the long to store a "page number" and the lower 51 bits to store an offset within that page. The page number indexes into a "page table" array inside the memory manager to retrieve the base object. This allows us to address 8192 pages. In on-heap mode the maximum page size is limited by the maximum length of a long[] array, and in total we can address about 8192 * 2^32 bytes, roughly 35 terabytes of memory.
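
A minimal sketch of that 13-bit page number / 51-bit offset encoding (the helper names below are hypothetical, not the actual TaskMemoryManager API):

// Hypothetical helpers illustrating the upper-13-bit page number plus
// lower-51-bit in-page offset layout used for on-heap record pointers.
object PagePointerSketch {
  val OffsetBits: Int       = 51
  val LowerOffsetMask: Long = (1L << OffsetBits) - 1 // 0x7FFFFFFFFFFFFL

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & LowerOffsetMask)

  def decodePageNumber(pointer: Long): Int = (pointer >>> OffsetBits).toInt
  def decodeOffset(pointer: Long): Long    = pointer & LowerOffsetMask

  def main(args: Array[String]): Unit = {
    val p = encode(pageNumber = 5, offsetInPage = 1024L)
    println(s"page=${decodePageNumber(p)}, offset=${decodeOffset(p)}") // page=5, offset=1024
  }
}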

A TaskMemoryManager has to allocate, track and free memory, so it keeps a page table of type MemoryBlock[]. For on-heap memory, allocation and release are ultimately handled by the JVM: when we new a MemoryBlock we only get an object reference, not a raw memory address. On-heap memory is subject to GC, which is the JVM's weak point; for going off-heap, Java provides facilities such as direct java.nio.ByteBuffer buffers and sun.misc.Unsafe.
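
For instance (a small illustration unrelated to Spark's own allocator, which is shown further below), a direct ByteBuffer keeps its bytes outside the GC-managed heap:

import java.nio.ByteBuffer

// Illustration only: a direct buffer allocates its backing bytes off-heap.
val direct = ByteBuffer.allocateDirect(1024) // 1 KB off-heap
direct.putLong(0, 42L)
println(direct.getLong(0)) // 42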

Next, MemoryLocation. A MemoryLocation is a memory location that tracks either a raw memory address (for off-heap allocations) or an offset within a JVM object (for on-heap allocations).


public class MemoryLocation {

  @Nullable
  Object obj;

  long offset;

  public MemoryLocation(@Nullable Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }

  public MemoryLocation() {
    this(null, 0);
  }

  public void setObjAndOffset(Object newObj, long newOffset) {
    this.obj = newObj;
    this.offset = newOffset;
  }

  public final Object getBaseObject() {
    return obj;
  }

  public final long getBaseOffset() {
    return offset;
  }
}


Now MemoryBlock. A MemoryBlock is a contiguous block of memory with a fixed size, starting at a {@link MemoryLocation}. The pageNumber field is an optional page number, used when a TaskMemoryManager allocates the block as a page; it is public so that TaskMemoryManager, which lives in a different package, can modify it.


public class MemoryBlock extends MemoryLocation {

  private final long length;

  /**
   * Optional page number; used when this MemoryBlock represents a page allocated by a
   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
   * which lives in a different package.
   */
  public int pageNumber = -1;

  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);
    this.length = length;
  }

  /**
   * Returns the size of the memory block.
   */
  public long size() {
    return length;
  }

  /**
   * Creates a memory block pointing to the memory used by the long array.
   */
  public static MemoryBlock fromLongArray(final long[] array) {
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
  }

  /**
   * Fills the memory block with the specified byte value.
   */
  public void fill(byte value) {
    Platform.setMemory(obj, offset, length, value);
  }
}


Next, UnsafeMemoryAllocator: a simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate off-heap memory.



public class UnsafeMemoryAllocator implements MemoryAllocator {

  @Override
  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    long address = Platform.allocateMemory(size);
    MemoryBlock memory = new MemoryBlock(null, address, size);
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
    }
    return memory;
  }

  @Override
  public void free(MemoryBlock memory) {
    assert (memory.obj == null) :
      "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory";
    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
    }
    Platform.freeMemory(memory.offset);
  }
}


Coming back to TaskMemoryManager, note a few key points:

1. pageTable: similar to an operating system's page table, this array maps page numbers to base object pointers, allowing us to translate between the hashtable's internal 64-bit address representation and the baseObject + offset representation, so both on-heap and off-heap addresses are supported. When the off-heap allocator is used, every entry in this map is null; with the on-heap allocator, entries point to the pages' base objects. Entries are added to the map when new data pages are allocated.

2. PAGE_TABLE_SIZE: the number of entries in the page table.

3. PAGE_NUMBER_BITS: the number of bits used to address the page table.

4. OFFSET_BITS: the number of bits used to encode offsets within data pages; in practice this is 51.



public class TaskMemoryManager {

  private static final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);

  /** The number of bits used to address the page table. */
  private static final int PAGE_NUMBER_BITS = 13;

  /** The number of bits used to encode offsets in data pages. */
  @VisibleForTesting
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;  // 51

  /** The number of entries in the page table. */
  private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;

  /**
   * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
   * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
   * maximum page size is limited by the maximum amount of data that can be stored in a long[]
   * array, which is (2^31 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
   */
  public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;

  /** Bit mask for the lower 51 bits of a long. */
  private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;


