@Bean("lcAsyncServiceExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
//根据业务场景设置队列长度
threadPoolTaskExecutor.setQueueCapacity(400);
//允许线程的空闲时间60秒:当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setThreadNamePrefix("BPExecutor-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new BPConfig.BuriedPointRunsPolicy());
return threadPoolTaskExecutor;
}
/**
* 自定义拒绝策略
*/
public static class BPRunsPolicy implements RejectedExecutionHandler {
public BPRunsPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if(r instanceof BPTaskManager.BPThread){
BPTaskManager.BPThread bPThread = (BPTaskManager.BPThread)r;
//先做存储再更新状态
bPThread.markBPReport();
bPThread.failBPReport();
BlockingQueue queue = bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue();
queue.stream().forEach(qu->{BPTaskManager.BPThread thread=(BPTaskManager.BPThread)qu;
if("69d".equals(thread.bPLogEntityBody.getBPInfo().buriId)){
log.info("我是true======{}",thread.bPLogEntityBody.getBPInfo().buriedId);
}else{
log.info("我是false====={}",thread.bPLogEntityBody.getBPInfo().buriedId);
}
});
}
}
}
1.TransmittableThreadLocal中TtlRunnable使用spring框架中的ThreadPoolTaskExecutor线程池,如果自定义线程池不加名字时会和TtlRunnable使用同一个spring的ThreadPoolTaskExecutor线程池,并会将任务交给ThreadPoolTaskExecutor线程池处理任务。
2.请求任务太多时,连阻塞队列都放不下时,线程池会直接做丢弃,此时应
判断超过阻塞队列内的任务数1半时,将请求线程休眠1s钟,伪代码如下:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(200);
QueryBuilder matchQueryBuilder = QueryBuilders.rangeQuery("body.bPInfo.reqInfo.reqTime").gte(begin).lte(end);
searchSourceBuilder.query(matchQueryBuilder);
searchSourceBuilder.sort("body.bPInfo.reqInfo.reqTime", SortOrder.ASC);
List bPLogPage = new ArrayList(1000);//list在外面建,不用每次查询再建,指定初始值
ScrollDto scrollDto = new ScrollDto();//持有scrollId
do{
bPLogPage.clear();
while (bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue().size() > 200){
//休眠一秒
try{
Thread.sleep(1000);
}catch (Exception e){}
}
//从es中查询出一批数据处理一批数据,防止OOM内存溢出logService.getDocByDate(bPLogEntity,searchSourceBuilder,bPLogPage,scrollDto);
if(CollectionUtils.isNotEmpty(bPLogPage)){
bPLogPage.stream().forEach(bP -> {
if(null!=bP.getBody() && null != bP.getBody().getBPInfo()){
//将查询的数据推送给线程执行器
bPTaskManager.pushTask(bP.getBody());
}
});
}
}while(null !=bPLogPage && bPLogPage.size() > 0)
