Spring Boot使用多线程

jasmine 于 2020-05-01 发布

1、线程池初始化

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class AsyncConfiguration {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean("menuExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数5:线程池创建时候初始化的线程数
        executor.setCorePoolSize(corePoolSize);
        //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(maxPoolSize);
        //缓冲队列500:用来缓冲执行任务的队列
        executor.setQueueCapacity(queueCapacity);
        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix(namePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //注意线程池中有的线程是一直存在一直被复用的,
        // 所以线程执行完成后需要在TaskDecorator的finally方法中移除传递的上下文对象,否则就存在内存泄漏的问题。
        //  https://www.cnblogs.com/x-kq/p/14911497.html
        //threadPoolTaskExecutor.setTaskDecorator(this::decorate);
        //线程初始化
        executor.initialize();
        return executor;
    }
    
    //当有参数共享子线程的时候用TaskDecorator传参
    public Runnable decorate(Runnable runnable) {
        Map<String, String> stringStringMap = HomeFoodHolder.unitKeyNameFoodMap.get();
        String foodName = HomeFoodHolder.foodName;
        return () -> {
            try {
                //②
                //ThreadContextHandler.set(context);
                runnable.run();
            } finally {
                // ③
                //ThreadContextHandler.clear();
            }
        };
    }
}

2、yml参数配置

#多线程
async.executor.thread.core_pool_size=10
async.executor.thread.max_pool_size=10
async.executor.thread.queue_capacity=100000
async.executor.thread.keep_alive_seconds=10
async.executor.thread.name.prefix=carryfilethreadpool

3、异步代码

@Service
public class MenuSync {
    /**
     * 执行多线程任务,无返回结果
     */
    @Async("menuExecutor")
    public void executeVoid() {
        try {
            System.out.println("haha 1---");
            Thread.sleep(3000);
            System.out.println("haha 2---");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 执行多线程任务,有返回结果
     */
    @Async("menuExecutor")
    public Future<String> executeFuture(int shopId, CountDownLatch latch,int time) {
        try {
            System.out.println("haha 1---" + shopId);
            Thread.sleep(time);
            System.out.println("haha 2---" + shopId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            latch.countDown();
        }
        return new AsyncResult<>("---executeFuture result" + shopId);
    }
}

4、业务代码使用

@Service
public class MenuService {
    @Autowired
    private MenuSync menuSync;
    @Autowired
    private Executor menuExecutor;

    /**
     * 执行无返回值多线程任务
     */
    public List<Food> getFoods(int shopId) {
        System.out.println("111---");
        menuSync.executeVoid();
        System.out.println("222---");
        return new ArrayList<>();
    }

    /**
     * 执行有返回值多线程任务
     */
    public String getFoods2(int shopId) {
        System.out.println("111---");
        Map<String, String> map = new HashMap<>(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Future<String> future = menuSync.executeFuture(111, countDownLatch, 3000);
        Future<String> future2 = menuSync.executeFuture(222, countDownLatch, 5000);
        try {
            //future.get是阻塞状态,当需要几个线程同时执行完再执行后续数据处理的话用到countDownLatch.await()方法
            countDownLatch.await();
            return future.get() + future2.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("222---" + map.get("test1"));
        return "";
    }
}

5、启用异步任务

@SpringBootApplication
@EnableAsync  // 启用异步任务
public class MainApplication {
    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }
}