TransportConf

宝哥大数据, published 2019-04-15 23:41:49

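The TransportConf held by TransportContext supplies configuration to Spark's RPC framework. It has two member fields: conf, the configuration provider, and module, the name of the module being configured.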

    // The actual configuration provider
    private final ConfigProvider conf;

    // Name of the module this configuration applies to
    private final String module;
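To make the role of the two fields concrete, here is a minimal sketch of how a module name and a configuration provider can combine to resolve module-scoped keys such as `spark.$module.io.serverThreads`. It is a simplified stand-in, not Spark's actual TransportConf: the class `ModuleConfSketch`, the `Provider` interface, the methods `confKey` and `getInt`, and the example module name `files` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for TransportConf; NOT Spark's real class.
class ModuleConfSketch {
    // Hypothetical counterpart of ConfigProvider: looks up a value by its full key.
    interface Provider {
        String get(String name);
    }

    private final Provider conf;   // the actual configuration provider
    private final String module;   // name of the module being configured

    ModuleConfSketch(String module, Provider conf) {
        this.module = module;
        this.conf = conf;
    }

    // Builds the module-scoped key, e.g. "spark.files.io.serverThreads".
    String confKey(String suffix) {
        return "spark." + module + "." + suffix;
    }

    // Reads an integer setting, falling back to a default when the key is absent.
    int getInt(String suffix, int defaultValue) {
        String value = conf.get(confKey(suffix));
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.files.io.serverThreads", "4");

        // "files" is an assumed example module name.
        ModuleConfSketch files = new ModuleConfSketch("files", settings::get);
        System.out.println(files.confKey("io.serverThreads"));
        System.out.println(files.getInt("io.serverThreads", 1)); // key is set
        System.out.println(files.getInt("io.clientThreads", 1)); // key is missing, default used
    }
}
```

Keeping the provider behind a one-method interface means the same reader can sit on top of any key-value source, which is roughly why a separate provider is passed in rather than a concrete configuration object.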
In NettyRpcEnv, the TransportConf is created through SparkTransportConf:
        val ioThreads = clone.getInt("spark.files.io.threads", 1)
        // Create the TransportConf through SparkTransportConf
        val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)
        val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)
        // Create the TransportClientFactory
        fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())
SparkTransportConf
package org.apache.spark.network.netty

import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 */
object SparkTransportConf {
    /**
     * Specifies an upper bound on the number of Netty threads that Spark requires by default.
     * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
     * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
     * at a premium.
     *
     * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
     * allocation. It can be overridden by setting the number of serverThreads and clientThreads
     * manually in Spark's configuration.
     */
    private val MAX_DEFAULT_NETTY_THREADS = 8

    /**
     * Utility for creating a [[TransportConf]] from a [[SparkConf]].
     * @param _conf the [[SparkConf]]
     * @param module the module name
     * @param numUsableCores if nonzero, this will restrict the server and client threads to only
     *                       use the given number of cores, rather than all of the machine's cores.
     *                       This restriction will only occur if these properties are not already set.
     */
    def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
        val conf = _conf.clone

        // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
        // assuming we have all the machine's cores).
        // NB: Only set if serverThreads/clientThreads not already set.
        val numThreads = defaultNumThreads(numUsableCores)
        // Number of server-side transfer threads
        conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
        // Number of client-side transfer threads
        conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

        new TransportConf(module, new ConfigProvider {
            override def get(name: String): String = conf.get(name)
        })
    }


    /**
     * Returns the default number of threads for both the Netty client and server thread pools.
     * If numUsableCores is 0, we will use Runtime to get an approximate number of available cores.
     */
    private def defaultNumThreads(numUsableCores: Int): Int = {
        // If numUsableCores is less than or equal to 0, fall back to the number of processors available to the JVM
        val availableCores =
            if (numUsableCores > 0) {
                numUsableCores
            } else {
                Runtime.getRuntime.availableProcessors()
            }
        // Not every core can be devoted to network transfer, so the thread count is capped at 8
        math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
    }
}
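The two rules in the listing above (cap the default at 8 threads, and only fill in keys the user has not set) can be checked in isolation with a small sketch. This is a Java approximation built on a plain `Map`, not Spark code: the class `NettyThreadDefaultsSketch`, the method `withThreadDefaults`, and the use of `Map.putIfAbsent` as a stand-in for `setIfMissing` are assumptions, as is the example module name `files`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative approximation of the defaulting logic; NOT Spark's real code.
class NettyThreadDefaultsSketch {
    static final int MAX_DEFAULT_NETTY_THREADS = 8;

    // Uses the given core count if positive, otherwise the JVM's processor count,
    // and never returns more than the cap.
    static int defaultNumThreads(int numUsableCores) {
        int availableCores = numUsableCores > 0
                ? numUsableCores
                : Runtime.getRuntime().availableProcessors();
        return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
    }

    // Copies the user's settings, then fills in thread counts only where they are unset.
    static Map<String, String> withThreadDefaults(
            Map<String, String> userConf, String module, int numUsableCores) {
        Map<String, String> conf = new HashMap<>(userConf); // work on a copy, like _conf.clone
        String numThreads = Integer.toString(defaultNumThreads(numUsableCores));
        conf.putIfAbsent("spark." + module + ".io.serverThreads", numThreads);
        conf.putIfAbsent("spark." + module + ".io.clientThreads", numThreads);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(defaultNumThreads(4));  // below the cap: 4
        System.out.println(defaultNumThreads(16)); // above the cap: 8

        Map<String, String> user = new HashMap<>();
        user.put("spark.files.io.serverThreads", "2"); // explicit user setting
        Map<String, String> conf = withThreadDefaults(user, "files", 16);
        System.out.println(conf.get("spark.files.io.serverThreads")); // stays 2
        System.out.println(conf.get("spark.files.io.clientThreads")); // defaulted to 8
    }
}
```

Note that the cap applies only to the computed default: a value the user sets explicitly, even one above 8, is left untouched.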
