您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DiskStore

宝哥大数据 发布时间:2019-04-17 00:06:07 ,浏览量:0

当Memory没有足够的空间时, 就会使用DiskStore将块存储到磁盘, 下列方法都调用DiskBlockManger的getFile() 用来查找blockId对应的文件

1.1、getBytes, NIO读取方法

通过DiskBlockManger的getFile() 查找block文件, 使用NIO将文件读取到ByteBuffer

    def getBytes(blockId: BlockId): ChunkedByteBuffer = {
        // 通过DiskBlockManger的getFile() 查找block文件
        val file = diskManager.getFile(blockId.name)
        // 通过随机访问文件流读取block文件
        val channel = new RandomAccessFile(file, "r").getChannel

        Utils.tryWithSafeFinally {
            // For small files, directly read rather than memory map
            if (file.length < minMemoryMapBytes) {
                //将数据存入ByteBuffer中
                val buf = ByteBuffer.allocate(file.length.toInt)
                channel.position(0)
                while (buf.remaining() != 0) {
                    if (channel.read(buf) == -1) {
                        throw new IOException("Reached EOF before filling buffer\n" +
                                s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
                    }
                }
                buf.flip()
                new ChunkedByteBuffer(buf)
            } else {
                new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
            }
        } {
            channel.close()
        }
    }
1.2、putBytes() NIO写入方法
    def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
        //调用put方法
        put(blockId) { fileOutputStream =>
            val channel = fileOutputStream.getChannel
            //Utils提供文件操纵安全方法, 保证流关闭
            Utils.tryWithSafeFinally {
                bytes.writeFully(channel)
            } {
                channel.close()
            }
        }
    }

1.2.1、实际调用def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit
    /**
     * Invokes the provided callback function to write the specific block.
     *
     * @throws IllegalStateException if the block already exists in the disk store.
     */
    def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
        if (contains(blockId)) {
            throw new IllegalStateException(s"Block $blockId is already present in the disk store")
        }
        logDebug(s"Attempting to put block $blockId")
        val startTime = System.currentTimeMillis
        // 通过DiskBlockManager查找该blockId的文件
        val file = diskManager.getFile(blockId)
        val fileOutputStream = new FileOutputStream(file)
        var threwException: Boolean = true
        try {
            writeFunc(fileOutputStream)
            threwException = false
        } finally {
            try {
                Closeables.close(fileOutputStream, threwException)
            } finally {
                if (threwException) {
                    // 写block失败, 移除该blockId对应的文件
                    remove(blockId)
                }
            }
        }
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file on disk in %d ms".format(
            file.getName,
            Utils.bytesToString(file.length()),
            finishTime - startTime))
    }
1.2.2、写操作失败, 会删除block对应的文件
    def remove(blockId: BlockId): Boolean = {
        val file = diskManager.getFile(blockId.name)
        if (file.exists()) {
            val ret = file.delete()
            if (!ret) {
                logWarning(s"Error deleting ${file.getPath()}")
            }
            ret
        } else {
            false
        }
    }
1.3、判断某个blockId的文件是否存在
    def contains(blockId: BlockId): Boolean = {
        val file = diskManager.getFile(blockId.name)
        file.exists()
    }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0378s