J.U.C并发包知识点梳理

J.U.C并发包知识点梳理

请注意,本文编写于  1,026  天前,最后修改于  1,026  天前,其中某些信息可能已经过时。

Java.util.concurrent:提供了并发编程的解决方法

CAS是 java.util.concurrent.atomic 包的基础,AQS是 java.util.concurrent.locks 包以及一些常用类比如 Semophore , ReentrantLock 等类的基础

J.U.C包的分类

  1. 线程执行器executor
  2. 锁locks
  3. 原子变量类atomic
  4. 并发工具类tools
  5. 并发集合collections

上面的图可以点开放大来看,为了看的更仔细,下面是J.U.C模块拆分出来的具体分类图:

J.U.C的几大内容

其中 tools 包中提供了并发编程的工具类如 CountDownLatch、CyclicBarrier 等;locks 包中提供了如 ReentrantLock 为代表的显式锁等;Collections 包中提供了并发集合类,常用的有 ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet 以及阻塞队列等;executor 包中提供了一个线程调度的框架;最后的 atomic 包中就是原子操作类。

下面一一介绍。

Executor框架

Locks锁

lock 包中的锁都是基于 AQS 构成,因此理解这一部分就需要重点理解 AQS的原理Condition 接口实现类和 Lock 实现类组合可以实现等待/通知模式,读写锁则是有自己的实现方式,LockSupport 是一个工具类,与 Condition 接口配合使用。

原子类atomic

原子类的构建是以 CAS 实现为基础,原子类实现的原理必须要了解。

并发工具类

CountDownLatch

闭锁,让主线程等待一组事件发生后继续执行,事件指的是CountDownLatch里的countDown()方法。使用这个工具类可以将任务分为多个子任务执行。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        new CountDownLatchDemo().go();
    }

    private void go() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        //依次创建3个线程,并启动
        new Thread(new Task(countDownLatch),"Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(countDownLatch),"Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(countDownLatch),"Thread3").start();
        Thread.sleep(1000);
        //调用await方法阻塞当前线程,直到其它所有线程执行完
        countDownLatch.await();
        System.out.println("所有线程已到达,主线程开始执行" + System.currentTimeMillis());
    }

    class Task implements Runnable{
        private CountDownLatch countDownLatch;
        public Task(CountDownLatch countDownLatch){
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "已经到达" + System.currentTimeMillis());
            countDownLatch.countDown();
        }
    }
}

可以运行一下代码,控制台输出结果如下:

线程Thread1已经到达1554697004783
线程Thread2已经到达1554697005755
线程Thread3已经到达1554697006755
所有线程已到达,主线程开始执行1554697007755

CountDownLatch 的实现原理并不复杂,也是基于 AQS 实现的同步组件,由于允许多个线程同时运行,所以内部维护的是共享锁。构造器中传入 CountDownLatch 的数字 3 实质上就是 AQS 的同步状态 state 设置为 3,而 CountDownLatch 的关键方法 countDown() 实质上调用的是 AQS 的 releaseShared() 方法,即调用一次该方法这个共享锁的同步状态减 1,直到同步状态为 0 这个锁释放。

CyclicBarrier

字面意思为可循环使用的屏障,主要作用是让一组线程到达一个屏障(又叫做同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。可用于多线程计算数据等场景。

  1. 等待其它线程,且会阻塞自己当前线程,所有线程必须同时到达栅栏位置后,才能继续执行。
  2. 所有线程达到栅栏处,可以触发执行另外一个预先设置的线程
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        new CyclicBarrierDemo().go();
    }
    private void go() throws InterruptedException {
        //初始化栅栏的参与者数是3
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        //依次创建三个线程并启动
        new Thread(new Task(cyclicBarrier),"Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier),"Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier),"Thread3").start();
        Thread.sleep(1000);
    }

    class Task implements Runnable{
        private CyclicBarrier cyclicBarrier;
        public Task(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "已经到达" + System.currentTimeMillis());
            try {
                //每个线程调用await方法告诉CyclicBarrier我已经到达了屏障
                //然后当前线程被阻塞
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程" + Thread.currentThread().getName() + "开始处理" + System.currentTimeMillis());
        }
    }
}

从上可以看出来,好像表面上 CyclicBarrier 和 CountDownLatch 没有什么区别啊?那么区别在哪呢?

CountDownLatch 的计数器只能用一次,而 CyclicBarrier 的计数器可以使用 reset() 方法重置,所以 CyclicBarrier 可以适用于更复杂的业务场景。例如如果计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier 还提供其它有用的方法,比如 getNumberWaiting() 方法可以获得 CyclicBarrier 阻塞的线程数量。isBroken() 方法用来了解阻塞的线程是否被中断等等。

我们可以看一下源码,CyclicBarrier 是基于 ReentrantLock 和 Condition 接口实现的,而两者都是基于 AQS 实现的,所以说到底最终还是基于 AQS 实现的工具类。其中有一个内部类 Generation,每一次使用的CycBarrier可以当成Generation的实例,其源代码如下:

private static class Generation {
    boolean broken = false;
}

其属性如下:

public class CyclicBarrier {
    /** The lock for guarding barrier entry */
    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // 条件队列
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // 参与的线程数量
    private final int parties;
    /* The command to run when tripped */
    // 由最后一个进入 barrier 的线程执行的操作
    private final Runnable barrierCommand;
    /** The current generation */
    // 当前代
    private Generation generation = new Generation();
    // 正在等待进入屏障的线程数量
    private int count;
}

其构造方法源码如下:

public CyclicBarrier(int parties, Runnable barrierAction) {
        // 参与的线程数量小于等于0,抛出异常
        if (parties <= 0) throw new IllegalArgumentException();
        // 设置parties
        this.parties = parties;
        // 设置count
        this.count = parties;
        // 设置barrierCommand
        this.barrierCommand = barrierAction;
    }

//默认
public CyclicBarrier(int parties) {
    this(parties, null);
}

核心函数 dowait 源码如下:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    // 保存当前锁
    final ReentrantLock lock = this.lock;
    // 锁定
    lock.lock();
    try {
        // 保存当前代
        final Generation g = generation;

        if (g.broken) // 屏障被破坏,抛出异常
            throw new BrokenBarrierException();

        if (Thread.interrupted()) { // 线程被中断
            // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
            breakBarrier();
            // 抛出异常
            throw new InterruptedException();
        }

        // 减少正在等待进入屏障的线程数量
        int index = --count;
        if (index == 0) {  // 正在等待进入屏障的线程数量为0,所有线程都已经进入
            // 运行的动作标识
            boolean ranAction = false;
            try {
                // 保存运行动作
                final Runnable command = barrierCommand;
                if (command != null) // 动作不为空
                    // 运行
                    command.run();
                // 设置ranAction状态
                ranAction = true;
                // 进入下一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // 没有运行的动作
                    // 损坏当前屏障
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 无限循环
        for (;;) {
            try {
                if (!timed) // 没有设置等待时间
                    // 等待
                    trip.await(); 
                else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
                    // 等待指定时长
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) { 
                if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
                    // 损坏当前屏障
                    breakBarrier();
                    // 抛出异常
                    throw ie;
                } else { // 不等于当前带后者是屏障被损坏
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // 中断当前线程
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // 屏障被损坏,抛出异常
                throw new BrokenBarrierException();

            if (g != generation) // 不等于当前代
                // 返回索引
                return index;

            if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
                // 损坏屏障
                breakBarrier();
                // 抛出异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

此函数为 CyclicBarrier 类的核心函数,CyclicBarrier 类对外提供的 await 函数在底层都是调用该了 doawait 函数,其逻辑判断过程大致如下:

Semaphore

信号量,控制某个资源可被同时访问的线程个数。可以用做流量控制,特别是公有资源有限的应用场景,比如数据库连接等。

​ 该工具类还提供一些其它方法,比如:

(1)intavailablePermits() :返回此信号量中当前可用的许可证数。

(2)intgetQueueLength() :返回正在等待获取许可证的线程数。

(3)booleanhasQueuedThreads() :是否有线程正在等待获取许可证。

(4)void reducePermits(int reduction) :减少reduction个许可证,是个protected方法。

(5)Collection getQueuedThreads() :返回所有等待获取许可证的线程集合,是个protected方法。

public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);
        //模拟20个客户端访问
        for(int index = 0; index < 20;index++){
            final int NO = index;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    //获取许可
                    try {
                        semp.acquire();
                        System.out.println("Accessing:" + NO);
                        Thread.sleep((long)(Math.random() * 10000));
                        //访问完后,释放
                        semp.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executorService.execute(run);
        }
        //退出线程池
        executorService.shutdown();
    }
}

Exchanger

交换器,两个线程到达同步点后,相互交换数据。可以用于遗传算法,也可以用于校对工作,比如银行电子账户流水,为了避免错误采用AB岗两人进行录入系统,录入到Excel后可以使用这种方式进行校对来判断录入是否一致。

public class ExchangerTest extends Thread {
    private Exchanger<String> exchanger;
    private String string;
    private String threadName;
 
    public ExchangerTest(Exchanger<String> exchanger, String string,
            String threadName) {
        this.exchanger = exchanger;
        this.string = string;
        this.threadName = threadName;
    }
 
    public void run() {
        try {
            System.out.println(threadName + ": " + exchanger.exchange(string));
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
 
public class Test {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExchangerTest test1 = new ExchangerTest(exchanger, "string1",
                "thread-1");
        ExchangerTest test2 = new ExchangerTest(exchanger, "string2",
                "thread-2");
 
        test1.start();
        test2.start();
    }
}

并发集合Collections

**BlockQueue**:提供可阻塞的入队和出队操作,`BlockingQueue`不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

放入数据

  1. offer(anObject) :表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回 true ,否则返回 false 。(本方法不阻塞当前执行方法的线程);      
  2. offer(E o, long timeout, TimeUnit unit) :可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。`
  3. put(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续。

获取数据

  1. poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
  2. poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  3. take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入;
  4. drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

ArrayBlockingQueue

基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外, ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析, ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

LinkedBlockingQueue

基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个 LinkedBlockingQueue 对象,而没有指定其容量大小,LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 生产者线程
 */
public class Producer implements Runnable {
    
    private volatile boolean  isRunning = true;//是否在运行标志
    private BlockingQueue queue;//阻塞队列
    private static AtomicInteger count = new AtomicInteger();//自动更新的值
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 
    //构造函数
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        String data = null;
        Random r = new Random();
 
        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数
 
                data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }
 
    public void stop() {
        isRunning = false;
    }
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
 
/**
 * 消费者线程
 */
public class Consumer implements Runnable {
    
    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 
    //构造函数
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
 
    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
                if (null != data) {
                    System.out.println("拿到数据:" + data);
                    System.out.println("正在消费数据:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }    
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; 

public class BlockingQueueTest {
 
    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 
        //new了三个生产者和一个消费者
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);
 
        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);
 
        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
 
        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

DelayQueue

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue 使用场景较少,但都相当巧妙,常见的例子比如使用一个 DelayQueue 来管理一个超时未响应的连接队列。

PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。

声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:

如果采用公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;

但如果是非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

本文由 Sanarous 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可,转载前请务必署名
本文链接:https://bestzuo.cn/posts/juc.html
最后更新于:2019-04-08 11:12:03

切换主题 | SCHEME TOOL