博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程 06—— CompletionService :Executor 和 BlockingQueue
阅读量:5104 次
发布时间:2019-06-13

本文共 6417 字,大约阅读时间需要 21 分钟。

Java并发编程实践 目录

 

 

概述

 

第1部分 问题引入

《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:

  "如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。"

这是什么意思呢?通过一个例子,分别使用繁琐的做法和CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和CompletionService这个API提供的初衷。

第2部分 实例

考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。

 

1 package com.concurrency.TaskExecution_6; 2  3 import java.util.concurrent.Callable; 4 import java.util.concurrent.TimeUnit; 5  6 /** 7  *  8  * @ClassName: ReturnAfterSleepCallable 9  * TODO10  * @author Xingle11  * @date 2014-9-16 上午9:20:3412  */13 public class ReturnAfterSleepCallable implements Callable
{14 15 private int sleepSeconds; 16 private int returnValue;17 18 public ReturnAfterSleepCallable(int sleepSeconds,int returnValue){19 this.sleepSeconds = sleepSeconds;20 this.returnValue = returnValue;21 }22 23 24 @Override25 public Integer call() throws Exception {26 System.out.println("begin to execute ");27 28 TimeUnit.SECONDS.sleep(sleepSeconds);29 return returnValue;30 }31 32 }

 

1.繁琐的做法

  通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。

1 package com.concurrency.TaskExecution_6; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Future; 9 10 /**11  * 传统的繁琐做法12  * @ClassName: TraditionalTest13  * TODO14  * @author Xingle15  * @date 2014-9-16 上午10:06:2116  */17 public class TraditionalTest {18     19     public static void main(String[] args){20         int taskSize = 5;21         ExecutorService executor = Executors.newFixedThreadPool(taskSize);22         List
> futureList = new ArrayList
>();23 24 for(int i= 1; i<=taskSize; i++){25 int sleep = taskSize -1;26 int value = i;27 //向线程池提交任务28 Future
future = executor.submit(new ReturnAfterSleepCallable(sleep, value));29 //保留每个任务的Future30 futureList.add(future);31 }32 // 轮询,获取完成任务的返回结果33 while(taskSize > 0){34 for (Future
future : futureList){35 Integer result = null;36 try {37 result = future.get();38 } catch (InterruptedException e) {39 e.printStackTrace();40 } catch (ExecutionException e) {41 e.printStackTrace();42 } 43 //任务已经完成44 if(result!=null){45 System.out.println("result = "+result);46 //从future列表中删除已经完成的任务47 futureList.remove(future);48 taskSize --;49 break; 50 }51 }52 }53 // 所有任务已经完成,关闭线程池 54 System.out.println("all over ");55 executor.shutdown();56 }57 58 }

 

执行结果:

 

2.使用CompletionService

1 package com.concurrency.TaskExecution_6; 2  3 import java.util.concurrent.CompletionService; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.ExecutorCompletionService; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8  9 /**10  * 使用CompletionService11  * @ClassName: CompletionServiceTest12  * TODO13  * @author Xingle14  * @date 2014-9-16 上午11:32:4515  */16 public class CompletionServiceTest {17     18     public static void main(String[] args){19         int taskSize = 5;20         ExecutorService executor = Executors.newFixedThreadPool(taskSize);21         // 构建完成服务 22         CompletionService
completionService = new ExecutorCompletionService
(executor);23 24 for (int i=1;i<= taskSize; i++){25 // 睡眠时间 26 int sleep = taskSize - i;27 // 返回结果 28 int value = i;29 //向线程池提交任务30 completionService.submit(new ReturnAfterSleepCallable(sleep, value));31 try {32 System.out.println("result:"+completionService.take().get());33 } catch (InterruptedException e) {34 e.printStackTrace();35 } catch (ExecutionException e) {36 e.printStackTrace();37 }38 }39 40 System.out.println("all over. ");41 executor.shutdown();42 43 }44 45 }

 

执行结果:

 

3.CompletionService和ExecutorCompletionService的实现

 JDK源码中CompletionService的javadoc说明如下:

/** * A service that decouples the production of new asynchronous tasks * from the consumption of the results of completed tasks.  Producers * submit tasks for execution. Consumers take * completed tasks and process their results in the order they * complete.  */

 

也就是说,CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。
ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。
public ExecutorCompletionService(Executor executor) {        if (executor == null)            throw new NullPointerException();        this.executor = executor;        this.aes = (executor instanceof AbstractExecutorService) ?            (AbstractExecutorService) executor : null;        this.completionQueue = new LinkedBlockingQueue
>(); }
到这里可以推测,按照任务的完成顺序获取结果,就是通过阻塞队列实现的,阻塞队列刚好具有这样的性质:阻塞和有序。
 
ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture
public Future
submit(Callable
task) { if (task == null) throw new NullPointerException(); RunnableFuture
f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f;}

 

QueueingFuture是FutureTask的一个子类,通过改写FutureTask类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

private class QueueingFuture extends FutureTask
{ QueueingFuture(RunnableFuture
task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future
task; }

 

FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。

 

转载于:https://www.cnblogs.com/xingele0917/p/3974187.html

你可能感兴趣的文章
好玩的-记最近玩的几个经典ipad ios游戏
查看>>
MySQL更改默认的数据文档存储目录
查看>>
PyQt5--EventSender
查看>>
Sql Server 中由数字转换为指定长度的字符串
查看>>
Java 多态 虚方法
查看>>
Unity之fragment shader中如何获得视口空间中的坐标
查看>>
万能的SQLHelper帮助类
查看>>
tmux的简单快捷键
查看>>
[Swift]LeetCode922.按奇偶排序数组 II | Sort Array By Parity II
查看>>
Html5 离线页面缓存
查看>>
《绿色·精简·性感·迷你版》易语言,小到不可想象
查看>>
Android打包key密码丢失找回
查看>>
VC6.0调试技巧(一)(转)
查看>>
类库与框架,强类型与弱类型的闲聊
查看>>
webView添加头视图
查看>>
php match_model的简单使用
查看>>
在NT中直接访问物理内存
查看>>
Intel HEX 文件格式
查看>>
SIP服务器性能测试工具SIPp使用指导(转)
查看>>
php_扑克类
查看>>