Scheduler对JobThread进行调度,
1.1、Job接口: handler是业务处理的方法
public interface Job {
/**
* Reports whether this job is running.
* The scheduler's worker thread only calls {@link #handler()} while this returns true.
* @return the job's running flag
*/
boolean isRun();
/**
* The task body; implementations put the actual work here.
* Called repeatedly by the worker thread the job is bound to.
*/
void handler();
/**
* Sets the job ID.
* @param id the job ID
*/
void setID(String id);
/**
* Returns the job ID.
* @return the job ID
*/
String getID();
/**
* Returns the group this job belongs to.
* @return the job's group
*/
Group getGroup();
/**
* Returns the interval between runs.
* The worker thread passes this value to {@code Object.wait(long)}, so it is
* presumably in milliseconds. The original note reads "systemJob=2000" --
* presumably the interval used by system jobs; TODO confirm.
* @return the run interval
*/
long getInterval();
/**
* Stops the job.
*/
void shutdown();
}
1.2、Scheduler对JobThread进行调度, 创建一个具体的内部类JobThread, 持有Job对象。
/**
 * Worker thread that repeatedly runs the {@link Job} currently bound to it.
 *
 * <p>The thread idles while no job is bound. Once a job is injected it waits
 * for the job's interval and then calls {@link Job#handler()}, for as long as
 * the job reports {@link Job#isRun()}.
 */
class JobThread extends Thread {
    /** The job bound to this worker, or {@code null} when idle. Guarded by {@code this}. */
    private Job job;
    /**
     * Whether the worker loop should keep running. It starts out {@code true};
     * with the default {@code false} the loop in {@link #run()} would never execute.
     */
    private volatile boolean isRun = true;

    /**
     * Binds a job to this worker and wakes the worker if it is idle.
     *
     * @param job the job to run
     */
    public void setJob(Job job) {
        synchronized (this) {
            this.job = job;
            this.notifyAll();
        }
    }

    /**
     * Returns the job currently bound to this worker.
     *
     * @return the bound job, or {@code null} when the worker is idle
     */
    public Job getJob() {
        synchronized (this) {
            return job;
        }
    }

    /**
     * Worker loop: wait for a job, wait for its interval, run it, repeat.
     * The loop ends when the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (isRun) {
                Job current;
                // wait()/notifyAll() may only be called while holding this object's monitor.
                synchronized (this) {
                    while (job == null && isRun) {
                        this.wait();
                    }
                    current = job;
                    if (current == null) {
                        break;
                    }
                    long interval = current.getInterval();
                    long waitStart = System.nanoTime();
                    long remaining = interval;
                    // Loop so that spurious wake-ups do not run the job early.
                    while (remaining > 0 && isRun && job == current) {
                        this.wait(remaining);
                        remaining = interval - (System.nanoTime() - waitStart) / 1000000L;
                    }
                    if (job != current) {
                        // The job was replaced while waiting: start over with the new one.
                        continue;
                    }
                }
                // Run outside the monitor so setJob()/getJob() never block on a long handler.
                if (current.isRun()) {
                    current.handler();
                }
            }
        } catch (InterruptedException e) {
            LogUtils.log(getClass(), e, LogUtils.FATAL);
            // Restore the interrupt flag for whoever inspects this thread afterwards.
            Thread.currentThread().interrupt();
        }
    }
}
1.3、注入JobThread, 通过JobThread注入Job。
1.3.1、注入JobThread, 获取是否有可用的JobThread
/**
 * Obtains a worker thread for a new job.
 *
 * <p>An idle worker from {@code availpool} is reused when one exists.
 * Otherwise a new worker is created, unless {@code currentThreadNum} has
 * already reached {@code MAX_THREAD_NUM}. {@code registerJob} calls this
 * while holding {@code lock}.
 *
 * @return a worker thread, or {@code null} when the thread limit is reached
 */
private JobThread getAvailThread() {
    if (!availpool.isEmpty()) {
        return availpool.remove(0);
    }
    if (currentThreadNum >= MAX_THREAD_NUM) {
        LogUtils.log(getClass(), "the current Thread num is " + currentThreadNum + ", there is no JobThread that cat be use", LogUtils.FATAL);
        return null;
    }
    JobThread jobThread = new JobThread();
    // Count every created worker, started or not; otherwise workers created
    // before start() would not count against MAX_THREAD_NUM.
    currentThreadNum++;
    if (started) {
        jobThread.start();
    }
    return jobThread;
}
1.3.2、注入Job
/**
 * Registers a job and binds it to a worker thread.
 *
 * @param job the job to register
 * @return {@code true} if the job was bound to a worker; {@code false} if
 *         {@code job} is {@code null}, a job with the same ID is already
 *         registered, or no worker thread is available
 */
public boolean registerJob(Job job) {
    if (job == null) {
        return false;
    }
    synchronized (lock) {
        // Key on the ID of the job being registered. The worker has no job yet,
        // so asking the worker for its job's ID would dereference null.
        String jobId = job.getID();
        if (busypool.containsKey(jobId)) {
            // Overwriting the entry would leak the worker that runs the existing job.
            return false;
        }
        JobThread jobThread = getAvailThread();
        if (jobThread == null) {
            return false;
        }
        busypool.put(jobId, jobThread);
        jobThread.setJob(job);
        return true;
    }
}
1.4、删除一个Job,通过jobid
/**
 * Removes the job with the given ID.
 *
 * <p>The worker is unbound from its job and moved from {@code busypool} back
 * to {@code availpool}. Unknown IDs are ignored.
 *
 * @param jobId ID of the job to remove
 */
public void removeJob(String jobId) {
    synchronized (lock) {
        JobThread jobThread = busypool.remove(jobId);
        if (jobThread == null) {
            // Nothing was registered under this ID, so there is nothing to report.
            return;
        }
        // Clear the worker's job, then make the worker reusable.
        jobThread.makeAvailable();
        availpool.add(jobThread);
        LogUtils.log(getClass(), "Thead-" + jobId + " removed", LogUtils.INFO);
    }
}
1.5、删除Job, 通过GroupId
/**
 * Removes every registered job that belongs to the given group.
 *
 * @param group the group whose jobs are removed
 */
public void removeJob(Group group) {
    synchronized (lock) {
        // Iterate over a snapshot because removeJob(String) modifies busypool.
        // The monitor is re-entrant, so the nested call does not block.
        Map<String, JobThread> snapshot = new HashMap<String, JobThread>(busypool);
        for (Map.Entry<String, JobThread> entry : snapshot.entrySet()) {
            Job job = entry.getValue().getJob();
            // NOTE(review): identity comparison kept from the original; confirm
            // Group is an enum or otherwise canonical.
            if (job != null && job.getGroup() == group) {
                // Remove by the registration key, which is what busypool is indexed by.
                removeJob(entry.getKey());
            }
        }
    }
    LogUtils.log(getClass(), "Thead-" + group + " removed", LogUtils.INFO);
}
1.6、启动Scheduler
/**
 * Starts the scheduler by starting every worker thread that has not been
 * started yet. Calling it again once started is a no-op.
 *
 * @return always {@code true}
 */
public boolean start() {
    synchronized (lock) {
        if (started) {
            return true;
        }
        for (JobThread jobThread : availpool) {
            // Only a NEW thread may be started; Thread.start() throws
            // IllegalThreadStateException for any other state.
            if (jobThread.getState() == Thread.State.NEW) {
                jobThread.start();
            }
        }
        for (JobThread jobThread : busypool.values()) {
            if (jobThread.getState() == Thread.State.NEW) {
                jobThread.start();
            }
        }
        started = true;
    }
    return true;
}
1.7、停止scheduler
/**
 * Shuts down the scheduler: stops every worker thread and resets the pools,
 * so a later {@code registerJob} cannot pick up a stopped worker and
 * {@code start()} can be called again.
 *
 * @return always {@code true}
 */
public boolean shutdown() {
    synchronized (lock) {
        for (JobThread jobThread : availpool) {
            jobThread.shutdown();
        }
        for (JobThread jobThread : busypool.values()) {
            jobThread.shutdown();
        }
        // Drop the stopped workers; a thread cannot be restarted.
        availpool.clear();
        busypool.clear();
        currentThreadNum = 0;
        started = false;
    }
    LogUtils.log(getClass(), "show down the scheduler...");
    return true;
}
完整代码
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.nokia.common.schedule.Group;
import com.nokia.common.schedule.Job;
import com.nokia.util.LogUtils;
/**
* 用于调度job
*/
public final class MyScheduler {
/**
* 可用线程线程池
*/
private List availpool;
/**
* 工作线程池
*/
private Map busypool;
/** */
private Object lock;
private static int INIT_THREAD_NUM = 20;
private static int MAX_THREAD_NUM = 500;
/** 当前job数 */
private int currentThreadNum = 0;
private boolean started = false;
public MyScheduler() {
availpool = Collections.synchronizedList(new ArrayList());
busypool = Collections.synchronizedMap(new HashMap());
}
/**
* 初始化Scheduler, 线程数
* @param max
* @param min
* @return
*/
public boolean init(int max, int min) {
INIT_THREAD_NUM = min;
MAX_THREAD_NUM = max;
int num = getInitThreadNum();
LogUtils.log(getClass(), "this num of init Thread is " + num, LogUtils.INFO);
for (int i = 0; i < num; i++) {
JobThread jobThread = new JobThread();
jobThread.start();
}
currentThreadNum = num;
return true;
}
/**
* 通过jobId删除一个Job
* 将job 删除, 并将JobThread从busypool移除到availpool
* @param jobId
*/
public void removeJob(String jobId) {
synchronized (lock) {
JobThread jobThread = busypool.remove(jobId);
if (jobThread != null) {
//将job设置为空
jobThread.makeAvailable();
//并将JobThread移到availpool中,
availpool.add(jobThread);
}
LogUtils.log(getClass(), "Thead-" + jobId + " removed", LogUtils.INFO);
}
}
/**
* 通过group,
* @param groupId
*/
public void removeJob(Group group){
Map tmp = new HashMap(busypool);
for (JobThread jobThread : tmp.values()) {
Job job = jobThread.getJob();
if(job.getGroup() == group){
//删除job, 通过groupid
removeJob(job.getID());
}
}
LogUtils.log(getClass(), "Thead-" + group + " removed", LogUtils.INFO);
}
/**
* 启动Scheduler
* @return
*/
public boolean start() {
if (started) return true;
//没有启动
synchronized (lock) {
for (JobThread jobThread : availpool) {
if (jobThread.isAlive()) {
jobThread.start();
}
}
for (JobThread jobThread : busypool.values()) {
if (jobThread.isAlive()) {
jobThread.start();
}
}
}
LogUtils.log(getClass(), "start the scheduler...");
started = true;
return true;
}
/**
* shut donw the scheduler
* @return
*/
public boolean shutdown() {
synchronized (lock) {
for (JobThread jobThread : availpool) {
if (jobThread.isAlive()) {
jobThread.shutdown();
}
}
for (JobThread jobThread : busypool.values()) {
if (jobThread.isAlive()) {
jobThread.shutdown();
}
}
}
LogUtils.log(getClass(), "show down the scheduler...");
return true;
}
/**
* balance thread number
*
*/
public void balanceThread() {
int balanceValue = 30;
int availNum = availpool.size();
if (availNum > balanceValue) {
JobThread wt = availpool.remove(0);
wt.shutdown();
currentThreadNum--;
}
}
/**
* 注册Job
* @param job
* @return
*/
public boolean registerJob(Job job) {
synchronized (lock) {
//获取jobThread
JobThread jobThread = getAvailThread();
if (jobThread != null) {
//添加到busypool
busypool.put(jobThread.getJob().getID(), jobThread);
//注入Job
jobThread.setJob(job);
}
}
return false;
}
/**
* 从Availpool中获取可用线程,
* 首先判断availpool中是否有可用线程,
* 有: 获取成功
* 没有: 当前线程数是否大于MAX_THREADNUM
* 是 : 获取失败
* 否 : 创建新的线程
* @return JobThread
*/
private JobThread getAvailThread() {
//从availpool中获取可用线程jobThread
JobThread jobThread = null;
if (availpool.isEmpty()) {
if (currentThreadNum >= MAX_THREAD_NUM) {
LogUtils.log(getClass(), "the current Thread num is " + currentThreadNum + ", there is no JobThread that cat be use", LogUtils.FATAL);
}else {
jobThread = new JobThread();
if (started) {
jobThread.start();
currentThreadNum++;
}
}
}else {
jobThread = availpool.remove(0);
}
return jobThread;
}
/**
* 显示busypool中的JobThread的状态
* @return
*/
public String displayCurrentJobState() {
Map tmp = new HashMap(busypool);
StringBuffer sb = new StringBuffer();
for(JobThread job : tmp.values()) {
sb.append("job name=" + job.getName() + " state=" + job.getState() + " alive=" + job.isAlive() + " \n");
}
return sb.toString();
}
/**
* get the interval of thread
*
* @return
*/
private int getThreadInterval() {
return 100;
}
/**
* get initial thread number
*
* @return
*/
private int getInitThreadNum() {
// int initThreadNum = 10;
// if (initThreadNum < 0) {
// initThreadNum = INIT_THREAD_NUM;
// }
return INIT_THREAD_NUM;
}
/**
* get max thread number
*
* @return
*/
private int getMaxThreadNum() {
// int maxThreadNum = 20;
// if (maxThreadNum < 0) {
// maxThreadNum = MAX_THREAD_NUM;
// }
return MAX_THREAD_NUM;
}
/**
* current thread number
*
* @return
*/
public int getCurrentThreadNum() {
return currentThreadNum;
}
/**
* 内部类, JobThread,
*/
class JobThread extends Thread {
public Job job;
/**
* jobThread是否运行
*/
private boolean isRun;
/**
*
* 注入job
* @param job
*/
public void setJob(Job job) {
synchronized (this) {
this.job = job;
this.notifyAll();
}
}
public Job getJob(){
return job;
}
/**
* Callback a JobThread
* 删除当前的job,
*/
public void makeAvailable() {
synchronized (this) {
this.job = null;
this.notifyAll();
}
}
/**
* 停止jobThread, 清除Job
*/
public void shutdown() {
isRun = false;
if (job != null) {
job = null;
}
}
@Override
public void run() {
try {
while(isRun) {
this.wait(job.getInterval());
while(job == null && isRun) {
this.wait();
}
if (job != null && job.isRun()) {
job.handler();
this.notifyAll();
}
}
} catch (InterruptedException e) {
LogUtils.log(getClass(), e, LogUtils.FATAL);
}
}
}
}