<返回更多

SpringBoot用线程池ThreadPoolExecutor处理百万级数据

2023-06-03  微信公众号  程序猿小杨
加入收藏

一、背景:

     使用JDK线程池ThreadPoolExecutor多线程异步执行批量插入、更新等操作方法,提高百万级数据插入效率。

二、具体细节:

图片

 

2.1、创建自适应机器本身线程数量的线程池

/创建自适应机器本身线程数量的线程池    Integer processNum = Runtime.getRuntime().avAIlableProcessors();    int corePoolSize = (int) (processNum / (1 - 0.2));    int maxPoolSize = (int) (processNum / (1 - 0.5));    ExecutorService executorService = new ThreadPoolExecutor(            corePoolSize,            maxPoolSize,            2L,            TimeUnit.SECONDS,            new LinkedBlockingQueue<>(3),            Executors.defaultThreadFactory(),            new ThreadPoolExecutor.CallerRunsPolicy()    );    @Override    public boolean batchInsert(List<Student> list) throws Exception {        Future<Boolean> a = null;        try {            /**             * submit与execute 都是向线程池提交任务。             * submit提交后执行提交类实现callable方法后重写的call方法,execute提交后执行实现Runnable的run方法             * Runnable任务没有返回值,而Callable任务有返回值。             * 并且Callable的call()方法只能通过ExecutorService的submit(Callable <T> task) 方法来执行             * 多人同时提交时的线程控制:多线程多任务             */            a = executorService.submit(new BatchWay(list,studentService));            return a.get();        } catch (Exception e) {            e.printStackTrace();            try {                return a.get();            } catch (Exception ex) {                ex.printStackTrace();                return false;            }        }    }

2.2、业务核心处理类:

@Slf4jpublic class BatchWay implements Callable<Boolean> { private int batch100 = 100; //100条为分界批量导入 private List<Student> list; //list中的大量数据 private StudentService studentService;

    //有参的构造函数,方便初始化其类    public BatchWay(List<Student> list, StudentService studentService) {        this.list = list;        this.studentService = studentService;    }    /**线程池*///    private ThreadPoolExecutor threadPoolExecutor =//            new ThreadPoolExecutor(//                    10, //corePoolSize:线程池中核心线程数//                    Runtime.getRuntime().availableProcessors(),  //线程池中能拥有最多线程数 取所有//                    5L,  //keepAliveTime:表示空闲线程的存活时间 2秒//                    TimeUnit.SECONDS, //表示keepAliveTime的单位:秒//                    new LinkedBlockingQueue<>(100),  //用于缓存任务的阻塞队列////                    Executors.defaultThreadFactory(),//                    new ThreadPoolExecutor.CallerRunsPolicy()//            );    /**     * 功能描述:实现Callable的call方法     * @MethodName: call     * @MethodParam: []     * @Return: JAVA.lang.Boolean     * @Author: yyalin     * @CreateDate: 2022/5/6 15:46     */    public Boolean call(){        try {            batchOp(list);            return true;        } catch (Exception e) {            e.printStackTrace();        }        return false;    }

    /**     * 功能描述:批量保存数据     * @MethodName: batchOp     * @MethodParam: [list]     * @Return: void     * @Author: yyalin     * @CreateDate: 2022/5/6 15:40     */    private void batchOp(List<Student> list) {        if(!list.isEmpty()){            Integer size = list.size();            if(size<=batch100){                //小于分批的直接插入即可                studentService.saveBatch(list);            }else if(size>batch100){                //分批后再进行保存数据                batchOpSpilit(list,batch100);            }        }    }

    /**     * 功能描述:对list进行切割     * @MethodName: batchOpSpilit     * @MethodParam: [list, batch100]     * @Return: void     * @Author: yyalin     * @CreateDate: 2022/5/6 15:43     */    private void batchOpSpilit(List<Student> list, int batch100) {        log.info("开始切割………………");        List<List<Student>> list1 = SplitListUtils.pagingList(list, batch100);        try {            for (List<Student> list2 : list1) {                batchOp(list2);//                threadPoolExecutor.allowCoreThreadTimeOut(true);//                //再调batchOp方法,这里的多线程是多个小集合往数据库插入//                threadPoolExecutor.execute(() -> {////                    log.info("我是线程开始保存数据...:" + Thread.currentThread().getName());//                    batchOp(list2);//                });            }//            log.info("当前线程池剩余的数量222222:"+threadPoolExecutor.getPoolSize());        } catch (Exception e) {//            log.info("出现异常:"+e);        } finally {            //最后关闭线程 不允许提交新的任务,但是会处理完已提交的任务//            threadPoolExecutor.shutdown();        }    }

2.3、造数据,多线程异步插入:

  public String batchWay() throws Exception {        log.info("开始批量操作.........");        Random rand = new Random();        List<Student> list = new ArrayList<>();        for (int i = 0; i < 1000003; i++) {            Student student=new Student();            student.setStudentName("小李"+i);            student.setAddr("上海"+rand.nextInt(9) * 1000);            student.setAge(rand.nextInt(1000));            student.setPhone("134"+rand.nextInt(9) * 1000);            list.add(student);        }        long startTime = System.currentTimeMillis(); // 开始时间        boolean a=studentService.batchInsert(list);        long endTime = System.currentTimeMillis(); //结束时间        return "执行完成一共耗时time: " + (endTime - startTime) / 1000 + " s";    }

2.4、测试结果

图片

汇总结果:

序号

核心线程(core_pool_size)

插入数据(万) 耗时(秒)
1 10 100w 38s
2 15 100w 32s
3 50 100w 31s

个人推荐:SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据的方法。

总结:ThreadPoolTaskExecutor和ThreadPoolExecutor比Executors创建线程池更加灵活,可以设置参数,推荐ThreadPoolTaskExecutor和ThreadPoolExecutor,而ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor

关键词:SpringBoot      点击(7)
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多SpringBoot相关>>>