<返回更多

JDK CompletionService 理解

2023-04-19  今日头条  huohuo494
加入收藏

有一个赛跑比赛,4个人参赛,给前三名颁发得金银铜牌,用代码怎么写呢

 

//运动员
@Data
@RequiredArgsConstructor
public static class Runner {
    private final String name;
    private Integer score;
}
// 颁奖类
public static class AwardUtils {
    private static BlockingQueue<String> awardQueue = new LinkedBlockingQueue<>(3);
    static {
        awardQueue.offer("金牌");
        awardQueue.offer("银牌");
        awardQueue.offer("铜牌");
    }
    public static void winAward(String name) {
        String award = awardQueue.poll();
        if (award != null) {
            log.info("{} 获得了 {}", name,  award);
        } else {
            log.info("{} 没有获得奖牌", name);
        }
    }
}

普通做法:我们可以等运动员跑玩,看谁用时少来颁奖

CountDownLatch countDownLatch = new CountDownLatch(1);
// 赛跑任务
Function<Runner, Callable<Runner>> runTask = (runner) -> () -> {
    countDownLatch.await();
    int time = ThreadLocalRandom.current().nextInt(10, 20);
    runner.setScore(time);
    TimeUnit.SECONDS.sleep(time);
    log.info("{} 跑了 {} 秒", runner.getName(), time);
    return runner;
};
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Future<Runner>> results = Arrays.asList(
        executor.submit(runTask.Apply(new Runner("小明"))),
        executor.submit(runTask.apply(new Runner("小鹏"))),
        executor.submit(runTask.apply(new Runner("小张"))),
        executor.submit(runTask.apply(new Runner("小李")))
);
countDownLatch.countDown();
results.stream().map(future -> {
    try {
        return future.get(20, TimeUnit.SECONDS);
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}).sorted((runner1, runner2) -> {
    int score1 = runner1.getScore();
    int score2 = runner2.getScore();
    return Integer.compare(score1, score2);
}).forEach(runner -> {
    AwardUtils.winAward(runner.getName());
});
11:21:38,915 [pool-1-thread-2] INFO - 小鹏 跑了 11 秒
11:21:42,908 [pool-1-thread-4] INFO - 小李 跑了 15 秒
11:21:43,901 [pool-1-thread-3] INFO - 小张 跑了 16 秒
11:21:43,901 [pool-1-thread-1] INFO - 小明 跑了 16 秒
11:21:43,902 [main] INFO - 小鹏 获得了 金牌
11:21:43,902 [main] INFO - 小李 获得了 银牌
11:21:43,902 [main] INFO - 小明 获得了 铜牌
11:21:43,902 [main] INFO - 小张 没有获得奖牌

CompletionService:有人跑完了就把奖牌给他,不用等其他人是否跑完
主要功能就是一边执行任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不依赖任务顺序

CountDownLatch countDownLatch = new CountDownLatch(1);
Function<Runner, Callable<Runner>> runTask = (runner) -> () -> {
    countDownLatch.await();
    int time = ThreadLocalRandom.current().nextInt(10, 20);
    runner.setScore(time);
    TimeUnit.SECONDS.sleep(time);
    log.info("{} 跑了 {} 秒", runner.getName(), time);
    return runner;
};
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<Runner> completionService = new ExecutorCompletionService<>(executor);
completionService.submit(runTask.apply(new Runner("小明")));
completionService.submit(runTask.apply(new Runner("小鹏")));
completionService.submit(runTask.apply(new Runner("小张")));
completionService.submit(runTask.apply(new Runner("小李")));
countDownLatch.countDown();
for (int i = 0; i < 4; i++) {
    AwardUtils.winAward(completionService.take().get().getName());
}
11:11:15,125 [pool-1-thread-3] INFO  - 小张 跑了 10 秒
11:11:15,130 [main] INFO  - 小张 获得了 金牌
11:11:19,122 [pool-1-thread-1] INFO   - 小明 跑了 14 秒
11:11:19,122 [main] INFO   - 小明 获得了 银牌
11:11:20,125 [pool-1-thread-4] INFO   - 小李 跑了 15 秒
11:11:20,125 [main] INFO   - 小李 获得了 铜牌
11:11:22,132 [pool-1-thread-2]  - 小鹏 跑了 17 秒
11:11:22,132 [main] INFO  - 小鹏 没有获得奖牌

ExecutorCompletionService 类中维护一个了 BlockingQueue;

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final BlockingQueue<Future<V>> completionQueue;
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
}

QueueingFuture 重写了done 方法 ,done 方法会在完成或取消任务时执行,将其加入队列

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>