Communication flow between SparkLauncher, LauncherServer, and LauncherBackend
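SparkLauncher is a class for submitting Spark jobs from code. Under the hood it still submits through the spark-submit script, which it invokes through a ProcessBuilder configured with the relevant environment and arguments. Its main methods are:
- launch: submits a job; how the output of the submission is handled is left to the user
- createBuilder: called by launch and startApplication to build the ProcessBuilder that runs the shell script
- startApplication: submits a job and invokes the user-supplied listeners whenever the job's state changes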
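The ProcessBuilder approach described above can be sketched in plain Java. This is a minimal illustration, not SparkLauncher's actual code: the class name, the `DEMO_LAUNCHER_PORT` and `DEMO_LAUNCHER_SECRET` variable names, and the sample arguments are all placeholders invented for this example. It only builds the process description and never starts spark-submit.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class SubmitCommandSketch {
    // Hypothetical variable names, used only in this sketch.
    static final String ENV_PORT = "DEMO_LAUNCHER_PORT";
    static final String ENV_SECRET = "DEMO_LAUNCHER_SECRET";

    /** Builds (but does not start) a process that would run spark-submit. */
    static ProcessBuilder build(String sparkHome, boolean isWindows,
                                List<String> submitArgs, int port, String secret) {
        List<String> cmd = new ArrayList<>();
        String script = isWindows ? "spark-submit.cmd" : "spark-submit";
        cmd.add(String.join(File.separator, sparkHome, "bin", script));
        cmd.addAll(submitArgs);

        ProcessBuilder pb = new ProcessBuilder(cmd);
        // The child inherits the parent's environment plus these two entries.
        Map<String, String> env = pb.environment();
        env.put(ENV_PORT, String.valueOf(port));
        env.put(ENV_SECRET, secret);
        return pb;
    }

    public static void main(String[] args) {
        ProcessBuilder pb = build("/opt/spark", false,
                Arrays.asList("--master", "local[2]", "app.jar"), 12345, "s3cr3t");
        System.out.println(pb.command());
    }
}
```

Passing values through the child's environment keeps them off the command line, and the child can detect their absence when it was started some other way.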
Create a ProcessBuilder and set the environment variables
private def createBuilder: ProcessBuilder = {
  var cmd = new ArrayList[String]
  val script = if (isWindows) "spark-submit.cmd" else "spark-submit" // pick the spark-submit script for this OS
  cmd.add(join(File.separator, builder.getSparkHome, "bin", script)) // absolute path to the script
  cmd.addAll(builder.buildSparkSubmitArgs) // append the arguments
  // Since the child process is a batch script, let's quote things so that special characters are preserved,
  // otherwise the batch interpreter will mess up the arguments. Batch scripts are weird.
  if (isWindows) {
    val winCmd = new ArrayList[String]
    import scala.collection.JavaConversions._
    for (arg <- ...
... > 0) {
          timeoutTimer.schedule(timeout, getConnectionTimeout());
        } else {
          timeout.run();
        }
      }
    }
  } catch (IOException ioe) {
    if (running) {
      LOG.log(Level.SEVERE, "Error in accept loop.", ioe);
    }
  }
}
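The timeout handling at the end of the accept loop can be isolated into a small sketch. It assumes the timeout task's job is to give up on a connection that never sends Hello; the task body is not shown in the excerpt above, so the `onTimeout` callback here is a stand-in, and the class and method names are hypothetical.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

final class HelloTimeoutSketch {
    private final Timer timer = new Timer("hello-timeout", true); // daemon thread

    /**
     * Arms a timeout for a newly accepted connection. A positive delay is
     * scheduled on the timer; a delay of 0 runs the task immediately on the
     * calling thread, as in the else-branch of the accept loop.
     */
    TimerTask arm(long timeoutMs, Runnable onTimeout) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                onTimeout.run();
            }
        };
        if (timeoutMs > 0) {
            timer.schedule(task, timeoutMs);
        } else {
            task.run();
        }
        return task;
    }

    public static void main(String[] args) {
        HelloTimeoutSketch sketch = new HelloTimeoutSketch();
        AtomicBoolean dropped = new AtomicBoolean(false);
        // A client that says hello in time: cancel before the timer fires.
        TimerTask pending = sketch.arm(60_000, () -> dropped.set(true));
        pending.cancel();
        System.out.println("dropped after cancel: " + dropped.get());
    }
}
```

Returning the TimerTask lets the message handler cancel it as soon as the Hello message arrives, which is what `timeout.cancel()` does in ServerConnection.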
2.4. Incoming messages are handled by ServerConnection
private class ServerConnection extends LauncherConnection {
  private TimerTask timeout;
  private ChildProcAppHandle handle;
  ServerConnection(Socket socket, TimerTask timeout) throws IOException {
    super(socket);
    this.timeout = timeout;
  }
  // Handle an incoming message.
  @Override
  protected void handle(Message msg) throws IOException {
    try {
      if (msg instanceof Hello) { // handshake message
        timeout.cancel();
        timeout = null;
        Hello hello = (Hello) msg;
        ChildProcAppHandle handle = pending.remove(hello.secret);
        if (handle != null) {
          handle.setConnection(this);
          handle.setState(SparkAppHandle.State.CONNECTED);
          this.handle = handle;
        } else {
          throw new IllegalArgumentException("Received Hello for unknown client.");
        }
      } else { // state update
        if (handle == null) {
          // The conditional must be parenthesized, since "+" binds tighter than "!=".
          throw new IllegalArgumentException("Expected hello, got: " +
            (msg != null ? msg.getClass().getName() : null));
        }
        if (msg instanceof SetAppId) {
          SetAppId set = (SetAppId) msg;
          // Triggers the user's listeners.
          handle.setAppId(set.appId);
        } else if (msg instanceof SetState) {
          handle.setState(((SetState) msg).state);
        } else {
          throw new IllegalArgumentException("Invalid message: " +
            (msg != null ? msg.getClass().getName() : null));
        }
      }
    } catch (Exception e) {
      LOG.log(Level.INFO, "Error handling message from client.", e);
      if (timeout != null) {
        timeout.cancel();
      }
      close();
    } finally {
      timeoutTimer.purge();
    }
  }
  // Tear down the connection.
  @Override
  public void close() throws IOException {
    synchronized (clients) {
      clients.remove(this);
    }
    super.close();
    if (handle != null) {
      if (!handle.getState().isFinal()) {
        LOG.log(Level.WARNING, "Lost connection to spark application.");
        handle.setState(SparkAppHandle.State.LOST);
      }
      handle.disconnect();
    }
  }
}
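The handshake logic in handle() boils down to a small state machine: the first message must be a Hello carrying a known secret, and only then are state updates accepted. The following self-contained sketch illustrates that rule. All types here (`Handle`, `Connection`, the `State` enum) are simplified stand-ins written for this example, not the Spark classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class HandshakeSketch {
    enum State { UNKNOWN, CONNECTED, RUNNING, FINISHED }

    /** Stand-in for ChildProcAppHandle: just records the latest state. */
    static final class Handle {
        State state = State.UNKNOWN;
    }

    /** Handles waiting for their child process to connect, keyed by secret. */
    final Map<String, Handle> pending = new ConcurrentHashMap<>();

    /** One instance per socket; bound to a handle by the first message. */
    final class Connection {
        private Handle handle;

        void onHello(String secret) {
            Handle h = pending.remove(secret); // a secret can be claimed only once
            if (h == null) {
                throw new IllegalArgumentException("Received Hello for unknown client.");
            }
            h.state = State.CONNECTED;
            handle = h;
        }

        void onSetState(State newState) {
            if (handle == null) {
                throw new IllegalArgumentException("Expected hello first.");
            }
            handle.state = newState;
        }
    }

    Connection newConnection() {
        return new Connection();
    }

    public static void main(String[] args) {
        HandshakeSketch server = new HandshakeSketch();
        Handle h = new Handle();
        server.pending.put("secret-1", h);
        Connection c = server.newConnection();
        c.onHello("secret-1");
        c.onSetState(State.RUNNING);
        System.out.println(h.state);
    }
}
```

Removing the entry from `pending` on a successful Hello means a second connection presenting the same secret is rejected as an unknown client.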
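3. ChildProcAppHandle
ChildProcAppHandle stores and invokes the listeners registered by the user. It is called from LauncherServer.
Its main methods are:
- setAppId: fires the listeners when the application ID changes
- setState: fires the listeners when the state changes
- fireEvent: called by setAppId and setState; this is where the user's listeners are actually invoked
- addListener: registers a user listener; called from SparkLauncher.startApplication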
3.1. Firing the listeners
private synchronized void fireEvent(boolean isInfoChanged) {
  if (listeners != null) {
    for (Listener l : listeners) {
      if (isInfoChanged) {
        l.infoChanged(this);
      } else {
        l.stateChanged(this);
      }
    }
  }
}
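To show how setAppId, setState, and fireEvent fit together, here is a minimal, self-contained sketch of the listener pattern. It is an illustration only: `ListenerSketch` and its string-valued state are simplifications invented for this example, and the real handle does more (for instance, it is tied to a connection and a child process).

```java
import java.util.ArrayList;
import java.util.List;

final class ListenerSketch {
    interface Listener {
        void stateChanged(ListenerSketch handle);
        void infoChanged(ListenerSketch handle);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private String state = "UNKNOWN";
    private String appId;

    synchronized void addListener(Listener l) { listeners.add(l); }
    synchronized String getState() { return state; }
    synchronized String getAppId() { return appId; }

    // Both setters update the field first, then notify every listener.
    synchronized void setState(String s) { state = s; fireEvent(false); }
    synchronized void setAppId(String id) { appId = id; fireEvent(true); }

    private synchronized void fireEvent(boolean isInfoChanged) {
        for (Listener l : listeners) {
            if (isInfoChanged) {
                l.infoChanged(this);
            } else {
                l.stateChanged(this);
            }
        }
    }

    public static void main(String[] args) {
        ListenerSketch handle = new ListenerSketch();
        handle.addListener(new Listener() {
            @Override
            public void stateChanged(ListenerSketch h) {
                System.out.println("state -> " + h.getState());
            }
            @Override
            public void infoChanged(ListenerSketch h) {
                System.out.println("appId -> " + h.getAppId());
            }
        });
        handle.setAppId("app-1");
        handle.setState("RUNNING");
    }
}
```

Listeners receive the handle itself rather than the new value, so a single callback can read whichever fields it needs.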
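4. LauncherBackend
LauncherBackend is the client side of the conversation with LauncherServer: it is the endpoint that reports state changes to LauncherServer.
4.1. Establishing the connection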
def connect(): Unit = {
  // Set up the socket to LauncherServer. The port is read from the environment, where SparkLauncher published it.
  val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
  // The secret that uniquely identifies this application to LauncherServer also comes from the environment.
  val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
  if (port != None && secret != None) {
    /*
     * Open the socket to LauncherServer on the loopback address. LauncherServer listens there
     * only when the Spark job was submitted through SparkLauncher.startApplication.
     * SparkLauncher runs spark-submit through a ProcessBuilder, so spark-submit inherits the
     * parent process's environment variables, and that is how LauncherBackend can tell
     * whether a LauncherServer exists.
     */
    val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
    // Wrap the connection to LauncherServer; on the server side the matching ServerConnection accepts it.
    connection = new BackendConnection(s)
    // Send the handshake message.
    connection.send(new Hello(secret.get, SPARK_VERSION))
    // Start the thread that serves this connection.
    clientThread = LauncherBackend.threadFactory.newThread(connection)
    clientThread.start()
    _isConnected = true
  }
}
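The decision made in connect(), "connect only if both the port and the secret are present in the environment", can be sketched in Java as follows. The variable names `DEMO_LAUNCHER_PORT` and `DEMO_LAUNCHER_SECRET` and all class names are hypothetical, and sending the secret as a plain text line is a simplification for this sketch, not the real message encoding.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class BackendConnectSketch {
    static final String ENV_PORT = "DEMO_LAUNCHER_PORT";     // hypothetical name
    static final String ENV_SECRET = "DEMO_LAUNCHER_SECRET"; // hypothetical name

    static final class Endpoint {
        final int port;
        final String secret;
        Endpoint(int port, String secret) { this.port = port; this.secret = secret; }
    }

    /** Empty unless both variables are set, i.e. unless a launcher server exists. */
    static Optional<Endpoint> fromEnv(Map<String, String> env) {
        String port = env.get(ENV_PORT);
        String secret = env.get(ENV_SECRET);
        if (port == null || secret == null) {
            return Optional.empty();
        }
        return Optional.of(new Endpoint(Integer.parseInt(port), secret));
    }

    /** Connects over loopback and sends the secret as a one-line hello. */
    static Socket connect(Endpoint ep) throws IOException {
        Socket s = new Socket(InetAddress.getLoopbackAddress(), ep.port);
        PrintWriter out = new PrintWriter(
                new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
        out.println(ep.secret);
        return s;
    }

    public static void main(String[] args) throws IOException {
        // A throwaway loopback server stands in for the launcher side.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Map<String, String> env = new HashMap<>();
            env.put(ENV_PORT, String.valueOf(server.getLocalPort()));
            env.put(ENV_SECRET, "s3cr3t");
            try (Socket client = connect(fromEnv(env).get());
                 Socket accepted = server.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(
                         accepted.getInputStream(), StandardCharsets.UTF_8))) {
                System.out.println("hello carried secret: " + in.readLine());
            }
        }
    }
}
```

Taking the environment as a `Map` parameter instead of reading `System.getenv()` directly keeps the presence check easy to exercise without starting a child process.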