Java实现生产者消费者问题的多种方式 - Sanarous的博客

Java实现生产者消费者问题的多种方式

什么是生产者消费者问题

生产者-消费者模式的本质是一个多线程同步问题,即生产者生成一定数量的数据放入缓冲区,然后重复此过程;与此同时,消费者也在缓冲区中消耗这些数据。乍一想这个问题似乎很简单,只要按照顺序执行就可以了,但是在多线程环境下,如何按照“顺序”执行就成了一个非常突出的问题。多线程环境下,生产者和消费者线程之间必须保持同步,要保证生成不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据,如果解决办法不够完善,那么很容易出现死锁的情况。

生产者消费者问题的解决思路

思路

而解决这个问题可以分为两类:

  1. 采用某种机制保护生产者-消费者之间的同步
  2. 在生产者和消费者之间建立一个管道。

第一种方式效率较高,并且易于实现,代码的可控制性较好,属于常用的模式。

第二种方式由于管道缓冲区不易控制,被传输的对象不易封装等,实用性不是很强,但是也有方法去实现。

解决问题的核心

保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java实现的几种方式

  1. wait() / notify() 模式
  2. await() / signal()模式
  3. BlockingQueue阻塞队列方式
  4. 信号量
  5. 管道

代码实现

使用wait()/notify()方法

当缓冲区已满时,生产者线程执行,放弃锁,使自己处于等待状态,让其它线程执行;

当缓冲区已空时,消费者停止执行,放弃锁,使自己处于等待状态,让其它线程执行。

当生产者向缓冲区放入一个产品时,向其它等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;

当消费者从缓冲区取出一个产品时,向其它等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.LinkedList;

public class Storage {

// 仓库容量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<>();

//生产
public void produce() {
//加锁
synchronized (list) {
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
//挂起当前线程
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
list.notifyAll();
}
}

//消费
public void consume() {
//加锁
synchronized (list) {
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
list.notifyAll();
}
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer implements Runnable{
private Storage storage;
public Producer(){}

public Producer(Storage storage , String name){
this.storage = storage;
}

@Override
public void run(){
while(true){
try{
Thread.sleep(1000); //避免执行太快,睡1s执行一次
storage.produce(); //生产产品
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Consumer implements Runnable{
private Storage storage;

public Consumer(){}

public Consumer(Storage storage , String name){
this.storage = storage;
}

@Override
public void run(){
while(true){
try{
Thread.sleep(3000); //避免执行太快,睡3s再消费
storage.consume();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

测试效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Main {

public static void main(String[] args) {
Storage storage = new Storage();
//3个生产者线程
Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread p3 = new Thread(new Producer(storage));

//3个消费者线程
Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
Thread c3 = new Thread(new Consumer(storage));

p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
【生产者p1】生产一个产品,现库存1
【生产者p2】生产一个产品,现库存2
【生产者p3】生产一个产品,现库存3
【生产者p1】生产一个产品,现库存4
【生产者p2】生产一个产品,现库存5
【生产者p3】生产一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【消费者c1】消费一个产品,现库存7
【生产者p3】生产一个产品,现库存8
【消费者c2】消费一个产品,现库存7
【消费者c3】消费一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【生产者p3】生产一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p2】仓库已满
【生产者p3】仓库已满
【生产者p1】仓库已满
【消费者c1】消费一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p3】仓库已满
。。。。。。以下省略

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

使用await()/signal()方法

在JDK5中,用ReentrantLockCondition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

这种方式跟第一种方式基本一样,只不过一个使用的是基于JVM实现的synchronized锁的等待/通知机制,而这种使用的是基于AQS的显式Lock锁的等待/通知机制,所以只需要改动上面的Storage.java类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {

// 仓库最大存储量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();

public void produce()
{
// 获得锁
lock.lock();
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());

// 唤醒其他所有线程、释放锁
full.signalAll();
empty.signalAll();
lock.unlock();
}

public void consume()
{
// 获得锁
lock.lock();
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());

// 唤醒其他所有线程、释放锁
full.signalAll();
empty.signalAll();
lock.unlock();
}
}

其它代码均与上面一样,运行结果也类似,所以就不再重复

使用BlockingQueue阻塞队列方法

BlockingQueueJDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.LinkedBlockingQueue;

public class Storage {

// 仓库存储的载体
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

public void produce() {
try{
//容量最大时自动阻塞
list.put(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}

public void consume() {
try{
//容量为0时自动阻塞
list.take();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费了一个产品,现库存" + list.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}
}

上述运行代码可能会出现put()take()System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

使用信号量Semaphore实现

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行)。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Storage {

// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 仓库的最大容量
final Semaphore notFull = new Semaphore(10);
// 将线程挂起,等待其他来触发
final Semaphore notEmpty = new Semaphore(0);
// 互斥锁
final Semaphore mutex = new Semaphore(1);

public void produce()
{
try {
notFull.acquire();
mutex.acquire();
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
}
catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
}

public void consume()
{
try {
notEmpty.acquire();
mutex.acquire();
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}

使用管道实现

管道一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

inputStream.connect(outputStream)outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

PipedInputStream / PipedOutputStream (操作字节流)

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.io.IOException;
import java.io.PipedOutputStream;

public class Producer implements Runnable {

private PipedOutputStream pipedOutputStream;

public Producer() {
pipedOutputStream = new PipedOutputStream();
}

public PipedOutputStream getPipedOutputStream() {
return pipedOutputStream;
}

@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());
}
pipedOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.IOException;
import java.io.PipedInputStream;

public class Consumer implements Runnable {
private PipedInputStream pipedInputStream;

public Consumer() {
pipedInputStream = new PipedInputStream();
}

public PipedInputStream getPipedInputStream() {
return pipedInputStream;
}

@Override
public void run() {
int len = -1;
byte[] buffer = new byte[1024];
try {
while ((len = pipedInputStream.read(buffer)) != -1) {
System.out.println(new String(buffer, 0, len));
}
pipedInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

测试函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.io.IOException;

public class Main {

public static void main(String[] args) {
Producer p = new Producer();
Consumer c = new Consumer();
Thread t1 = new Thread(p);
Thread t2 = new Thread(c);
try {
p.getPipedOutputStream().connect(c.getPipedInputStream());
t2.start();
t1.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}

PipedReader / PipedWriter (操作字符流)

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.io.IOException;
import java.io.PipedWriter;

public class Producer implements Runnable {

private PipedWriter pipedWriter;

public Producer() {
pipedWriter = new PipedWriter();
}

public PipedWriter getPipedWriter() {
return pipedWriter;
}

@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
pipedWriter.write("This is a test, Id=" + i + "!\n");
}
pipedWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.IOException;
import java.io.PipedReader;

public class Consumer implements Runnable {
private PipedReader pipedReader;

public Consumer() {
pipedReader = new PipedReader();
}

public PipedReader getPipedReader() {
return pipedReader;
}

@Override
public void run() {
int len = -1;
char[] buffer = new char[1024];
try {
while ((len = pipedReader.read(buffer)) != -1) {
System.out.println(new String(buffer, 0, len));
}
pipedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

测试函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.io.IOException;

public class Main {

public static void main(String[] args) {
Producer p = new Producer();
Consumer c = new Consumer();
Thread t1 = new Thread(p);
Thread t2 = new Thread(c);
try {
p.getPipedWriter().connect(c.getPipedReader());
t2.start();
t1.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}

参考文章

  1. 《现代操作系统 第4版》
  2. 《Java多线程编程核心技术》高洪岩
  3. 生产者/消费者问题的多种Java实现方式
  4. Producer–consumer problem
  5. Semaphore实现的生产者消费者程序
如果这篇文章对您很有帮助,不妨
-------------    本文结束  感谢您的阅读    -------------
0%