TransportContext中的TransportConf给Spark的RPC框架提供配置信息, 它有两个成员属性: 配置提供者conf和配置的模块名称module
// 真正的配置提供者
private final ConfigProvider conf;
private final String module;
从NettyRpcEnv中看到通过SparkTransportConf 创建TransportConf
val ioThreads = clone.getInt("spark.files.io.threads", 1)
//通过SparkTransportConf 创建TransportConf
val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)
val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)
//创建TransportClientFactory
fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())
SparkTransportConf
package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}
/**
* Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
* Driver, or a standalone shuffle service) into a TransportConf with details on our environment
* like the number of cores that are allocated to this JVM.
*/
object SparkTransportConf {
/**
* Specifies an upper bound on the number of Netty threads that Spark requires by default.
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
* that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
* at a premium.
*
* Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
* allocation. It can be overridden by setting the number of serverThreads and clientThreads
* manually in Spark's configuration.
*/
private val MAX_DEFAULT_NETTY_THREADS = 8
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param _conf the [[SparkConf]]
* @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
*/
def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
// assuming we have all the machine's cores).
// NB: Only set if serverThreads/clientThreads not already set.
val numThreads = defaultNumThreads(numUsableCores)
//服务端传输线程数
conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
//设置客户端传输线程数
conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
}
/**
* Returns the default number of threads for both the Netty client and server thread pools.
* If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
*/
private def defaultNumThreads(numUsableCores: Int): Int = {
//如果numUsableCores小于等于0,那么线程数是系统可用处理器的数量
val availableCores =
if (numUsableCores > 0) {
numUsableCores
} else {
Runtime.getRuntime.availableProcessors()
}
//不过系统的内核数不可能全部用于网络传输使用,所以这里还将分配给网络传输的内核数量最多限制在8个
math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
}
}