您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DiskBlockObjectWriter

宝哥大数据 发布时间:2019-04-20 00:19:58 ,浏览量:0

DiskBlockObjectWriter 用于输出 Spark 任务的中间计算结果。

1.1、initialize() 获取一个文件的输出流
    /**
     * Builds the file-level stream chain for `file` and stores each layer in its field:
     * `fos` (file stream), `channel` (its channel), `ts` (time-tracking wrapper around `fos`)
     * and `mcs` (buffered stream over `ts`, mixed with `ManualCloseOutputStream`).
     */
    private def initialize(): Unit = {
        // The second constructor argument `true` opens the file in append mode.
        fos = new FileOutputStream(file, true)
        channel = fos.getChannel()
        ts = new TimeTrackingOutputStream(writeMetrics, fos)
        // Anonymous subclass instead of a named local class: same parents, same constructor args.
        mcs = new BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    }
1.2、open 打开文件的输出流
    /**
     * Opens the writer for output and returns it.
     *
     * Creates the file-level streams on first use, then wraps them with the serializer
     * manager's stream and a serialization stream.
     *
     * @return this writer, so calls can be chained
     * @throws IllegalStateException if the writer has already been closed
     */
    def open(): DiskBlockObjectWriter = {
        // Once hasBeenClosed is set the writer must not be reused.
        if (hasBeenClosed) {
            throw new IllegalStateException("Writer already closed. Cannot be reopened.")
        }

        // File-level streams are created only once; later opens reuse them.
        if (!initialized) {
            initialize()
            initialized = true
        }

        // Wrap the buffered stream (compression, per the original author's note) ...
        val wrapped = serializerManager.wrapStream(blockId, mcs)
        bs = wrapped
        // ... then put the serialization stream on top of it.
        objOut = serializerInstance.serializeStream(wrapped)
        streamOpen = true
        this
    }
1.3、write 写入文件

    /**
     * Writes a key-value pair.
     */
    def write(key: Any, value: Any): Unit = {
        // Open the serialization stream on demand; it is also closed again by commitAndGet().
        if (!streamOpen) {
            open()
        }

        // Key and value go through the serialization stream, key first.
        objOut.writeKey(key)
        objOut.writeValue(value)
        // Count the record; this also refreshes the bytes-written metric periodically.
        recordWritten()
    }

    /** Single-byte writes are not supported by this writer; always throws. */
    override def write(b: Int): Unit = {
        throw new UnsupportedOperationException()
    }

    /**
     * Writes `len` bytes of `kvBytes`, starting at offset `offs`, directly to the wrapped
     * stream `bs`, opening the writer first if it is not open.
     *
     * This overload does not call `recordWritten()` itself.
     */
    override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
        if (!streamOpen) open()
        bs.write(kvBytes, offs, len)
    }
1.3.1、更新测量信息
    /**
     * Notify the writer that a record worth of bytes has been written with OutputStream#write.
     */
    def recordWritten(): Unit = {
        // Bump both the local record counter and the shared write metrics by one record.
        numRecordsWritten += 1
        writeMetrics.incRecordsWritten(1)

        // Refresh the bytes-written metric only on every 16384th record
        // (presumably to keep the channel position lookup off the per-record path).
        val dueForByteUpdate = numRecordsWritten % 16384 == 0
        if (dueForByteUpdate) updateBytesWritten()
    }

    /**
     * Report the number of bytes written in this writer's shuffle write metrics.
     * Note that this is only valid before the underlying streams are closed.
     */
    private def updateBytesWritten(): Unit = {
        // Current position of the file channel.
        val currentPos = channel.position()
        // Report only the delta since the last report, then remember the new high-water mark.
        writeMetrics.incBytesWritten(currentPos - reportedPosition)
        reportedPosition = currentPos
    }
1.4、close 关闭文件, 并更新测量信息

    /**
     * Commits any remaining partial writes and closes resources.
     */
    override def close(): Unit = {
        // Nothing to commit or release if the file-level streams were never created.
        if (initialized) {
            // First block commits pending writes; second block releases the resources.
            // NOTE(review): Utils.tryWithSafeFinally presumably runs the second block even
            // when the first one throws -- confirm against Utils.
            Utils.tryWithSafeFinally {
                commitAndGet()
            } {
                closeResources()
            }
        }
    }
1.4.1、commitAndGet 缓存数据提交

将缓存数据写入磁盘, 关闭缓存, 然后更新测量数据

    /**
     * Flush the partial writes and commit them as a single atomic block.
     * A commit may write additional bytes to frame the atomic block.
     *
     * @return file segment with previous offset and length committed on this call.
     */
    def commitAndGet(): FileSegment = {
        // Only an open stream can hold uncommitted data; otherwise return a zero-length segment.
        if (streamOpen) {
            // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
            //       serializer stream and the lower level stream.
            objOut.flush()
            bs.flush()
            // Close the serialization stream and mark the writer as not open, so the next
            // write() call goes through open() again.
            objOut.close()
            streamOpen = false

            if (syncWrites) {
                // Force outstanding writes to disk and track how long it takes
                val start = System.nanoTime()
                fos.getFD.sync()
                writeMetrics.incWriteTime(System.nanoTime() - start)
            }

            // The segment covers everything between the previous commit point and the
            // channel's current position.
            val pos = channel.position()
            val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
            committedPosition = pos
            // In certain compression codecs, more bytes are written after streams are closed
            writeMetrics.incBytesWritten(committedPosition - reportedPosition)
            reportedPosition = committedPosition
            fileSegment
        } else {
            // Nothing was written since the last commit: empty segment at the commit point.
            new FileSegment(file, committedPosition, 0)
        }
    }
1.4.2、关闭资源
    /**
     * Close and cleanup all resources.
     * Should call after committing or reverting partial writes.
     */
    private def closeResources(): Unit = {
        // No-op when the file-level streams do not exist (never created or already released).
        if (initialized) {
            Utils.tryWithSafeFinally {
                // Close the buffered stream through its manual-close entry point.
                mcs.manualClose()
            } {
                // Drop every stream reference and reset the state flags.
                channel = null
                mcs = null
                bs = null
                fos = null
                ts = null
                objOut = null
                initialized = false
                streamOpen = false
                // After this point open() throws IllegalStateException.
                hasBeenClosed = true
            }
        }
    }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0392s