在生产者和消费者模型中,我们会使用到阻塞队列。java给我们提供了七种阻塞队列,基本上能满足各种使用 场景了。下面我们来分析一下:
阻塞队列
何为阻塞队列?
一看到这个问题,很可能会想起来ConcurrentLinkedQueue是一种非阻塞的实现方式,阻塞队列具有 如下两个性质:
支持阻塞的插入方法,就是在队列满的时候,队列会阻塞插入元素的线程,直到队列不满
支持阻塞的移除方法,就是在队列为空的时候,队列会阻塞获取元素的线程,直到队列非空
当阻塞队列不可用时,它提供了四种处理方式:
抛出异常:add(e)方法会在队列已满的时候,抛出异常IllegalStateException("Queue full"),使用 remove方法在队列为空的时候抛出NoSuchElementException异常。
返回特殊值:offer(e)方法往队列中插入元素的时候,会返回是否插入成功,poll方法从队列中拿 元素时,如果没有则返回null。
一直阻塞:调用put(e)方法,当队列已满的时候,队列会阻塞生产者线程,调用take()方法,当队列 为空的时候,会阻塞线程直到队列不为空。
超时退出:在上述第二种情况中,offer和poll方法分别传入时间参数,队列会阻塞线程一段时间, 如果超时则退出线程。
现在我们终于对队列的这些方法有了个明确的认识,下面就看看java中提供了那些BlockingQueue吧。
Java中有哪些阻塞队列?
Java中一共提供了7种阻塞队列,分析如下:
ArrayBlockingQueue,一个由数组结构构成的有界阻塞队列。默认的情况下不保证公平的访问队列, 也就是说最先阻塞的线程可能最后才访问队列,我们可以通过设置参数来保证队列的公平性,但是这样会 影响队列的吞吐量。
LinkedBolckingQueue,一个用链表实现的有界阻塞队列。线程池中Executor.newFixedThreadPool 使用的就是这个队列。
PriorityBlockingQueue,一个支持优先级的无界阻塞队列,默认情况下采取自然升序排列,也可以 通过compareTo方法来指定元素的排序规则。但是通优先级的情况下并不能保证顺序。
DelayQueue,一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现,队列中的元 素必须实现Delayed接口。创建元素的时候可以指定多久才能从队列中获取元素,只有在元素的延时期满的 时候才能从队列中获取它。网上说这个队列非常有用,还有待场景的剖析···
SynchronousQueue,一个不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能 继续添加元素。如上文所述,它也可以构造其公平访问的策略,它就相当于一个数据的传递者,将数据从 生产者处传递给消费者,并且它的效率要高于ArrayBlockingQueue和LinkedBlockQueue。
LinkedTransferQueue,是一个由链表结构组成的无界阻塞TransferQueue队列。
LinkedBlockingDeque,是一个由链表结构组成的双向阻塞队列,所谓双向队列你可以从队列的两端 移出或者插入元素。
阻塞队列如何实现?
java中的阻塞队列只用通知模式实现,所谓通知模式,就是当往一个满队列中添加元素的时候,生产者 阻塞,当有消费者从队列中拿走一个元素的时候,会通知生产者,队列可用。至于如何实现对一个线程的 阻塞和唤醒,则需要去看JVM底层的源码了。
Fork/Join框架
何为Fork/Join框架?
Fork/Join是Java7提供的一个用于执行并行任务的框架,简单地说Fork就是将一个大的任务切分成诺干个 小任务并行执行,Join就是将这些小任务的结果合并起来,得到大任务的结果。
这里有一个很有趣的算法就是工作窃取算法,当我们将一个比较大的任务分割成若干互不干扰的任务的 时候,有的线程会先把任务干完,而有的线程的任务队列里还有任务待处理,这时候干完活的线程会等待 还没做完任务的线程返回,与其让它等着,不如让它去帮助干活。这么时候它们会访问同一个队列,可能 会产生竞争,为了减少竞争,可以使用双端队列。
我们了解了fork/join框架的使用场景,明确了使用它的两个步骤:
分割任务:我们需要一个fork类来将大任务分割成子任务。
执行任务并合并结果,分割的子任务分别放在双端队列里,然后启动几个线程对双端队列中的任务执行, 并将执行完的结果放入另一个队列中,启动一个线程从队列中拿数据直到完成最后的结果。
如何使用Fork/Join框架?
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer>{ private static final int THRESHOLD =2; private int start; private int end; public CountTask(int start,int end){ this.start = start; this.end = end; } protected Integer compute(){ int sum=0; boolean canCompute=(end-start)<=THRESHOLD; if(canCompute){ for(int i=start;i<=end;i++){ sum+=i; } }else{ int middle = (start+end)/2; CountTask leftTask=new CountTask(start,middle); CountTask rightTask = new CountTask(middle+1,end); leftTask.fork(); rightTask.fork(); int leftResult=leftTask.join(); int rightResult=rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args){ ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1,4); Future<Integer> result = forkJoinPool.submit(task); try{ System.out.println(result.get()); }catch(InterruptedException e){ }catch(ExecutionException e){ } } }
上述代码用Fork/Join框架演绎了1+2+3+4的任务分解与合并。
Fork/Join框架如何实现?
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,前者负责存放程序提交给ForkJoinPool 的任务,后者主要负责执行任务。大致的工作流程如下:
ForkJoinTask的fork方法会调用ForkJoinWorkerThread的pushTask方法,pushTask方法将当前任务放在ForkJoinTask数组queue里。
然后调用ForkJoinPool的signalWork()方法唤醒或是创建一个工作线程来执行任务。
ForkJoinTask的join方法主要是阻塞当前线程并等待获取结果。
上面的过程已经相当清晰了
以上学习了java的各种并发容器和框架,不仅是学习层面上,在以后开发过程中也要好好体会。