当Memory没有足够的空间时, 就会使用DiskStore将块存储到磁盘。下列方法都调用DiskBlockManager的getFile(), 用来查找blockId对应的文件
1.1、getBytes, NIO读取方法: 通过DiskBlockManager的getFile()查找block文件; 文件小于minMemoryMapBytes时使用NIO将文件直接读取到ByteBuffer, 否则使用内存映射(channel.map)读取
/**
 * Loads the on-disk contents of a block into a [[ChunkedByteBuffer]].
 *
 * Files shorter than `minMemoryMapBytes` are copied into a freshly allocated
 * `ByteBuffer`; all other files are memory-mapped read-only. The cleanup block
 * handed to `Utils.tryWithSafeFinally` closes the channel in both cases.
 *
 * @param blockId identifier of the block whose file should be read
 * @return the file's bytes wrapped in a ChunkedByteBuffer
 * @throws IOException if the channel reports end-of-file before the buffer is full
 */
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  // The disk block manager resolves the backing file from the block's name.
  val blockFile = diskManager.getFile(blockId.name)
  // Open the file read-only and work through its NIO channel.
  val channel = new RandomAccessFile(blockFile, "r").getChannel
  Utils.tryWithSafeFinally {
    if (blockFile.length >= minMemoryMapBytes) {
      // Large file: map it instead of copying it.
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, blockFile.length))
    } else {
      // Small file: a direct read is preferred over setting up a memory mapping.
      val dst = ByteBuffer.allocate(blockFile.length.toInt)
      channel.position(0)
      // A single read() may return fewer bytes than requested, so loop until full.
      while (dst.hasRemaining) {
        val bytesRead = channel.read(dst)
        if (bytesRead == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${blockFile.getAbsolutePath}\nbuf.remaining=${dst.remaining}")
        }
      }
      // Switch the buffer from writing to reading before handing it out.
      dst.flip()
      new ChunkedByteBuffer(dst)
    }
  } {
    channel.close()
  }
}
1.2、putBytes() NIO写入方法
/**
 * Writes the given bytes to the disk file of a block through an NIO channel.
 *
 * The actual file handling (existence check, stream creation, cleanup on
 * failure) is delegated to `put`; this method only supplies the writer.
 *
 * @param blockId identifier of the block to store
 * @param bytes   the data to write
 */
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  // Writer handed to put(): drains `bytes` into the stream's channel and
  // closes that channel in the cleanup block of Utils.tryWithSafeFinally.
  def writeThroughChannel(out: FileOutputStream): Unit = {
    val sink = out.getChannel
    Utils.tryWithSafeFinally {
      bytes.writeFully(sink)
    } {
      sink.close()
    }
  }

  put(blockId)(writeThroughChannel)
}
1.2.1、实际调用def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit
/**
 * Invokes the provided callback function to write the specific block.
 *
 * The output stream is closed whether or not the callback succeeds. If the
 * callback throws, or if closing the stream fails after a successful callback,
 * the block's file is removed so that a partially written file is not left
 * behind and later mistaken for a complete block.
 *
 * @param blockId   identifier of the block to write
 * @param writeFunc callback that receives the file's output stream and writes the data
 * @throws IllegalStateException if the block already exists in the disk store.
 */
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  // Look up the file for this blockId through the DiskBlockManager.
  val file = diskManager.getFile(blockId)
  val fileOutputStream = new FileOutputStream(file)
  // Pessimistically assume failure; cleared only after writeFunc returns normally.
  var threwException: Boolean = true
  try {
    writeFunc(fileOutputStream)
    threwException = false
  } finally {
    try {
      // When writeFunc already failed, swallow any close error so the original
      // exception propagates; otherwise let a close failure surface.
      Closeables.close(fileOutputStream, threwException)
    } catch {
      case ioe: java.io.IOException =>
        // Reached only when writeFunc succeeded but close() failed: the data may
        // not have been fully written, so treat it as a failed write.
        threwException = true
        throw ioe
    } finally {
      if (threwException) {
        // The write failed: delete the file belonging to this blockId.
        remove(blockId)
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}
1.2.2、写操作失败, 会删除block对应的文件
/**
 * Deletes the disk file that backs a block, if there is one.
 *
 * @param blockId identifier of the block whose file should be deleted
 * @return true if the file existed and was deleted; false if it did not exist
 *         or the deletion failed (a failed deletion is logged as a warning)
 */
def remove(blockId: BlockId): Boolean = {
  val target = diskManager.getFile(blockId.name)
  if (!target.exists()) {
    // Nothing on disk for this block.
    false
  } else {
    val deleted = target.delete()
    if (!deleted) {
      logWarning(s"Error deleting ${target.getPath()}")
    }
    deleted
  }
}
1.3、判断某个blockId的文件是否存在
/**
 * Reports whether a file for the given block currently exists on disk.
 *
 * @param blockId identifier of the block to look up
 * @return true if the file resolved by the disk block manager exists
 */
def contains(blockId: BlockId): Boolean =
  diskManager.getFile(blockId.name).exists()