
SerializerManager

宝哥大数据 · Posted 2019-04-19 23:52:11

1.1、Initialization in SparkEnv

        // Instantiate the serializer via reflection
        // Create an instance of the class with the given name, possibly initializing it with our conf
        def instantiateClass[T](className: String): T = {
            val cls = Utils.classForName(className)
            // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
            // SparkConf, then one taking no arguments
            try {
                cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
                        .newInstance(conf, new java.lang.Boolean(isDriver))
                        .asInstanceOf[T]
            } catch {
                case _: NoSuchMethodException =>
                    try {
                        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
                    } catch {
                        case _: NoSuchMethodException =>
                            cls.getConstructor().newInstance().asInstanceOf[T]
                    }
            }
        }

        // Create an instance of the class named by the given SparkConf property, or defaultClassName
        // if the property is not set, possibly initializing it with our conf
        def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
            instantiateClass[T](conf.get(propertyName, defaultClassName))
        }

        val serializer = instantiateClassFromConf[Serializer](
            "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        logDebug(s"Using serializer: ${serializer.getClass}")

        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

        val closureSerializer = new JavaSerializer(conf)
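
Because the class name comes from instantiateClassFromConf, the serializer is pluggable: switching to Spark's built-in KryoSerializer only requires setting the same spark.serializer property that the code above reads. A minimal sketch (the app name is illustrative):

    import org.apache.spark.SparkConf

    // spark.serializer is the property read by instantiateClassFromConf above;
    // KryoSerializer is Spark's built-in alternative to the default JavaSerializer
    val conf = new SparkConf()
        .setAppName("serializer-demo") // illustrative name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")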
1.2、Serializing a data stream: dataSerializeStream

If the data written to the storage system is serialized, it must be deserialized when read back. The dataSerializeStream method wraps the output stream for encryption and compression (via compressionCodec), then serializes the values into it.

    /** Serializes into a stream. */
    def dataSerializeStream[T: ClassTag](
            blockId: BlockId,
            outputStream: OutputStream,
            values: Iterator[T]): Unit = {
        val byteStream = new BufferedOutputStream(outputStream)
        val autoPick = !blockId.isInstanceOf[StreamBlockId]
        val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
        ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
    }
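
A minimal round-trip of the same SerializationStream API used above, calling JavaSerializer directly and skipping the compression/encryption wrapping (the object name is illustrative):

    import java.io.ByteArrayOutputStream

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer

    object SerializeStreamDemo {
        def main(args: Array[String]): Unit = {
            val ser = new JavaSerializer(new SparkConf()).newInstance()
            val out = new ByteArrayOutputStream()
            // writeAll drains the iterator into the stream and close() flushes it,
            // mirroring the writeAll(values).close() call in dataSerializeStream
            ser.serializeStream(out).writeAll(Iterator("a", "b", "c")).close()
            println(s"serialized ${out.size()} bytes")
        }
    }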
1.2.1、wrapStream
    /**
     * Wrap an output stream for encryption and compression
     */
    def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
        wrapForCompression(blockId, wrapForEncryption(s))
    }
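
Note the nesting order: encryption wraps the raw stream first and compression wraps the result, so bytes emitted by the serializer are compressed first, then encrypted, then written out. A sketch of the same decorator pattern with stand-ins (GZIP instead of Spark's codec, an XOR stream instead of real encryption; both illustrative only):

    import java.io.{ByteArrayOutputStream, OutputStream}
    import java.util.zip.GZIPOutputStream

    object WrapOrderDemo {
        // Stand-in "encryption": XOR every byte with a fixed key (illustrative only)
        def wrapForEncryption(s: OutputStream): OutputStream = new OutputStream {
            override def write(b: Int): Unit = s.write(b ^ 0x5A)
        }

        // Stand-in "compression": GZIP instead of Spark's CompressionCodec
        def wrapForCompression(s: OutputStream): OutputStream = new GZIPOutputStream(s)

        def main(args: Array[String]): Unit = {
            val sink = new ByteArrayOutputStream()
            // Same composition as wrapStream: compression is the outermost layer
            val out = wrapForCompression(wrapForEncryption(sink))
            out.write("hello spark".getBytes("UTF-8"))
            out.close()
            println(s"wrote ${sink.size()} bytes")
        }
    }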
1.2.2、wrapForCompression
    /**
     * Wrap an output stream for compression if block compression is enabled for its block type
     */
    private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
        if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
    }
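
shouldCompress (not shown here) answers per block type by consulting the standard Spark switches. A sketch of the configuration keys involved, with their usual defaults:

    import org.apache.spark.SparkConf

    // Per-block-type compression switches consulted by shouldCompress
    val conf = new SparkConf()
        .set("spark.shuffle.compress", "true")       // shuffle blocks (default true)
        .set("spark.broadcast.compress", "true")     // broadcast blocks (default true)
        .set("spark.rdd.compress", "false")          // serialized RDD blocks (default false)
        .set("spark.shuffle.spill.compress", "true") // shuffle spill files (default true)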
1.2.3、The compression codec: compressionCodec

To save space, blocks are compressed before storage. The algorithm is chosen by spark.io.compression.codec; as the code below shows, the default is lz4, with snappy as the fallback. Both codecs sacrifice a little compression ratio for much higher compression speed (e.g., choosing snappy creates a SnappyCompressionCodec instance).

/**
 * :: DeveloperApi ::
 * CompressionCodec allows the customization of choosing different compression implementations
 * to be used in block storage.
 *
 * @note The wire protocol for a codec is not guaranteed compatible across versions of Spark.
 * This is intended for use as an internal compression utility within a single Spark application.
 */
@DeveloperApi
trait CompressionCodec {

    def compressedOutputStream(s: OutputStream): OutputStream

    def compressedInputStream(s: InputStream): InputStream
}
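
Because createCodec (below) reflectively looks up a single-argument constructor taking a SparkConf, a custom codec only needs to implement the trait with that constructor shape. A minimal pass-through sketch (IdentityCodec is a hypothetical name, not part of Spark):

    import java.io.{InputStream, OutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    // Hypothetical no-op codec: satisfies the trait and the
    // SparkConf constructor that createCodec reflects on
    class IdentityCodec(conf: SparkConf) extends CompressionCodec {
        override def compressedOutputStream(s: OutputStream): OutputStream = s
        override def compressedInputStream(s: InputStream): InputStream = s
    }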

private[spark] object CompressionCodec {

    private val configKey = "spark.io.compression.codec"

    private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
        (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
                || codec.isInstanceOf[LZ4CompressionCodec])
    }

    private val shortCompressionCodecNames = Map(
        "lz4" -> classOf[LZ4CompressionCodec].getName,
        "lzf" -> classOf[LZFCompressionCodec].getName,
        "snappy" -> classOf[SnappyCompressionCodec].getName)

    def getCodecName(conf: SparkConf): String = {
        conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
    }

    def createCodec(conf: SparkConf): CompressionCodec = {
        createCodec(conf, getCodecName(conf))
    }

    def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
        val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
        val codec = try {
            val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
            Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
        } catch {
            case e: ClassNotFoundException => None
            case e: IllegalArgumentException => None
        }
        codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
                s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
    }

    /**
     * Return the short version of the given codec name.
     * If it is already a short name, just return it.
     */
    def getShortName(codecName: String): String = {
        if (shortCompressionCodecNames.contains(codecName)) {
            codecName
        } else {
            shortCompressionCodecNames
                    .collectFirst { case (k, v) if v == codecName => k }
                    .getOrElse {
                        throw new IllegalArgumentException(s"No short name for codec $codecName.")
                    }
        }
    }

    val FALLBACK_COMPRESSION_CODEC = "snappy"
    val DEFAULT_COMPRESSION_CODEC = "lz4"
    val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
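
To select a codec, spark.io.compression.codec accepts either a short name from shortCompressionCodecNames or a fully qualified class name; createCodec resolves both to the same class:

    import org.apache.spark.SparkConf

    // Short name, resolved through shortCompressionCodecNames
    val conf = new SparkConf().set("spark.io.compression.codec", "lz4")
    // Equivalent fully qualified class name, used as-is:
    // conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")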