AppManager
1.1、appManager 提供一个队列,方便从kafka中消费的数据, 有缓存的地方
/********data*****/
/**
* 消息队列
* 由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选
*/
protected LinkedBlockingQueue kafkaQueue;
1.2创建一个变量noDatasToKafkaQueue, 用于管理是否向队列中写入数据
/**
* 停止向kafka队列中压入数据
*/
private AtomicBoolean noDatasToKafkaQueue;
/**
* 向队列中压入数据
* @param record 消息
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否成功压入队列
* @throws InterruptedException
*/
public boolean offerDataToQueue(ConsumerRecord record, long timeout, TimeUnit unit)
throws InterruptedException {
return !noDatasToKafkaQueue.get() && kafkaQueue.offer(record, timeout, unit);
}
/**
* 从队列中获取数据
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public ConsumerRecord pollRecords(long timeout, TimeUnit unit)
throws InterruptedException {
return kafkaQueue.poll(timeout, unit);
}
1.3、 任务池存储运行的job, 任务调度Scheduler管理Job
/**
* 任务池
*/
protected Map jobpool;
/**
* 任务调度
*/
private Scheduler scheduler;
1.3.1、注册Job, 添加job到任务池jobpool, 注册job到Scheduler的busypool
/**
* 通过调用scheduler将job 注册到
* @param job
* @return
*/
public boolean registerJob(Job job) {
if (!scheduler.registerJob(job)) {
LogUtils.log(getClass(), "the job-register of scheduler is unsuccessful .", LogUtils.WARN);
return false;
}
jobpool.put(job.getID(), job);
LogUtils.log(getClass(), "register " + job.getID() + " to scheduler");
return true;
}
1.3.2、删除Job
/**
* remove job
* @param jobId
*/
public synchronized void removeJob(String jobId) {
LogUtils.log(MyScheduler.class, "Thread-" + jobId + " removed");
scheduler.removeJob(jobId);
jobpool.remove(jobId);
}
1.3.3、释放job
/**
* 释放运行的job
* release job
*/
public synchronized void releaseJob() {
Map tmp = new HashMap(jobpool);
for (Job job:tmp.values()) {
if (job.isRun()) {
removeJob(job.getID());
}
}
scheduler.balanceThread();
}
2、AppManager业务
2.1、初始化
/**
* 初始化AppManager
* 第一步:初始化Cache
* 第二步:初始化Schedule, 任务调度
*/
public boolean init() {
// initialize cache,加载appdata.conf
if (!initCache()) return false;
// init scheduler
if (!initScheduler()) return false;
return true;
}
2.1.1、初始化CacheManager
读取配置文件到缓存中, CacheManager中实际业务, 就是将配置文件中的内容写到一个Map
/**
* 加载appdata.conf内容
* @return
*/
private boolean loadAppData() {
LogUtils.log(getClass(), "Load AppData ......");
appData.putAll(Configuration.readAppdata());
return true;
}
2.2、初始化Scheduler
/**
* com.nokia.server.AppManager.initScheduler()
* 根据appdata.conf中的参数设置初始化线程数
* 以及初始化Cheduler
* 启动scheduler
* @return
*/
private boolean initScheduler() {
scheduler = new Scheduler();
// appdata.conf minThread=10
int min = CacheManager.getInstance().getMinThread();
// appdata.conf maxThread=100
int max = CacheManager.getInstance().getMaxThread();
//根据appdata.conf中的minThead,和maxThread设置初始化scheduler的线程数
if (!scheduler.init(min, max)) {
LogUtils.log(getClass(), "the initializtion of scheduler is unsuccessful .", LogUtils.WARN);
return false;
}
/* //initialize scheduler,将SystemJob 以及业务Job注册到jobpool中
if (!Initializtion.getInstance().init()) {
LogUtils.log(getClass(), "the job-initializtion of scheduler is unsuccessful .", LogUtils.WARN);
return false;
}*/
initJob();
// startup scheduler
if (!scheduler.start()) {
LogUtils.log(getClass(), "the startup of scheduler is unsuccessful .", LogUtils.WARN);
return false;
}
return true;
}
2.3、shutdown
/**
* 停止scheduler
*/
public void shtudown() {
if (scheduler != null) {
LogUtils.println(getClass(), "begin shutdown scheduler");
scheduler.shutdown();
}
LogUtils.println(getClass(), "AppManager:" + appkey + " shutdown");
}