1.1、在SparkEnv中初始化
//通过反射创建序列化对象
/**
 * Creates an instance of the class with the given name via reflection, possibly
 * initializing it with our conf.
 *
 * Public constructors are tried in this order:
 *   1. `(SparkConf, boolean)` - invoked with `conf` and `isDriver`;
 *   2. `(SparkConf)` - invoked with `conf`;
 *   3. the no-argument constructor.
 *
 * @param className name of the class, resolved through `Utils.classForName`
 * @tparam T type the new instance is cast to; the cast is not checked here
 * @return the newly constructed instance
 * @throws NoSuchMethodException if the class exposes none of the three constructors
 */
def instantiateClass[T](className: String): T = {
  val cls = Utils.classForName(className)
  // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
  // SparkConf, then one taking no arguments
  try {
    // Box with Boolean.valueOf: the Boolean(boolean) constructor is deprecated since Java 9
    // and allocates a new object on every call.
    cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
      .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}
/**
 * Instantiates the class whose name is stored under `propertyName` in our conf,
 * using `defaultClassName` when that property is not set. Construction itself is
 * delegated to `instantiateClass`, which may initialize the instance with our conf.
 *
 * @param propertyName     configuration key holding the class name
 * @param defaultClassName class name used when the key is absent
 * @tparam T type of the instance to return
 */
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
  val resolvedClassName = conf.get(propertyName, defaultClassName)
  instantiateClass[T](resolvedClassName)
}
// The serializer class is read from "spark.serializer"; JavaSerializer is used when unset.
val serializer = instantiateClassFromConf[Serializer](
  propertyName = "spark.serializer",
  defaultClassName = "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

// The manager is built from the chosen serializer, our conf and the IO encryption key.
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

// Closures get their own JavaSerializer, independent of the configured serializer above.
val closureSerializer = new JavaSerializer(conf)
1.2、数据流序列化 dataSerializeStream
如果写入存储体系的数据本身是序列化的，那么读取时应该对其反序列化。dataSerializeStream方法先通过wrapStream对输出流进行加密和压缩包装（压缩使用compressionCodec），再将数据序列化后写入该输出流
/**
 * Serializes `values` into `outputStream`.
 *
 * The target stream is buffered, wrapped by `wrapStream` for the given block, and then
 * handed to a fresh serializer instance. All values are written and the serialization
 * stream is closed afterwards.
 *
 * @param blockId      block the data belongs to; for a `StreamBlockId` the serializer is
 *                     requested with auto-pick disabled
 * @param outputStream destination of the serialized bytes
 * @param values       elements to serialize
 */
def dataSerializeStream[T: ClassTag](
    blockId: BlockId,
    outputStream: OutputStream,
    values: Iterator[T]): Unit = {
  val buffered = new BufferedOutputStream(outputStream)
  // Auto-pick is enabled for every block type except stream blocks.
  val autoPick = blockId match {
    case _: StreamBlockId => false
    case _ => true
  }
  val serializerInstance = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
  val serializationStream = serializerInstance.serializeStream(wrapStream(blockId, buffered))
  serializationStream.writeAll(values).close()
}
1.2.1、wrapStream
/**
 * Wraps an output stream for encryption and compression.
 *
 * The encryption wrapper is applied to `s` first; the compression wrapper for
 * `blockId` is then layered on top of it and returned.
 */
def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
  val encrypted = wrapForEncryption(s)
  wrapForCompression(blockId, encrypted)
}
1.2.2、wrapForCompression
/**
 * Wraps an output stream with the configured compression codec when `shouldCompress`
 * says the block should be compressed; otherwise returns `s` unchanged.
 */
private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
  if (!shouldCompress(blockId)) {
    s
  } else {
    // The codec is only touched when compression is actually requested.
    compressionCodec.compressedOutputStream(s)
  }
}
1.2.3、压缩算法compressionCodec
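为了节省空间, 对Block进行压缩, 根据spark.io.compression.codec决定压缩算法。
在下面这段代码中, 默认值(DEFAULT_COMPRESSION_CODEC)为lz4, 对应LZ4CompressionCodec; snappy(牺牲少量压缩比, 极大提高压缩速度, 对应SnappyCompressionCodec)是错误提示中建议使用的回退算法(FALLBACK_COMPRESSION_CODEC)。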
/**
 * :: DeveloperApi ::
 * CompressionCodec allows the customization of choosing different compression implementations
 * to be used in block storage.
 *
 * Implementations provide a pair of stream wrappers: one for the write path and one for
 * the read path.
 *
 * @note The wire protocol for a codec is not guaranteed compatible across versions of Spark.
 * This is intended for use as an internal compression utility within a single Spark application.
 */
@DeveloperApi
trait CompressionCodec {
// Returns an output stream layered on `s`; presumably bytes written to the result are
// compressed before reaching `s` (implementation-defined - confirm per codec).
def compressedOutputStream(s: OutputStream): OutputStream
// Returns an input stream layered on `s`; presumably the counterpart that decompresses
// data produced by compressedOutputStream (implementation-defined - confirm per codec).
def compressedInputStream(s: InputStream): InputStream
}
/**
 * Helpers for resolving and instantiating [[CompressionCodec]] implementations from a
 * SparkConf, by short name ("lz4", "lzf", "snappy") or by fully qualified class name.
 */
private[spark] object CompressionCodec {

  /** Configuration property that selects the codec. */
  private val configKey = "spark.io.compression.codec"

  /**
   * Returns true when `codec` is a Snappy, LZF or LZ4 codec, the implementations this
   * object treats as supporting concatenation of serialized streams.
   */
  private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
    codec match {
      case _: SnappyCompressionCodec | _: LZFCompressionCodec | _: LZ4CompressionCodec => true
      case _ => false
    }
  }

  /** Short codec names mapped to the fully qualified names of their implementing classes. */
  private val shortCompressionCodecNames = Map(
    "lz4" -> classOf[LZ4CompressionCodec].getName,
    "lzf" -> classOf[LZFCompressionCodec].getName,
    "snappy" -> classOf[SnappyCompressionCodec].getName)

  /** Returns the codec name set under `configKey`, or the default codec when unset. */
  def getCodecName(conf: SparkConf): String = {
    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
  }

  /** Creates the codec selected by `conf` (see [[getCodecName]]). */
  def createCodec(conf: SparkConf): CompressionCodec = {
    createCodec(conf, getCodecName(conf))
  }

  /**
   * Creates the codec identified by `codecName`.
   *
   * @param conf      configuration passed to the codec's `(SparkConf)` constructor
   * @param codecName a short name (matched case-insensitively) or a fully qualified
   *                  class name
   * @throws IllegalArgumentException if the codec class cannot be found or instantiating
   *                                  it fails with an IllegalArgumentException
   */
  def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
    // Lowercase with Locale.ROOT so the short-name lookup does not depend on the JVM's
    // default locale.
    val codecClass = shortCompressionCodecNames.getOrElse(
      codecName.toLowerCase(java.util.Locale.ROOT), codecName)
    val codec = try {
      val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
    } catch {
      case _: ClassNotFoundException | _: IllegalArgumentException => None
    }
    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
  }

  /**
   * Return the short version of the given codec name.
   * If it is already a short name (in any letter case, consistent with [[createCodec]]),
   * its lowercase form is returned.
   *
   * @throws IllegalArgumentException if `codecName` is neither a known short name nor the
   *                                  class name of a codec that has one
   */
  def getShortName(codecName: String): String = {
    val lowercased = codecName.toLowerCase(java.util.Locale.ROOT)
    if (shortCompressionCodecNames.contains(lowercased)) {
      lowercased
    } else {
      // Class names are compared exactly as given.
      shortCompressionCodecNames
        .collectFirst { case (k, v) if v == codecName => k }
        .getOrElse {
          throw new IllegalArgumentException(s"No short name for codec $codecName.")
        }
    }
  }

  /** Codec suggested in the error message when the requested codec is unavailable. */
  val FALLBACK_COMPRESSION_CODEC = "snappy"

  /** Codec used when `configKey` is not set. */
  val DEFAULT_COMPRESSION_CODEC = "lz4"

  /** Fully qualified class names of all codecs that have a short name. */
  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}