您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

MyCollector01---Scheduler

宝哥大数据 发布时间:2017-10-27 14:12:59 ,浏览量:1

Scheduler对JobThread进行调度, 1.1、Job   handler是业务出处理的方法
public interface Job {
    /**
     * 判断是否在运行
     * @return
     */
    boolean isRun();
    /**
     * 任务实现方法,在此方法中写入实际任务代码
     */
    void handler();
    /**
     * 设置任务ID
     * @param id
     */
    void setID(String id);
    /**
     * 获取任务ID
     * @return
     */
    String getID();
    /**
     * 获取任务组名
     * @return
     */
    Group getGroup();
    /**
     * 获取运行间隔
     * systemJob=2000
     * @return
     */
    long getInterval();
    /**
     * 停止任务
     */
    void shutdown();
}
1.2、Scheduer对JobThread进行调度, 创建一个集体内部类JobThread, 持有Job对象,

    /**
     *  内部类, JobThread, 
     */
    class JobThread extends Thread {
        private Job job;
        /**
         * jobThread是否运行
         */
        private boolean isRun;
        /**
         * 
         * 注入job
         * @param job
         */
        public void setJob(Job job) {
            synchronized (this) {
                this.job = job;
                this.notifyAll();
            }
        }
        public Job getJob(){
            return job;
        }

        @Override
        public void run() {
            try {
                while(isRun) {
                    this.wait(job.getInterval());
                    while(job == null && isRun) {
                        this.wait();
                    }
                    if (job != null && job.isRun()) {
                        job.handler();
                        this.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                LogUtils.log(getClass(), e, LogUtils.FATAL);
            }
        }
    }
1.3、注入JobThread, 通过JobThread注入hob, 1.3.1、注入JobThread, 获取是否有可用的JobThread
    /**
     * 从Availpool中获取可用线程, 
     *  首先判断availpool中是否有可用线程, 
     *      有: 获取成功
     *      没有: 当前线程数是否大于MAX_THREADNUM  
     *              是  : 获取失败
     *              否  : 创建新的线程
     * @return JobThread
     */
    private JobThread getAvailThread() {
        //从availpool中获取可用线程jobThread
        JobThread jobThread = null;
        if (availpool.isEmpty()) {
            if (currentThreadNum >= MAX_THREAD_NUM) {
                LogUtils.log(getClass(), "the current Thread num is " + currentThreadNum + ", there is no JobThread that cat be use", LogUtils.FATAL);
            }else {
                jobThread = new JobThread();
                if (started) {
                    jobThread.start();
                    currentThreadNum++;
                }
            }
        }else {
            jobThread = availpool.remove(0);
        }
        return jobThread;
    }
1.3.2、注入Job
    /**
     * 注册Job
     * @param job
     * @return
     */
    public boolean registerJob(Job job) {
        synchronized (lock) {
            //获取jobThread 
            JobThread jobThread = getAvailThread();
            if (jobThread != null) {
                //添加到busypool
                busypool.put(jobThread.getJob().getID(), jobThread);
                //注入Job
                jobThread.setJob(job);
            }
        }
        return false;
    }
1.4、删除一个Job,通过jobid

    /**
     * 通过jobId删除一个Job
     *      将job 删除, 并将JobThread从busypool移除到availpool
     * @param jobId
     */
    public void removeJob(String jobId) {
        synchronized (lock) {
            JobThread jobThread = busypool.remove(jobId);
            if (jobThread != null) {
                //将job设置为空
                jobThread.makeAvailable();
                //并将JobThread移到availpool中, 
                availpool.add(jobThread);
            }
            LogUtils.log(getClass(), "Thead-" + jobId + " removed", LogUtils.INFO);
        }
    }
1.5、删除Job, 通过GroupId
    /**
     * 通过group, 
     * @param groupId
     */
    public void removeJob(Group group){

        Map tmp = new HashMap(busypool);
        for (JobThread jobThread : tmp.values()) {
            Job job = jobThread.getJob();
            if(job.getGroup() == group){
                //删除job, 通过groupid
                removeJob(job.getID());
            }
        }
        LogUtils.log(getClass(), "Thead-" + group + " removed", LogUtils.INFO);
    }
1.6、启动Scheduler
    /**
     * 启动Scheduler
     * @return
     */
    public boolean start() {
        if (started) return true;
        //没有启动
        synchronized (lock) {
            for (JobThread jobThread : availpool) {
                if (jobThread.isAlive()) {
                    jobThread.start();
                }
            }

            for (JobThread jobThread : busypool.values()) {
                if (jobThread.isAlive()) {
                    jobThread.start();
                }
            }
        }
        started = true;
        return true;
    }
1.7、停止scheduler
    /**
     * shut donw the scheduler
     * @return
     */
    public boolean shutdown() {
        synchronized (lock) {
            for (JobThread jobThread : availpool) {
                if (jobThread.isAlive()) {
                    jobThread.shutdown();
                }
            }

            for (JobThread jobThread : busypool.values()) {
                if (jobThread.isAlive()) {
                    jobThread.shutdown();
                }
            }
        }
        LogUtils.log(getClass(), "show down the scheduler...");
        return true;
    }
完整代码


import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.nokia.common.schedule.Group;
import com.nokia.common.schedule.Job;
import com.nokia.util.LogUtils;

/**
 * 用于调度job
 */
public final class MyScheduler {
    /**
     * 可用线程线程池
     */
    private List availpool;
    /**
     * 工作线程池
     */
    private Map busypool;
    /**  */
    private Object lock;
    private static int INIT_THREAD_NUM = 20;
    private static int MAX_THREAD_NUM = 500;
    /** 当前job数 */
    private int  currentThreadNum = 0;
    private boolean started = false;

    public MyScheduler() {
        availpool = Collections.synchronizedList(new ArrayList());
        busypool = Collections.synchronizedMap(new HashMap());
    }

    /**
     * 初始化Scheduler, 线程数
     * @param max
     * @param min
     * @return
     */
    public boolean init(int max, int min) {
        INIT_THREAD_NUM = min;
        MAX_THREAD_NUM = max;
        int num = getInitThreadNum();
        LogUtils.log(getClass(), "this num of init Thread is " + num, LogUtils.INFO);
        for (int i = 0; i < num; i++) {
            JobThread jobThread = new JobThread();
            jobThread.start();
        }
        currentThreadNum = num;
        return true;
    }

    /**
     * 通过jobId删除一个Job
     *      将job 删除, 并将JobThread从busypool移除到availpool
     * @param jobId
     */
    public void removeJob(String jobId) {
        synchronized (lock) {
            JobThread jobThread = busypool.remove(jobId);
            if (jobThread != null) {
                //将job设置为空
                jobThread.makeAvailable();
                //并将JobThread移到availpool中, 
                availpool.add(jobThread);
            }
            LogUtils.log(getClass(), "Thead-" + jobId + " removed", LogUtils.INFO);
        }
    }

    /**
     * 通过group, 
     * @param groupId
     */
    public void removeJob(Group group){

        Map tmp = new HashMap(busypool);
        for (JobThread jobThread : tmp.values()) {
            Job job = jobThread.getJob();
            if(job.getGroup() == group){
                //删除job, 通过groupid
                removeJob(job.getID());
            }
        }
        LogUtils.log(getClass(), "Thead-" + group + " removed", LogUtils.INFO);
    }

    /**
     * 启动Scheduler
     * @return
     */
    public boolean start() {
        if (started) return true;
        //没有启动
        synchronized (lock) {
            for (JobThread jobThread : availpool) {
                if (jobThread.isAlive()) {
                    jobThread.start();
                }
            }

            for (JobThread jobThread : busypool.values()) {
                if (jobThread.isAlive()) {
                    jobThread.start();
                }
            }
        }
        LogUtils.log(getClass(), "start the scheduler...");
        started = true;
        return true;
    }


    /**
     * shut donw the scheduler
     * @return
     */
    public boolean shutdown() {
        synchronized (lock) {
            for (JobThread jobThread : availpool) {
                if (jobThread.isAlive()) {
                    jobThread.shutdown();
                }
            }

            for (JobThread jobThread : busypool.values()) {
                if (jobThread.isAlive()) {
                    jobThread.shutdown();
                }
            }
        }
        LogUtils.log(getClass(), "show down the scheduler...");
        return true;
    }

    /**
     * balance thread number
     * 
     */
    public void balanceThread() {
        int balanceValue = 30;
        int availNum = availpool.size();
        if (availNum > balanceValue) {
            JobThread wt = availpool.remove(0);
            wt.shutdown();
            currentThreadNum--;
        }
    }


    /**
     * 注册Job
     * @param job
     * @return
     */
    public boolean registerJob(Job job) {
        synchronized (lock) {
            //获取jobThread 
            JobThread jobThread = getAvailThread();
            if (jobThread != null) {
                //添加到busypool
                busypool.put(jobThread.getJob().getID(), jobThread);
                //注入Job
                jobThread.setJob(job);
            }
        }
        return false;
    }

    /**
     * 从Availpool中获取可用线程, 
     *  首先判断availpool中是否有可用线程, 
     *      有: 获取成功
     *      没有: 当前线程数是否大于MAX_THREADNUM  
     *              是  : 获取失败
     *              否  : 创建新的线程
     * @return JobThread
     */
    private JobThread getAvailThread() {
        //从availpool中获取可用线程jobThread
        JobThread jobThread = null;
        if (availpool.isEmpty()) {
            if (currentThreadNum >= MAX_THREAD_NUM) {
                LogUtils.log(getClass(), "the current Thread num is " + currentThreadNum + ", there is no JobThread that cat be use", LogUtils.FATAL);
            }else {
                jobThread = new JobThread();
                if (started) {
                    jobThread.start();
                    currentThreadNum++;
                }
            }
        }else {
            jobThread = availpool.remove(0);
        }
        return jobThread;
    }

    /**
     * 显示busypool中的JobThread的状态
     * @return
     */
    public String displayCurrentJobState() {
        Map tmp = new HashMap(busypool);
        StringBuffer sb = new StringBuffer();
        for(JobThread job : tmp.values()) {
            sb.append("job name=" + job.getName() + " state=" + job.getState() + " alive=" + job.isAlive() + " \n");
        }
        return sb.toString();
    }
    /**
     * get the interval of thread
     * 
     * @return
     */
    private int getThreadInterval() {
        return 100;
    }

    /**
     * get initial thread number
     * 
     * @return
     */
    private int getInitThreadNum() {
//      int initThreadNum = 10;
//      if (initThreadNum < 0) {
//          initThreadNum = INIT_THREAD_NUM;
//      }
        return INIT_THREAD_NUM;         
    }

    /**
     * get max thread number
     * 
     * @return
     */
    private int getMaxThreadNum() {
//      int maxThreadNum = 20;
//      if (maxThreadNum < 0) {
//          maxThreadNum = MAX_THREAD_NUM;
//      }
        return MAX_THREAD_NUM;          
    }

    /**
     * current thread number
     * 
     * @return
     */
    public int getCurrentThreadNum() {
        return currentThreadNum;
    }


    /**
     *  内部类, JobThread, 
     */
    class JobThread extends Thread {
        public Job job;
        /**
         * jobThread是否运行
         */
        private boolean isRun;
        /**
         * 
         * 注入job
         * @param job
         */
        public void setJob(Job job) {
            synchronized (this) {
                this.job = job;
                this.notifyAll();
            }
        }
        public Job getJob(){
            return job;
        }


        /**
         * Callback a JobThread
         * 删除当前的job, 
         */
        public void makeAvailable() {
            synchronized (this) {
                this.job = null;
                this.notifyAll();
            }       
        }
        /**
         * 停止jobThread, 清除Job
         */
        public void shutdown() {
            isRun = false;
            if (job != null) {
                job = null;
            }
        }


        @Override
        public void run() {
            try {
                while(isRun) {
                    this.wait(job.getInterval());
                    while(job == null && isRun) {
                        this.wait();
                    }
                    if (job != null && job.isRun()) {
                        job.handler();
                        this.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                LogUtils.log(getClass(), e, LogUtils.FATAL);
            }
        }
    }
}

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0528s