<返回更多

Java内置条件队列应用,实现经典的生产者消费者算法

2020-11-10    
加入收藏

背景

“生产者和消费者模型” 是多线程通信的典型案例,本章节将利用前一节的锁和条件队列的知识,来实现一个完整的有界缓冲区,并创建多个线程访问该有界缓冲区,模拟生产者提供数据、消费者处理数据的过程,正文如下。

生产者消费者模型

生产者和消费者模型中,因为多个线程共享同一个缓冲区,所以就涉及到两个重要的通信约束:

  1. 缓冲区满的时候,生产者不能再添加数据,应该阻塞等待,直到缓冲区有空位;
  2. 缓冲区空的时候,消费者不能再获取数据,应该阻塞等待,直到有新的数据加入缓冲区。

要保证上述约束条件,可以用 sleep 空循环,也可以使用锁和条件队列。利用锁和条件队列实现的思路是,生产者和消费者有各自要等待的条件,一旦条件不满足,就阻塞在该条件队列上,直到另一个线程唤醒自己。

实现过程

缓冲区的 “满” 和 “空” 是两个条件,如果用内置锁,对缓冲区的操作由同一把锁保护,只能共用一个条件队列;如果使用显式锁,则可以定义两个条件队列。

这里我们就用内置锁和内置条件队列来实现一个通信模型中的共享缓冲区类。设计类图结构:

Java内置条件队列应用,实现经典的生产者消费者算法

 

抽象有界限缓存

首先,创建一抽象有界缓存类 ABoundedBuffer,提供插入和删除的基本实现。

/**
 * @title       :ABoundedBuffer
 * @description :有界缓存抽象类
 * @update      :2019-12-20 上午9:29:33
 * @author      :172.17.5.73
 * @version     :1.0.0
 * @since       :2019-12-20
 */
public abstract class ABoundedBuffer<V> {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;
	
	protected ABoundedBuffer(int capacity){
		this.buf = (V[]) new Object[capacity];
	}
	
	protected synchronized final void doPut(V v){
		buf[tail] = v;
		if(++tail==buf.length){
			tail = 0;
		}
		
		++count;
	}

	protected synchronized final V doTake(){
		V v = buf[head];
		buf[head] = null;
		if(++head==buf.length){
			head = 0;
		}
		--count;
		return v;
	}
	
	public synchronized final boolean isFull(){
		return count == buf.length;
	}
	
	public synchronized final boolean isEmpty(){
		return count==0;
	}
}

定义实现类

其次,利用内置条件队列,编写子类实现可阻塞的插入和删除操作。

插入操作,依赖的条件是缓存非满,当条件不满足时,调用 wait 方法挂起线程,一旦插入成功,说明缓存非空,则调用 notifyAll 方法唤醒等待非空的线程。

删除操作,依赖的条件是非空,当条件不满足时,同样挂起等待,一旦删除成功,说明缓存非满,唤起等待该条件的线程。

完整的源码为:

import JAVA.util.Date;

/**
 * 
 * @title       :InnerConditionQueue
 * @description :使用内置条件队列,实现简单的有界缓存
 *               通过对象的 wait 和 notify 来实现挂起
 *               锁对象是 this,调用 wait/notify 的对象是同一个对象。
 *               三元关系(锁、wait/notify、条件谓词)
 *               缺陷:
 *               线程从 wait 中被唤醒时,并不代码条件谓词为真,此时还是需要再判断条件。所以必须在循环中调用wait
 *               每次醒来时都判断谓词的真假。
 *               谓词:对客体的描述或说明(是什么、怎么样、做什么),描述客体的本质、关系、特性等的词项。
 * @update      :2019-12-20 下午4:18:06
 * @author      :172.17.5.73
 * @version     :1.0.0
 * @since       :2019-12-20
 */
public class InnerConditionQueue<V> extends ABoundedBuffer<V> {

	protected InnerConditionQueue(int capacity) {
		super(capacity);
	}

	public synchronized void put(V v) throws InterruptedException{
		while(isFull()){
			System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
			wait();
		}
		
		doPut(v);
		notifyAll();
	}
	
	public synchronized V take() throws InterruptedException{
		while(isEmpty()){
			System.out.println(new Date()+" buffer is empty thread wait:"+Thread.currentThread().getName());
			wait();
		}
		
		V v = doTake();
		//每当在等待一个条件时,一定要确保在条件谓词变为真时,通过某种方式发出通知
		notifyAll();
		System.out.println(new Date()+" "+Thread.currentThread().getName()+" take:"+v);
		return v;
	}
}

测试类

最后,编写测试代码,创建一个大小为 2 的缓冲区对象,同时启动三个线程执行插入操作,主线程执行四次消费操作。测试代码如下:

import java.util.Date;

public class Main {
	public static void main(String[] args) {
		final InnerConditionQueue<String> bu = new InnerConditionQueue<String>(2);
		
		Thread t1 = new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello1");
				} catch (InterruptedException execption) {
					System.out.println("intercetp1:"+Thread.currentThread().getName());
				}
			}
		});
		Thread t2 = new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello2");
				} catch (InterruptedException execption) {
					System.out.println("intercetp2:"+Thread.currentThread().getName());
				}
			}
		});
		Thread t3 =  new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello3");
					Thread.sleep(50000);
					bu.put("last one...");
				} catch (InterruptedException execption) {
					System.out.println("intercetp3:"+Thread.currentThread().getName());
				}
			}
		});
		
		t1.start();
		t2.start();
		t3.start();
		
		try {
			Thread.sleep(5000);
			bu.take();
			bu.take();
			bu.take();
			bu.take();
		} catch (InterruptedException execption) {
			execption.printStackTrace();
		}
		
		System.out.println(new Date()+" main over...");
	}
}

测试结果

执行结果:t3 的第一个 put 操作会因为缓存已满而阻塞,5 秒后主线程删除两个操作后,重新被唤醒。主线程的第四个 bu.take() 操作会因为缓存为空而阻塞,直到 t3 在 50 秒后重新插入"last one" 后被唤醒,操作结束。

Tue Dec 20 10:23:53 CST 2019 buffer is Full thread wait:Thread-2
Tue Dec 20 10:23:58 CST 2019 main take:hello1
Tue Dec 20 10:23:58 CST 2019 main take:hello2
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:23:58 CST 2019 main take:hello3
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:24:48 CST 2019 main take:last one...
Tue Dec 20 10:24:48 CST 2019 main over...

启示录

我们的例子中,“非空” 和 “非满” 这两种条件关联着同一个条件队列,当一个线程由于其他线程调用了notifyAll 而被唤醒时,并不意味着它等待的条件已经为真了,这也是内置条件队列的局限所在。

所以代码中的加固措施是,使用循环判断条件是否发生,如果发生,则调用 wait 阻塞自己,等待其他线程唤醒:

while(isFull()){
	System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
	wait();
}

同样的功能,Java 并发包中的 ArrayBlockingQueue 是使用 ReentrantLock 和 ObjectCondition 实现的可阻塞队列,为什么 JDK 使用显式锁和显式条件队列呢?

使用内置锁的局限性在于一把锁只有一个条件队列,而这里涉及到两种等待条件,所以使用 ReentrantLock 更合适,它可以关联多个条件队列,这样就可以巧妙地处理多条件的阻塞和唤醒了!

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>