您当前的位置: 首页 > 

qq_34412985

暂无认证

  • 4浏览

    0关注

    1061博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ThreadPoolTaskExecutor同时自定义线程拒绝策略,防止线程太多造成线程池将任务丢弃

qq_34412985 发布时间:2021-12-08 21:12:01 ,浏览量:4

@Bean("lcAsyncServiceExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
    threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
    //根据业务场景设置队列长度
    threadPoolTaskExecutor.setQueueCapacity(400);
    //允许线程的空闲时间60秒:当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("BPExecutor-");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new BPConfig.BuriedPointRunsPolicy());
    return threadPoolTaskExecutor;
}

/**
 * 自定义拒绝策略
 */
public static class BPRunsPolicy implements RejectedExecutionHandler {
    public BPRunsPolicy() { }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if(r instanceof  BPTaskManager.BPThread){
            BPTaskManager.BPThread bPThread = (BPTaskManager.BPThread)r;
            //先做存储再更新状态
            bPThread.markBPReport();
            bPThread.failBPReport();
BlockingQueue queue = bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue();
queue.stream().forEach(qu->{BPTaskManager.BPThread thread=(BPTaskManager.BPThread)qu;
    if("69d".equals(thread.bPLogEntityBody.getBPInfo().buriId)){
        log.info("我是true======{}",thread.bPLogEntityBody.getBPInfo().buriedId);
    }else{
        log.info("我是false====={}",thread.bPLogEntityBody.getBPInfo().buriedId);
    }      
});
        }
    }
}

1.TransmittableThreadLocal中TtlRunnable使用spring框架中的ThreadPoolTaskExecutor线程池,如果自定义线程池不加名字时会和TtlRunnable使用同一个spring的ThreadPoolTaskExecutor线程池,并会将任务交给ThreadPoolTaskExecutor线程池处理任务。

2.请求任务太多时,连阻塞队列都放不下时,线程池会直接做丢弃,此时应

判断超过阻塞队列内的任务数1半时,将请求线程休眠1s钟,伪代码如下:

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(200);
QueryBuilder matchQueryBuilder = QueryBuilders.rangeQuery("body.bPInfo.reqInfo.reqTime").gte(begin).lte(end);
searchSourceBuilder.query(matchQueryBuilder);
searchSourceBuilder.sort("body.bPInfo.reqInfo.reqTime", SortOrder.ASC);
List bPLogPage = new ArrayList(1000);//list在外面建,不用每次查询再建,指定初始值
ScrollDto scrollDto = new ScrollDto();//持有scrollId
do{
bPLogPage.clear();
while (bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue().size() > 200){
//休眠一秒
    try{
        Thread.sleep(1000);
    }catch (Exception e){}
}
//从es中查询出一批数据处理一批数据,防止OOM内存溢出logService.getDocByDate(bPLogEntity,searchSourceBuilder,bPLogPage,scrollDto);
if(CollectionUtils.isNotEmpty(bPLogPage)){
    bPLogPage.stream().forEach(bP -> {
        if(null!=bP.getBody() && null != bP.getBody().getBPInfo()){
            //将查询的数据推送给线程执行器
            bPTaskManager.pushTask(bP.getBody());
        }
    });
}
}while(null !=bPLogPage && bPLogPage.size() > 0)

关注
打赏
1653291990
查看更多评论
立即登录/注册

微信扫码登录

0.2487s