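MemoryStore stores blocks in memory, either as arrays of deserialized Java objects or as serialized ByteBuffers.
1.1 MemoryStore data structures
- entries: maps each blockId to its Block data (a MemoryEntry)
- maxMemory: total memory available for storage, computed as memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
- memoryUsed: total storage memory in use, including unroll memory; read from memoryManager.storageMemoryUsed
- blocksMemoryUsed: storage memory used for cached blocks, excluding unroll memory (memoryUsed - currentUnrollMemory)
- currentUnrollMemory: memory currently held for unrolling across all tasks, computed as onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
- onHeapUnrollMemoryMap: on-heap unroll memory per task, keyed by taskAttemptId
- offHeapUnrollMemoryMap: off-heap unroll memory per task, keyed by taskAttemptId
- unrollMemoryThreshold: initial memory to request before unrolling a block, set by spark.storage.unrollMemoryThreshold (default 1024 * 1024 bytes)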
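To make the relationship between these counters concrete, here is a minimal sketch with made-up numbers. ToyStoreAccounting and its cachedBlockBytes field are hypothetical names used only for illustration; in Spark, memoryUsed is read from the MemoryManager rather than computed this way.

```scala
import scala.collection.mutable

// Toy stand-in for the accounting fields listed above; this is not Spark's MemoryStore.
final class ToyStoreAccounting(maxOnHeap: Long, maxOffHeap: Long) {
  // taskAttemptId -> bytes currently reserved for unrolling
  val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
  val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()

  // Hypothetical field: bytes held by fully cached blocks.
  var cachedBlockBytes: Long = 0L

  def maxMemory: Long = maxOnHeap + maxOffHeap

  def currentUnrollMemory: Long =
    onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum

  // Assumption: total usage is cached blocks plus unroll reservations.
  def memoryUsed: Long = cachedBlockBytes + currentUnrollMemory

  def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
}
```

With 40 bytes cached, 10 bytes of on-heap unroll memory and 5 bytes of off-heap unroll memory, memoryUsed is 55 while blocksMemoryUsed stays at 40.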
/**
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
private[spark] class MemoryStore(
conf: SparkConf,
blockInfoManager: BlockInfoManager,
serializerManager: SerializerManager,
memoryManager: MemoryManager,
blockEvictionHandler: BlockEvictionHandler)
extends Logging {
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
// Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
// always stores serialized values.
private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
/** Total amount of memory available for storage, in bytes. */
private def maxMemory: Long = {
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
}
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
s"memory. Please configure Spark with more memory.")
}
logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
/** Total storage memory used including unroll memory, in bytes. */
private def memoryUsed: Long = memoryManager.storageMemoryUsed
/**
* Amount of storage memory, in bytes, used for caching blocks.
* This does not include memory used for unrolling.
*/
private def blocksMemoryUsed: Long = memoryManager.synchronized {
memoryUsed - currentUnrollMemory
}
/**
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
def currentUnrollMemory: Long = memoryManager.synchronized {
onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
}
// ...
}
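The entries map above is built with the three-argument java.util.LinkedHashMap constructor, whose last argument (true) selects access order instead of insertion order. The following sketch shows what that means for iteration order; AccessOrderDemo and keysAfterTouch are illustrative names, not part of Spark.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

object AccessOrderDemo {
  // Returns the keys in iteration order after reading "a".
  def keysAfterTouch(): List[String] = {
    // Same constructor arguments as `entries`: capacity 32, load factor 0.75, accessOrder = true
    val entries = new LinkedHashMap[String, Int](32, 0.75f, true)
    entries.put("a", 1)
    entries.put("b", 2)
    entries.put("c", 3)
    entries.get("a") // a read moves "a" to the most-recently-used end

    val out = ArrayBuffer[String]()
    val it = entries.keySet().iterator()
    while (it.hasNext) out += it.next()
    out.toList // List("b", "c", "a"): least recently accessed first
  }
}
```

Iterating from the head therefore visits the least recently accessed blocks first, which is the order an eviction pass would naturally want to consider.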
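1.2 Selected methods
1.2.1 Storing data: putBytes
putBytes first requests size bytes of storage memory from the MemoryManager. If the request succeeds, the block's data is wrapped in a SerializedMemoryEntry, which is simply an object holding a reference to the buffer, and stored under its blockId in the internal LinkedHashMap[BlockId, MemoryEntry[_]]. If size bytes of storage memory cannot be acquired, the put fails and putBytes returns false; the caller of the MemoryStore API is responsible for handling that case.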
/**
* Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
* put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
*
* The caller should guarantee that `size` is correct.
*
* @return true if the put() succeeded, false otherwise.
*/
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
// Check that the blockId is not already present
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Request storage memory from the MemoryManager
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
// The reservation succeeded, so create the ChunkedByteBuffer now
val bytes = _bytes()
assert(bytes.size == size)
// Wrap the bytes in a SerializedMemoryEntry and store it in entries
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
true
} else {
false
}
}
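The reserve-then-materialize pattern used by putBytes can be sketched in isolation. This is a simplified model under stated assumptions: ToyStore, its single byte budget, and the store helper are hypothetical and stand in for MemoryStore, the MemoryManager, and a calling component respectively.

```scala
import scala.collection.mutable

object PutBytesSketch {
  // Hypothetical store: one byte budget plus a map of stored blocks.
  final class ToyStore(capacity: Long) {
    private val entries = mutable.LinkedHashMap[String, Array[Byte]]()
    private var used = 0L

    // Reserve first; `makeBytes` is only invoked if the reservation succeeds.
    def putBytes(blockId: String, size: Long, makeBytes: () => Array[Byte]): Boolean =
      synchronized {
        require(!entries.contains(blockId), s"Block $blockId is already present")
        if (used + size <= capacity) {
          val bytes = makeBytes()
          assert(bytes.length == size)
          used += size
          entries.put(blockId, bytes)
          true
        } else {
          false
        }
      }

    def contains(blockId: String): Boolean = synchronized { entries.contains(blockId) }
  }

  // Caller side: returns (stored, bufferWasCreated) so the lazy creation is observable.
  def store(s: ToyStore, blockId: String, size: Int): (Boolean, Boolean) = {
    var created = false
    val ok = s.putBytes(blockId, size.toLong, () => { created = true; new Array[Byte](size) })
    (ok, created)
  }
}
```

With a 10-byte store, a first 8-byte put succeeds and a second one returns false without ever allocating its buffer; what the caller does next (for example, writing the block elsewhere) is up to the caller.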
1.2.2 putIteratorAsValues: storing Block data as deserialized Java objects from a record iterator
/**
* Attempt to put the given block in memory store as values.
*
* It's possible that the iterator is too large to materialize and store in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while periodically checking
* whether there is enough free memory. If the block is successfully materialized, then the
* temporary unroll memory used during the materialization is "transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the block.
*
* @return in case of success, the estimated size of the stored data. In case of failure, return
* an iterator containing the values of the block. The returned iterator will be backed
* by the combination of the partially-unrolled block and the remaining elements of the
* original input iterator. The caller must either fully consume this iterator or call
* `close()` on it in order to free the storage memory consumed by the partially-unrolled
* block.
*/
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// Each task starts from this amount
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
// Memory currently reserved by this task for this particular unrolling operation
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
// Reserve the initial unroll memory
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) { // periodically check whether currentSize >= memoryThreshold
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
// Request additional unroll memory
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) { // the request succeeded
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
if (keepUnrolling) {
// We successfully unrolled the entirety of this block
val arrayValues = vector.toArray
vector = null
// Keep the block's records in memory in deserialized form
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
// Acquire storage memory if necessary to store this block in memory.
val enoughStorageMemory = {
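if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else { // unrollMemoryUsedByThisBlock > size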
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
// Enough storage memory: put the entry into entries
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
// The block was fully unrolled, but the storage memory needed to keep it could not be
// acquired, so hand the unrolled values back to the caller
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty))
}
} else { // not enough memory to finish unrolling
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values))
}
}
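The unrolling loop above can be traced with a simplified model. This sketch assumes every element has the same fixed size, replaces reserveUnrollMemoryForThisTask with a hypothetical Pool, and returns element counts instead of a size or a PartiallyUnrolledIterator; only the check period (16) and growth factor (1.5) are taken from the source.

```scala
object UnrollSketch {
  // Hypothetical pool standing in for reserveUnrollMemoryForThisTask.
  final class Pool(var free: Long) {
    def reserve(n: Long): Boolean = if (n <= free) { free -= n; true } else false
  }

  /** Right(count) if everything was unrolled; Left(count consumed) if memory ran out. */
  def unroll(
      values: Iterator[Int],
      bytesPerElem: Long,
      pool: Pool,
      initialThreshold: Long): Either[Int, Int] = {
    val checkPeriod = 16
    val growthFactor = 1.5
    var threshold = initialThreshold
    var count = 0
    var keepUnrolling = pool.reserve(initialThreshold)
    while (values.hasNext && keepUnrolling) {
      values.next() // the element is consumed before the memory check, as in the source
      if (count % checkPeriod == 0) {
        val currentSize = (count + 1) * bytesPerElem // assumed exact size estimate
        if (currentSize >= threshold) {
          // Ask for enough to reach currentSize * growthFactor in total
          val amountToRequest = (currentSize * growthFactor - threshold).toLong
          keepUnrolling = pool.reserve(amountToRequest)
          threshold += amountToRequest
        }
      }
      count += 1
    }
    if (keepUnrolling) Right(count) else Left(count)
  }
}
```

For 100 elements of 10 bytes each and an initial threshold of 100 bytes, the model reserves 100, then 155, 240 and 480 bytes (975 in total) and succeeds. With only 300 bytes available, the third request fails after 33 elements have been consumed, which is why the real method has to return the already-unrolled elements together with the rest of the iterator.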
1.2.3 putIteratorAsBytes: storing Block data in serialized binary form from a record iterator
1.2.4 Getting the size of a block: getSize
def getSize(blockId: BlockId): Long = {
entries.synchronized {
entries.get(blockId).size
}
}
1.2.5 Reading data: getBytes and getValues
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
val entry = entries.synchronized {
entries.get(blockId)
}
entry match {
case null => None
case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
def getValues(blockId: BlockId): Option[Iterator[_]] = {
val entry = entries.synchronized {
entries.get(blockId)
}
entry match {
case null => None
case e: SerializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getValues on deserialized blocks")
case DeserializedMemoryEntry(values, _, _) =>
val x = Some(values)
x.map(_.iterator)
}
}
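Both getters follow the same shape: look the entry up under the lock, then match on its concrete type and reject the wrong kind. The sketch below reproduces that shape with hypothetical types (Entry, Serialized, Deserialized) and wraps the nullable lookup in Option; it is an illustration, not Spark's implementation.

```scala
object EntryLookupSketch {
  // Hypothetical stand-ins for MemoryEntry and its two subclasses.
  sealed trait Entry
  final case class Serialized(bytes: Array[Byte]) extends Entry
  final case class Deserialized(values: Array[Any]) extends Entry

  private val entries = new java.util.LinkedHashMap[String, Entry](32, 0.75f, true)

  def put(id: String, e: Entry): Unit = entries.synchronized { entries.put(id, e); () }

  // java.util.Map#get returns null for a missing key; Option(...) turns that into None.
  private def lookup(id: String): Option[Entry] =
    entries.synchronized { Option(entries.get(id)) }

  def getBytes(id: String): Option[Array[Byte]] = lookup(id) match {
    case None => None
    case Some(Serialized(bytes)) => Some(bytes)
    case Some(_: Deserialized) =>
      throw new IllegalArgumentException("should only call getBytes on serialized blocks")
  }

  def getValues(id: String): Option[Iterator[Any]] = lookup(id) match {
    case None => None
    case Some(Deserialized(values)) => Some(values.iterator)
    case Some(_: Serialized) =>
      throw new IllegalArgumentException("should only call getValues on deserialized blocks")
  }
}
```

A missing block yields None, while asking for the wrong representation is treated as a programming error and throws, so callers are expected to know how a block was stored before reading it.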