为什么要用任务队列 linkblockqueue take

消费者生产者队列BlockQueue - CSDN博客
消费者生产者队列BlockQueue
import java.util.LinkedL
import java.util.concurrent.atomic.AtomicI
public class MyBlockQueue {
private final LinkedList&Object& list = new LinkedList&Object&();
private final AtomicInteger count = new AtomicInteger(0);
private final int minSize = 0;
private final int maxS
public MyBlockQueue(int maxSize){
this.maxSize = maxS
private final Object lock = new Object();
public void put(Object element){
synchronized (lock) {
if (count.get() == this.maxSize) {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
list.add(element);
count.incrementAndGet();
System.out.println("新加入元素: " + element);
lock.notify();
public Object take() {
Object take = null;
synchronized (lock) {
if (count.get() == this.minSize) {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
take = list.removeFirst();
count.decrementAndGet();
System.out.println("移除元素");
lock.notify();
public int getSize(){
return this.count.get();
同樣,我們可以使用ReentranLock,並配合try…finally語句塊來實現
本文已收录于以下专栏:
相关文章推荐
LinkedBlockingQueue的put,add和offer的区别 
      最近在学习&,有很多java.util.concurrent包下的新类。LinkedBlockingQueu...
Lambda表达式入门Java 8之前,大家应该有创建匿名内部类的体验,代码有点繁琐。Lambda表达式支持将代码块,作为方法参数,允许使用更简洁的代码,来创建只有一个抽象方法的接口的实例。Java ...
线程同步线程安全问题
当多个线程访问某一个类(对象或方法)时,这个对象始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的。synchronized
可以在任意对象及方法上加锁,...
Class类文件的结构
魔数Magic Number与Class文件的版本Version
常量池Constant Pool
访问标志Access Flags
类索引父类索引与接口索引集合This Cl...
垃圾收集器
Serial收集器
Serial Old收集器
ParNew收集器
Parallel Scanvenge收集器
Parallel Old收集器
CMSConcurrent Mark Swe...
同步容器VectorHashtableCollections.synchronziedXX并发容器ConcurrentMap基於Hashtable,段Segement,最大16個把粒度分細,每個段有自...
垃圾收集算法
可达性分析算法强引用StringReference
软引用SoftReference
弱引用WeakReference
虚引用PhantomReference
引用计数算法Referen...
本文主要讲了Java中BlockingQueue的源码一、BlockingQueue介绍与常用方法BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于...
阻塞队列BlockingQueue的使用
强引用StringReference
创建对象,并把对象赋给一个引用变量,程序通过这个引用变量来操作对象,对象和数组都采用了强引用。
一个对象被一个或以上引用变量引用,则处于可达状态,不会被GC
他的最新文章
讲师:吴岸城
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博客分类:
最近在研究blockqueue的源码,从今天开始,和大家分享一下我看源码的一些心得体会
(1)源码解析
(2)源码解析
LinkedBlockingQueue实现了BlockingQueue接口以及Serializable接口,是有序的FIFO队列,构造函数中,可传入一个最大容量值,如果没有传入,则默认是Integer.MAX_VALUE
一 首先看一下重要的几个类变量:
/** 保存当前队列中元素的个数 */
private final AtomicInteger count = new AtomicInteger(0);
* Invariant: head.item == null
private transient Node&E&
* Invariant: last.next == null
private transient Node&E&
/** 消费者锁,Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** 使消费者线程等待,直到被唤醒或者打断 */
private final Condition notEmpty = takeLock.newCondition();
/** 生产者锁,Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** 使生产者线程等待,直到被唤醒或者打断 */
private final Condition notFull = putLock.newCondition();
二 put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node&E& node = new Node(e);
final ReentrantLock putLock = this.putL
final AtomicInteger count = this.
putLock.lockInterruptibly();
while (count.get() == capacity) {
notFull.await();
enqueue(node);
c = count.getAndIncrement();
if (c + 1 & capacity)
notFull.signal();
} finally {
putLock.unlock();
if (c == 0)
signalNotEmpty();
执行过程如下:
1 如果传入元素为空,抛出空指针异常
2 获得put的锁,以及原子的count,然后lock,注意,是可打断的lock
3 判断当前队列是否饱和,若饱和,生产者线程进入等待状态
4 如果队列不饱和,则将元素包装为一个node放到队列中
5 count+1,如果count+1仍然小于队列的最大容量,则生产者线程被唤醒
6 在finnally中释放锁,最后唤醒消费者,提醒消费者可以从队列中取对象了
三 offer方法
offer提供了两个方法,一个方法是可传入等待时间,另一个则没有
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putL
final AtomicInteger count = this.
putLock.lockInterruptibly();
while (count.get() == capacity) {
if (nanos &= 0)
nanos = notFull.awaitNanos(nanos);
enqueue(new Node&E&(e));
c = count.getAndIncrement();
if (c + 1 & capacity)
notFull.signal();
} finally {
putLock.unlock();
if (c == 0)
signalNotEmpty();
执行过程如下:
1 如果传入元素为空,抛出空指针异常
2 根据传入的timeout和unit计算出最长等待的时间
3 获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
4 如果队列饱和,则超过等待时间后,直接返回false
5 以下处理构成同put
另外一个不带参数的方法,如果判断队列饱和,直接返回false,不阻塞
四 take方法
public E take() throws InterruptedException {
int c = -1;
final AtomicInteger count = this.
final ReentrantLock takeLock = this.takeL
takeLock.lockInterruptibly();
while (count.get() == 0) {
notEmpty.await();
x = dequeue();
c = count.getAndDecrement();
if (c & 1)
notEmpty.signal();
} finally {
takeLock.unlock();
if (c == capacity)
signalNotFull();
执行过程如下:
1 获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
2 如果当前队列为空,则阻塞
3 如果非空,则元素出列,count-1,如果count&1,表示队列非空,则消费者线程被唤醒
4 在finally中释放lock,唤醒生产者生产
五 pool方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.
final ReentrantLock takeLock = this.takeL
takeLock.lockInterruptibly();
while (count.get() == 0) {
if (nanos &= 0)
nanos = notEmpty.awaitNanos(nanos);
x = dequeue();
c = count.getAndDecrement();
if (c & 1)
notEmpty.signal();
} finally {
takeLock.unlock();
if (c == capacity)
signalNotFull();
执行过程如下:
1 获得put的锁,以及原子的count,表示当前队列中的元素个数,然后lock,注意,是可打断的lock
2 如果当前队列为空,则线程阻塞,并等待指定时间,如果在指定时间还是空,则直接返回空
3 以下过程同take
pool也提供了不带参数的方法,表示如果队列为空,则直接返回null,不阻塞
public E peek() {
if (count.get() == 0)
final ReentrantLock takeLock = this.takeL
takeLock.lock();
Node&E& first = head.
if (first == null)
return first.
} finally {
takeLock.unlock();
不阻塞,不移除队首元素
根据以上源码分析,得出方法之间的异同:
(1) 生产者方法:put,offer
(2)消费者方法:take,poll,peek
(3)put方法中,如果队列始终饱和,则当前线程会一直等待,直到有对象出列
offer(可传入等待时间),如果队列饱满,会等待指定的时间,如果在指定时间内还饱满,则直接返回false
offer(没有参数),如果队列饱满,直接返回false,线程不等待
put和offer(可传入等待时间)都是可被打断的
(4)take在队列为空时,会始终阻塞
poll分为带等待时间和不带的,如果不带等待时间,则不阻塞,移除队首元素(FIFO)
peek是不移除队首元素,不阻塞
wang7839186
浏览: 21604 次
来自: 北京
京晶啊 在这也能碰到你啊 呵呵
wang7839186 写道longfor5 写道请问楼主学习 ...
longfor5 写道请问楼主学习的教材是什么?能给个连接吗在 ...
请问楼主学习的教材是什么?能给个连接吗
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'博客分类:
BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。
BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
3、PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。
4、SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
生产者消费者的示例代码:
import java.util.concurrent.BlockingQ
public class Producer implements Runnable {
BlockingQueue&String&
public Producer(BlockingQueue&String& queue) {
this.queue =
public void run() {
String temp = "A Product, 生产线程:"
+ Thread.currentThread().getName();
System.out.println("I have made a product:"
+ Thread.currentThread().getName());
queue.put(temp);//如果队列是满的话,会阻塞当前线程
} catch (InterruptedException e) {
e.printStackTrace();
import java.util.concurrent.BlockingQ
public class Consumer implements Runnable{
BlockingQueue&String&
public Consumer(BlockingQueue&String& queue){
this.queue =
public void run() {
String temp = queue.take();//如果队列为空,会阻塞当前线程
System.out.println(temp);
} catch (InterruptedException e) {
e.printStackTrace();
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.BlockingQ
import java.util.concurrent.LinkedBlockingQ
public class Test3 {
public static void main(String[] args) {
BlockingQueue&String& queue = new LinkedBlockingQueue&String&(2);
// BlockingQueue&String& queue = new LinkedBlockingQueue&String&();
//不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
// BlockingQueue&String& queue = new ArrayBlockingQueue&String&(2);
Consumer consumer = new Consumer(queue);
Producer producer = new Producer(queue);
for (int i = 0; i & 5; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
new Thread(consumer, "Consumer" + (i + 1)).start();
打印结果:
I have made a product:Producer1
I have made a product:Producer2
A Product, 生产线程:Producer1
A Product, 生产线程:Producer2
I have made a product:Producer3
A Product, 生产线程:Producer3
I have made a product:Producer5
I have made a product:Producer4
A Product, 生产线程:Producer5
A Product, 生产线程:Producer4
由于队列的大小限定成了2,所以最多只有两个产品被加入到队列当中,而且消费者取到产品的顺序也是按照生产的先后顺序,原因就是LinkedBlockingQueue和ArrayBlockingQueue都是按照FIFO的顺序存取元素的。
浏览 26482
tonlion2046
浏览: 309466 次
来自: 杭州
支持分布式吗
qingchenyuji 写道您好,
看了您的博文,想向您请 ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'本帖子已过去太久远了,不再提供回复功能。并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法 - CSDN博客
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。
注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。
LinkedBlockingQueue由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
package cn.
import java.util.concurrent.BlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.LinkedBlockingQ
* 多线程模拟实现生产者/消费者模型
* @author 林计钦
* @version 1.0
下午05:23:11
public class BlockingQueueTest2 {
* 定义装苹果的篮子
public class Basket {
// 篮子,能够容纳3个苹果
BlockingQueue&String& basket = new LinkedBlockingQueue&String&(3);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
return basket.take();
// 定义苹果生产者
class Producer implements Runnable {
public Producer(String instance, Basket basket) {
this.instance =
this.basket =
public void run() {
while (true) {
// 生产苹果
System.out.println("生产者准备生产苹果:" + instance);
basket.produce();
System.out.println("!生产者生产苹果完毕:" + instance);
// 休眠300ms
Thread.sleep(300);
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
// 定义苹果消费者
class Consumer implements Runnable {
public Consumer(String instance, Basket basket) {
this.instance =
this.basket =
public void run() {
while (true) {
// 消费苹果
System.out.println("消费者准备消费苹果:" + instance);
System.out.println(basket.consume());
System.out.println("!消费者消费苹果完毕:" + instance);
// 休眠1000ms
Thread.sleep(1000);
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
public static void main(String[] args) {
BlockingQueueTest2 test = new BlockingQueueTest2();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生产者001", basket);
Producer producer2 = test.new Producer("生产者002", basket);
Consumer consumer = test.new Consumer("消费者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行5s后,所有任务停止
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
service.shutdownNow();
ConcurrentLinkedQueueConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。
package cn.
import java.util.concurrent.ConcurrentLinkedQ
import java.util.concurrent.CountDownL
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue&Integer& queue = new ConcurrentLinkedQueue&Integer&();
private static int count = 2; // 线程个数
//CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
private static CountDownLatch latch = new CountDownLatch(count);
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i & i++) {
es.submit(new Poll());
latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
public static void offer() {
for (int i = 0; i & 100000; i++) {
queue.offer(i);
* @author 林计钦
* @version 1.0
下午05:32:56
static class Poll implements Runnable {
public void run() {
// while (queue.size()&0) {
while (!queue.isEmpty()) {
System.out.println(queue.poll());
latch.countDown();
运行结果:costtime 2360ms
改用while (queue.size()&0)后运行结果:cost time 46422ms
结果居然相差那么大,看了下ConcurrentLinkedQueue的API原来.size()是要遍历一遍集合的,难怪那么慢,所以尽量要避免用size而改用isEmpty().
总结了下, 在单位缺乏性能测试下,对自己的编程要求更加要严格,特别是在生产环境下更是要小心谨慎。
本文已收录于以下专栏:
相关文章推荐
队列、链表之类的数据结构及其常用。Java中,ArrayList和Vector都是使用数组作为其内部实现。两者最大的不同在于:Vector是线程安全的,而ArrayList不是。此外LinkedLis...
由于2采用读写锁的形式对读写进行控制,可能会在锁的获取与释放上损失一定的性能。所以当有多个消费者时多用1。
而对于2,我们在其源码中可以看到,获取队首元素有take与poll方法,这两者的最本质区别在...
1 Test.java代码参考public class ConcurrentQueueTest {
private static int COUNT = 100000;
private sta...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出...
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQ...
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
来源:/linjiqin/archive/2013...
他的最新文章
讲师:吴岸城
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)}

我要回帖

更多关于 linkblockqueue take 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信