JUC 并发编程
约 5168 字大约 17 分钟
2025-04-10
一、JUC包结构
打开代码 java.util.concurrent 包,总结一下可以分为以下几类:
1. atomic包
是 JDK 提供的一组原子操作类。
包含有 AtomicBoolean、AtomicInteger、AtomicIntegerArray 等原子变量类,他们的实现原理大多是持有它们各自的对应的类型变量 value,而且被 volatile 关键字修饰了。这样来保证每次一个线程要使用它都会拿到最新的值。
2.locks
是 JDK 提供的锁机制,相比 synchronized 关键字来进行同步锁,功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁包含的实现类有:
ReentrantLock
它是独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁。
ReentrantReadWriteLock
它包括子类 ReadLock 和 WriteLock 。ReadLock 是共享锁,而WriteLock是独占锁。
LockSupport
它具备阻塞线程和解除阻塞线程的功能,并且不会引发死锁。
3.collections
主要是提供线程安全的集合, 比如:
- ArrayList 对应的高并发类是 CopyOnWriteArrayList
- HashSet 对应的高并发类是 CopyOnWriteArraySet
- HashMap 对应的高并发类是 ConcurrentHashMap 等等
4.tools
又叫信号量三组工具类,包含有
CountDownLatch(闭锁)
是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
CyclicBarrier(栅栏)
之所以叫barrier,是因为是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 ,并且在释放等待线程后可以重用。
Semaphore(信号量)
是一个计数信号量,它的本质是一个“共享锁“。信号量维护了一个信号量许可集。线程可以通过调用 acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
5.executor
是 Java 里面线程池的顶级接口,但它只是一个执行线程的工具,真正的线程池接口是 ExecutorService。Excecutor框架主要包含3部分的内容:
任务相关的:包含被执行的任务要实现的接口:Runnable 接口或 Callable 接口
任务的执行相关的:包含任务执行机制的核心接口 Executor,以及继承自 Executor的ExecutorService 接口。Executor 框架中有两个关键的类实现了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduleThreadPoolExecutor)
异步计算结果相关的:包含接口 Future 和实现 Future 接口的 FutureTask 类
二、Atomic
Atomic包里面分为4种类型,分别是:原子更新基本类型,原子更新数组,原子更新引用,原子更新属性。
Atomic包里的类基本都是使用Unsafe实现的包装类,从而达到了原子性的操作。然后通过将内部的value变量用volatile关键字修饰,从而达到了可见性、 防止重排序 、原子性。
为什么需要原子类?
在很多时候,我们需要的仅仅是一个简单的、高效的、线程安全的递增或者递减方案。对于是需要简单的递增或者递减的需求场景,使用synchronized关键字和lock固然可以实现,但代码写的会略显冗余,且性能会有影响,此时用原子类更加方便。
以原子更新基本类型中的AtomicInteger类为例,介绍通用的API接口和使用方法。首先是几个常用的API:
// 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回更新的值)。
int addAndGet(int delta)
// 以原子方式将给定值与当前值相加,可用于线程中的计数使用,(返回以前的值)
int getAndAdd(int delta)
// 以原子方式将当前值加 1(返回更新的值)
int incrementAndGet()
// 以原子方式将当前值加 1(返回以前的值)
int getAndIncrement()
// 以原子方式设置为给定值(返回旧值)
int getAndSet(int newValue)
// 以原子方式将当前值减 1(返回更新的值)
int decrementAndGet()
// 以原子方式将当前值减 1(返回以前的值)
int getAndDecrement()
// 获取当前值
get()
不使用原子类来定义一个临界变量val,起10个异步线程,每个线程都是对这个临界变量进行1000次自增操作,如下:
public class AtomicWrongDemo {
private int val = 0;
private void increase(){
for (int i = 0; i < 1000; ++i){
++this.val;
}
}
private int getVal(){
return this.val;
}
public static void main(String[] args) {
// 初始化实例
AtomicWrongDemo atomicWrongDemo = new AtomicWrongDemo();
for (int i = 0; i < 10; ++i){
new Thread(atomicWrongDemo::increase).start();
}
// 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
try {
Thread.sleep(3000);
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(atomicWrongDemo.getVal());
}
}
运行结果有时为我们期望的10000,有时候比10000少,出现比10000少的结果是因为自增操作++i不是原子操作,出现了竞争,需要对临界变量做同步处理。
使用synchronized关键字和lock固然可以实现,但这里只是对临界变量val++时做同步处理,有种高射炮打蚊子的感觉,且加锁后势必会对性能有所影响,这种场景正是我们使用Atomic类的场景,如下:
public class AtomicDemo {
private AtomicInteger val = new AtomicInteger();
public static void main(String[] args) {
// 初始化实例
AtomicDemo atomicDemo = new AtomicDemo();
for (int i = 0; i < 10; ++i){
new Thread(atomicDemo::increase).start();
}
// 让主线程休眠5秒,保证前面起的10个异步线程都执行完毕
try {
Thread.sleep(3000);
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(atomicDemo.getVal().toString());
}
private void increase(){
for (int i = 0; i < 1000; ++i){
this.val.incrementAndGet();
}
}
private AtomicInteger getVal(){
return this.val;
}
}
三、Locks
1. Locks接口
Lock接口可以视为synchronized的增强版,提供了更灵活的功能。该接口提供了限时锁等待、锁中断、锁尝试等功能。
虽然synchronized方法和语句的范围机制使得使用监视器锁更容易编程,并且有助于避免涉及锁的许多常见编程错误,但是有时需要以更灵活的方式处理锁。 例如,用于遍历并发访问的数据结构的一些算法需要使用“手动”或“链锁定”:获取节点A的锁定,然后获取节点B,然后释放A并获取C,然后释放B并获得D等。 所述的实施方式中Lock接口通过允许获得并在不同的范围释放的锁,并允许获得并以任何顺序释放多个锁使得能够使用这样的技术。
返回 | 方法 | 说明 |
---|---|---|
void | lock() | 获得锁 |
void | lockInterruptibly() | 获取锁定,除非当前线程是 interrupted |
Condition | newCondition() | 返回一个新Condition绑定到该实例Lock实例 |
boolean | tryLock() | 只有在调用时才可以获得锁 |
boolean | tryLock(long time, TimeUnit unit) | 如果在给定的等待时间内是空闲的,并且当前的线程尚未得到 interrupted,则获取该锁 |
void | unlock() | 释放锁 |
Lock()
获得锁。如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获取锁。 Lock实现可能能够检测锁的错误使用,例如将导致死锁的调用,并且可能在这种情况下抛出(未检查)异常。 情况和异常类型必须由Lock实现记录。
void lockInterruptibly() throws InterruptedException
它提供了一种更加“灵活”的锁定机制,当一个线程调用这个方法时,如果锁已经被其他线程持有,那么这个线程同样会进入等待状态,但是,在等待的过程中,如果这个线程收到了中断请求(即有其他线程调用了它的interrupt()方法),那么它就会立即响应中断,不再等待锁的释放,而是抛出一个InterruptedException异常。
而 lock()方法是一种基本的锁定机制,当一个线程调用这个方法时,如果锁已经被其他线程持有,那么调用线程就会进入等待状态,直到锁被释放,在等待的过程中,这个线程会无视中断请求,也就是说,即使有其他线程调用了这个等待线程的interrupt()方法,它也不会有任何响应,依旧会“执着”地等待锁的释放。
tryLock()
只有在调用时才可以获得锁。如果可用,则获取锁定,并立即返回值为true 。 如果锁不可用,则此方法将立即返回值为false 。 使用示例:
Lock lock = ...; if (lock.tryLock()) { try { // manipulate protected state } finally { lock.unlock(); } } else { // perform alternative actions }
此用法可确保锁定已被取消,如果未获取锁定,则不会尝试解锁。
tryLock(long time,TimeUnit unit) throws InterruptedException
tryLock() 会立即获取锁,如果在调用方法时刻,锁没有被其他线程占用,将获取到锁并返回true,否则false; tryLock(long timeout, TimeUnit unit) 将在指定的时间内(理解为容忍时间)尝试获取锁,如果在期间获取到锁,立即返回true,超时则返回false
unlock()
释放锁。一个Lock实现通常会限制哪个线程可以释放锁(通常只有锁的持有者可以释放它),并且如果限制被违反则可能会引发(未检查)异常
Condition newCondition()
关键字synchronized与wait()/notify()这两个方法一起使用可以实现等待/通知模式。Lock锁的newContition()方法返回Condition对象,Condition类也可以实现等待/通知模式。await()会使当前线程等待,同时会释放锁,当其他线程调用signal()时,线程会重新获得锁并继续执行
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用,因为内部会做释放锁的操作,如果不是在lock和unlock之间使用,会报错java.lang.IllegalMonitorStateException
2. Condition接口
Condition可以看做是Obejct类的wait()、notify()、notifyAll()方法的替代品,与Lock配合使用。当线程执行condition对象的await方法时,当前线程会立即释放锁,并进入对象的等待区,等待其它线程唤醒或中断。
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列,该队列是Condition对象实现等待/通知功能的关键。
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。(一个Condition包含一个等待队列,多个condition就有多个等待队列)
await()
线程释放锁进入等待直到被通知或者中断。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当前线程加入等待队列 Node node = addConditionWaiter(); // 释放同步状态,也就是释放锁 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将 当前线程 构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始 尝试获取同步状态 。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException
提示
等待 Condition 时,为了防止发生“虚假唤醒”, Condition 一般都是在一个循环中被等待,并测试正被等待的状态声明
signal()
唤醒一个等待线程。
用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程
3. ReadWriteLock接口
该接口内部只定义了两个方法。
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}
从表面来看,ReadLock和WriteLock 是两把锁,实际上它只是同一把锁的两个视图而已。
什么叫两个视图呢?可以理解为是一把锁,线程分成两类:读线程和写线程。读线程和读线程之间不互斥(可以同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。
当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用 lock/unlock。
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
4. 抽象类 AQS/AQLS/AOS
AQS是 AbstractQueuedSynchronizer 的简称,即抽象队列同步器,从字面上可以这样理解:
- 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
- 队列:使用先进先出(FIFO)的队列存储数据;
- 同步:实现了同步的功能。
AQS 也是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,比如 ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等,都是基于 AQS 的。
- 使用一个 int 成员变量来表示同步状态。
- 通过内置的FIFO队列来完成获取资源线程的排队工作。
- 使用CAS对该同步状态进行原子操作实现对其值的修改。
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS是 用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
提示
CLH(Craig,Landin and Hagersten)是三个人,共同发明了一个可扩展、高性能、公平且基于自旋锁的链表;链表中的每个线程只在本地自旋前一个节点的状态,即该节点(线程)不断自旋获取前一个节点的状态;每个节点都有一个状态(要么自旋,要么释放锁)
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/**
* 同步状态
*/
private volatile int state;
/**
* 返回同步状态的当前值。
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态的值。
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。
*
* @param expect 期待值
* @param update 新值
* @return true 更新成功。False 表示实际值不等于预期值
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}
自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
✒️ AQS定义两种资源共享方式
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
✒️ AQS底层使用了模板方法模式
同步器的设计是基于模板方法模式的(模板方法模式很经典的一个应用),如果需要自定义同步器一般的方式是这样:
使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
需要子类去实现的,它们主要有:
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可。
以 ReentrantLock 为例,state 初始化为0,表示未锁定状态。A线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state +1。此后,其他线程再 tryAcquire() 时就会失败,直到A线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的( state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。
AQS 里面的“资源”是用一个int类型的数据来表示的,有时候业务需求的资源数超出了int的范围,所以在 JDK 1.6 中,多了一个AQLS(AbstractQueuedLongSynchronizer)。它的代码跟 AQS 几乎一样,只是把资源的类型变成了long类型。
AQS 和 AQLS 都继承了一个类叫AOS(AbstractOwnableSynchronizer)。这个类也是在 JDK 1.6 中出现的。
4. LockSupport 阻塞原语
LockSupport用来创建锁和其他同步类的基本线程阻塞原语。简而言之,当调用LockSupport.park时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。
LockSupport的核心函数都是基于Unsafe类中定义的park和unpark函数,下面给出两个函数的定义:
park函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞:
① 调用unpark函数,释放该线程的许可。
② 该线程被中断。
③ 设置的时间到了。并且,当time为绝对时间时,isAbsolute为true,否则,isAbsolute为false。当time为0时,表示无限等待,直到unpark发生。
unpark函数,释放线程的许可,即激活调用park后阻塞的线程。
这个函数不是安全的,调用这个函数时要确保线程依旧存活。