DiskBlockObjectWriter 用于将 Spark 任务的中间计算结果写入磁盘文件。
1.1、initialize() 获取一个文件的输出流
/**
 * Opens the target file and builds the raw output stream chain used by this writer.
 *
 * Resulting chain (innermost first): FileOutputStream `fos` (append mode)
 * -> TimeTrackingOutputStream `ts` -> ManualCloseBufferedOutputStream `mcs`.
 * The file channel is kept separately so that byte positions can be read later.
 */
private def initialize(): Unit = {
  // Append mode (second argument `true`): existing file contents are preserved.
  fos = new FileOutputStream(file, true)
  // Channel over the same file; its position() is used for byte accounting and file segments.
  channel = fos.getChannel()
  // Wraps fos and is handed writeMetrics; presumably records time spent writing — TODO confirm.
  ts = new TimeTrackingOutputStream(writeMetrics, fos)
  // Buffered stream over `ts` mixing in ManualCloseOutputStream; it is released explicitly
  // via mcs.manualClose() in closeResources().
  class ManualCloseBufferedOutputStream
    extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
  mcs = new ManualCloseBufferedOutputStream
}
1.2、open 打开文件的输出流
/**
 * Opens a fresh serialization stream on top of this writer's file stream.
 *
 * The underlying file streams are created lazily on the first call (via initialize()). Every
 * call then wraps the buffered file stream through the serializer manager and creates a new
 * serialization stream on the wrapped stream.
 *
 * @return this writer, to allow call chaining
 * @throws IllegalStateException if the writer has already been closed
 */
def open(): DiskBlockObjectWriter = {
  if (hasBeenClosed) throw new IllegalStateException("Writer already closed. Cannot be reopened.")

  if (!initialized) {
    initialize()
    initialized = true
  }

  // Wrap the buffered file stream (the original note says: compression).
  val wrapped = serializerManager.wrapStream(blockId, mcs)
  bs = wrapped
  // Serializer sits on top of the wrapped stream.
  objOut = serializerInstance.serializeStream(wrapped)
  streamOpen = true
  this
}
1.3、write 写入文件
/**
 * Writes a key-value pair.
 *
 * Lazily opens the serialization stream if it is not open, serializes the key and then the
 * value, and counts one record in the write metrics via recordWritten().
 *
 * @param key   record key, written with the serialization stream's writeKey
 * @param value record value, written with the serialization stream's writeValue
 */
def write(key: Any, value: Any): Unit = {
  if (!streamOpen) {
    open()
  }
  objOut.writeKey(key)
  objOut.writeValue(value)
  recordWritten()
}
/** Single-byte writes are not supported by this writer; always throws. */
override def write(b: Int): Unit = {
  throw new UnsupportedOperationException()
}
/**
 * Writes already-serialized bytes straight into the wrapped stream `bs`, bypassing the
 * serialization stream.
 *
 * The stream is opened lazily if needed. This does not count a record; callers should call
 * recordWritten() once a full record's worth of bytes has been written.
 *
 * @param kvBytes source buffer
 * @param offs    start offset in `kvBytes`
 * @param len     number of bytes to write
 */
override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
  if (!streamOpen) open()
  bs.write(kvBytes, offs, len)
}
1.3.1、recordWritten 更新写入度量信息
/**
 * Notifies the writer that one record's worth of bytes has been written with
 * OutputStream#write (write(key, value) calls this itself).
 *
 * Bumps the local and metric record counters and, every 16384 records, refreshes the
 * bytes-written metric from the file channel position.
 */
def recordWritten(): Unit = {
  // Refresh byte metrics only periodically, presumably to avoid querying the channel
  // position on every record.
  val bytesUpdateInterval = 16384
  numRecordsWritten += 1
  writeMetrics.incRecordsWritten(1)
  if (numRecordsWritten % bytesUpdateInterval == 0) updateBytesWritten()
}
/**
 * Report the number of bytes written in this writer's shuffle write metrics.
 *
 * Adds the bytes written since the last report (current channel position minus
 * `reportedPosition`) to the metrics and advances `reportedPosition`.
 * Note that this is only valid before the underlying streams are closed.
 */
private def updateBytesWritten(): Unit = {
  // The channel position only reflects bytes that reached the file; bytes still sitting in
  // the buffered stream chain are accounted for later when commitAndGet reconciles.
  val pos = channel.position()
  writeMetrics.incBytesWritten(pos - reportedPosition)
  reportedPosition = pos
}
1.4、close 关闭文件, 并更新测量信息
/**
 * Commits any remaining partial writes and closes resources.
 *
 * Does nothing if the writer was never initialized. closeResources() is run as the finally
 * block of Utils.tryWithSafeFinally, so resources are released even if the commit throws.
 */
override def close(): Unit = {
  if (initialized) {
    Utils.tryWithSafeFinally {
      commitAndGet()
    } {
      closeResources()
    }
  }
}
1.4.1、commitAndGet 缓存数据提交
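刷新缓冲数据并关闭序列化流; 若开启 syncWrites 则强制同步到磁盘; 然后返回本次提交的文件段 (FileSegment) 并更新度量数据。底层文件流此时仍保持打开 (之后还会读取 channel.position()), 由 closeResources 中的 mcs.manualClose() 最终关闭。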
/**
 * Flush the partial writes and commit them as a single atomic block.
 * A commit may write additional bytes to frame the atomic block.
 *
 * If no stream is open (no open()/write since the last commit), returns an empty segment at
 * the current committed position. Otherwise flushes and closes the serialization stream,
 * fsyncs the file when `syncWrites` is set (adding the sync time to the write metrics),
 * and advances `committedPosition` to the file channel's current position.
 *
 * @return file segment with previous offset and length committed on this call.
 */
def commitAndGet(): FileSegment = {
  if (streamOpen) {
    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    // serializer stream and the lower level stream.
    objOut.flush()
    bs.flush()
    // fos and channel are still used below, so this close is expected not to close the
    // underlying file stream (mcs is a ManualCloseOutputStream) — confirm.
    objOut.close()
    streamOpen = false
    if (syncWrites) {
      // Force outstanding writes to disk and track how long it takes
      val start = System.nanoTime()
      fos.getFD.sync()
      writeMetrics.incWriteTime(System.nanoTime() - start)
    }
    // The returned segment spans everything written since the previous commit.
    val pos = channel.position()
    val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
    committedPosition = pos
    // In certain compression codecs, more bytes are written after streams are closed
    writeMetrics.incBytesWritten(committedPosition - reportedPosition)
    reportedPosition = committedPosition
    // NOTE(review): numRecordsWritten is not reset here, so it keeps counting across commits;
    // confirm this is intended by any code that relies on a per-commit record count.
    fileSegment
  } else {
    new FileSegment(file, committedPosition, 0)
  }
}
1.4.2、closeResources 关闭资源
/**
 * Close and cleanup all resources.
 * Should call after committing or reverting partial writes.
 *
 * No-op when the writer is not initialized. Otherwise manually closes the buffered file
 * stream and, even if that fails, drops every stream reference and marks the writer as
 * permanently closed, so a later open() throws.
 */
private def closeResources(): Unit =
  if (initialized) {
    Utils.tryWithSafeFinally(mcs.manualClose()) {
      // Release references to the whole stream chain.
      objOut = null
      bs = null
      mcs = null
      ts = null
      channel = null
      fos = null
      // Reset state; hasBeenClosed blocks any reopen.
      initialized = false
      streamOpen = false
      hasBeenClosed = true
    }
  }