您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

LauncherBackend与LauncherServer

宝哥大数据 发布时间:2019-04-12 23:59:45 ,浏览量:0

SparkLauncher,LauncherServer,LauncherBackend的通信流程

在这里插入图片描述

一、SparkLauncher

sparkLauncher 是一个在代码里提交spark任务的类 这个类底层使用的依然是spark-submit脚本进行提交,通过ProcessBuilder 来设置相关环境参数调用 主要的方法有下面几个

  • launch 提交一个任务,任务的提交输出结果如何由用户自己处理
  • createBuilder launch和startApplication方法中调用生成ProcessBuilder执行shell脚本对象的方法
  • startApplication 提交一个任务,并根据监听任务状态的改变来执行用户指定的listener
1.1、createBuilder

创建一个ProcessBuilder , 设置环境变量

        private def createBuilder: ProcessBuilder {
            var cmd = new ArrayList[String]
            val script = if (isWindows) "spark-submit.cmd" else "spark-submit"  //找到spark-submit对应的脚本
            cmd.add(join(File.separator, builder.getSparkHome, "bin", script))   //设置脚本的绝对路径
            cmd.addAll(builder.buildSparkSubmitArgs)						//设置参数
            // Since the child process is a batch script, let's quote things so that special characters are preserved, 
            // otherwise the batch interpreter will mess up the arguments. Batch scripts are weird.
            if (isWindows) {
                val winCmd = new ArrayList[String]
                import scala.collection.JavaConversions._
                for (arg  0) {
                        timeoutTimer.schedule(timeout, getConnectionTimeout());
                    } else {
                        timeout.run();
                    }
                }
            }
        } catch (IOException ioe) {
            if (running) {
                LOG.log(Level.SEVERE, "Error in accept loop.", ioe);
            }
        }
    }
2.4、请求的消息, 具体由ServerConnection处理

    private class ServerConnection extends LauncherConnection {

        private TimerTask timeout;
        private ChildProcAppHandle handle;

        ServerConnection(Socket socket, TimerTask timeout) throws IOException {
            super(socket);
            this.timeout = timeout;
        }

        @Override
        //处理请求的消息
        //
        protected void handle(Message msg) throws IOException {
            try {
                if (msg instanceof Hello) { //握手消息
                    timeout.cancel();
                    timeout = null;
                    Hello hello = (Hello) msg;
                    ChildProcAppHandle handle = pending.remove(hello.secret);
                    if (handle != null) {
                        handle.setConnection(this);
                        handle.setState(SparkAppHandle.State.CONNECTED);
                        this.handle = handle;
                    } else {
                        throw new IllegalArgumentException("Received Hello for unknown client.");
                    }
                } else {    // 更新状态
                    if (handle == null) {
                        throw new IllegalArgumentException("Expected hello, got: " +
                                msg != null ? msg.getClass().getName() : null);
                    }
                    if (msg instanceof SetAppId) {
                        SetAppId set = (SetAppId) msg;
                        //触发用户的listener
                        handle.setAppId(set.appId);
                    } else if (msg instanceof SetState) {
                        handle.setState(((SetState) msg).state);
                    } else {
                        throw new IllegalArgumentException("Invalid message: " +
                                msg != null ? msg.getClass().getName() : null);
                    }
                }
            } catch (Exception e) {
                LOG.log(Level.INFO, "Error handling message from client.", e);
                if (timeout != null) {
                    timeout.cancel();
                }
                close();
            } finally {
                timeoutTimer.purge();
            }
        }

        @Override
        // 销毁连接
        public void close() throws IOException {
            synchronized (clients) {
                clients.remove(this);
            }
            super.close();
            if (handle != null) {
                if (!handle.getState().isFinal()) {
                    LOG.log(Level.WARNING, "Lost connection to spark application.");
                    handle.setState(SparkAppHandle.State.LOST);
                }
                handle.disconnect();
            }
        }

    }
三、ChildProcAppHandle
ChildProcAppHandle 是用来保存和执行用户注册的listener的类,在LauncherServer中被调用
主要方法有
    setAppId               当appid变化时触发listener
    setState                当状态变化时触发listener
    fireEvent               被setAppId和setState调用的方法,实际执行用户的listener在这里
    addListener           注册用户的listener的方法,在SparkLauncher.startApplication中被调用
3.1、触发listener
    private synchronized void fireEvent(boolean isInfoChanged) {
        if (listeners != null) {
            for (Listener l : listeners) {
                if (isInfoChanged) {
                    l.infoChanged(this);
                } else {
                    l.stateChanged(this);
                }
            }
        }
    }

四、LauncherBackend
LauncherBackend 是跟LauncherServer通信的客户端,向LauncherServer发送状态变化的通信端点
4.1、建立连接
    def connect(): Unit = {

        //连接LauncherServer的socket初始化动作,端口是从env中获取的,env里的端口是在SparkLauncher中通告出去的
        val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
        //这里通过环境变量获取LauncherServer通信的唯一凭证
        val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
        if (port != None && secret != None) {
            /*
             *这里建立跟LauncherServer通信的socket,ip是本地回环地址,
             *因为只有通过SparkLauncher的startApplication的方式去提交spark 任务的时候LauncherServer才会在本地回环地址上建立监听
             *因为SparkLauncher 通过ProcessBuilder的方式调用spark-submit,所以在spark-submit中会继承父进程的环境变量
             *LauncherBackend才能通过环境变量确定是否存在LauncherServer服务
            */
            val s = new Socket(InetAddress.getLoopbackAddress(), port.get)

            // 封装与LauncherServer的连接, 在LauncherServer有对应的ServerConnection,接收连接
            connection = new BackendConnection(s)
            //发送握手包
            connection.send(new Hello(secret.get, SPARK_VERSION))

            //启动线程
            clientThread = LauncherBackend.threadFactory.newThread(connection)
            clientThread.start()
            _isConnected = true
        }
    }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0474s