您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

MyCollector02---AppManager

宝哥大数据 发布时间:2017-10-27 16:49:17 ,浏览量:1

AppManager 1.1、appManager 提供一个队列,方便从kafka中消费的数据, 有缓存的地方
    /********data*****/
    /**
     * 消息队列
     * 由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选
     */
    protected LinkedBlockingQueue   kafkaQueue;
1.2创建一个变量noDatasToKafkaQueue, 用于管理是否向队列中写入数据
    /**
     * 停止向kafka队列中压入数据
     */
    private AtomicBoolean noDatasToKafkaQueue;




    /**
     * 向队列中压入数据
     * @param record    消息
     * @param timeout   超时时间
     * @param unit      时间单位
     * @return  是否成功压入队列
     * @throws InterruptedException
     */
    public boolean offerDataToQueue(ConsumerRecord record, long timeout, TimeUnit unit) 
            throws InterruptedException {
        return !noDatasToKafkaQueue.get() && kafkaQueue.offer(record, timeout, unit);
    }

    /**
     * 从队列中获取数据
     * @param timeout 
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public ConsumerRecord pollRecords(long timeout, TimeUnit unit) 
            throws InterruptedException {
        return kafkaQueue.poll(timeout, unit);
    }
1.3、 任务池存储运行的job, 任务调度Scheduler管理Job

    /**
     * 任务池
     */
    protected Map jobpool;
    /**
     * 任务调度
     */
    private Scheduler scheduler;
1.3.1、注册Job, 添加job到任务池jobpool, 注册job到Scheduler的busypool
    /**
     * 通过调用scheduler将job 注册到
     * @param job
     * @return
     */
    public boolean registerJob(Job job) {
        if (!scheduler.registerJob(job)) {
            LogUtils.log(getClass(), "the job-register of scheduler is unsuccessful .", LogUtils.WARN);
            return false;
        }
        jobpool.put(job.getID(), job);
        LogUtils.log(getClass(), "register " + job.getID() + " to scheduler");
        return true;
    }
1.3.2、删除Job
    /**
     * remove job 
     * @param jobId
     */
    public synchronized void removeJob(String jobId) {
        LogUtils.log(MyScheduler.class, "Thread-" + jobId + " removed");
        scheduler.removeJob(jobId);
        jobpool.remove(jobId);
    }
1.3.3、释放job
    /**
     * 释放运行的job
     * release job
     */
    public synchronized void releaseJob() {
        Map tmp = new HashMap(jobpool);
        for (Job job:tmp.values()) {
            if (job.isRun()) {
                removeJob(job.getID());
            }
        }
        scheduler.balanceThread();
    }
2、AppManager业务 2.1、初始化
    /**
     * 初始化AppManager
     *    第一步:初始化Cache
     *    第二步:初始化Schedule, 任务调度
     */
    public boolean init() {

        // initialize cache,加载appdata.conf
        if (!initCache()) return false;
        // init scheduler
        if (!initScheduler()) return false;

        return true;
    }
2.1.1、初始化CacheManager 读取配置文件到缓存中, CacheManager中实际业务, 就是将配置文件中的内容写到一个Map
    /**
     * 加载appdata.conf内容
     * @return
     */
    private boolean loadAppData() {
        LogUtils.log(getClass(), "Load AppData ......");
        appData.putAll(Configuration.readAppdata());
        return true;
    }
2.2、初始化Scheduler

    /**
     * com.nokia.server.AppManager.initScheduler()
     * 根据appdata.conf中的参数设置初始化线程数
     * 以及初始化Cheduler
     * 启动scheduler
     * @return
     */
    private boolean initScheduler() {
        scheduler = new Scheduler();

        // appdata.conf  minThread=10
        int min = CacheManager.getInstance().getMinThread();
        // appdata.conf  maxThread=100
        int max = CacheManager.getInstance().getMaxThread();

        //根据appdata.conf中的minThead,和maxThread设置初始化scheduler的线程数
        if (!scheduler.init(min, max)) {
            LogUtils.log(getClass(), "the initializtion of scheduler is unsuccessful .", LogUtils.WARN);
            return false;
        }

/*      //initialize scheduler,将SystemJob 以及业务Job注册到jobpool中
        if (!Initializtion.getInstance().init()) {
            LogUtils.log(getClass(), "the job-initializtion of scheduler is unsuccessful .", LogUtils.WARN);
            return false;
        }*/
        initJob();

        // startup scheduler
        if (!scheduler.start()) {
            LogUtils.log(getClass(), "the startup of scheduler is unsuccessful .", LogUtils.WARN);
            return false;
        }
        return true;
    }
2.3、shutdown
    /**
     * 停止scheduler
     */
    public void shtudown() {
        if (scheduler != null) {
            LogUtils.println(getClass(), "begin shutdown scheduler");

            scheduler.shutdown();
        }
        LogUtils.println(getClass(), "AppManager:" + appkey + " shutdown");
    }

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0402s