🌟 语言 | Java 并发:乐观锁与悲观锁



悲观锁

悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如恭喜数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。也就是说,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

像 Java 中 synchronized 和 ReentranLock 等独占锁就是悲观锁思想的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void performSynchronisedTask() {
synchronized (this) {
// 需要同步的操作
}
}

private Lock lock = new ReentrantLock();
lock.lock();
try {
// 需要同步的操作
} finally {
lock.unlock();
}

高并发的场景下,激烈的锁竞争会造成线程阻塞,大量阻塞线程会导致系统的上下文切换,增加系统的性能开销。并且,悲观锁还可能会存在死锁问题,影响代码的正常运行。

乐观锁

乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了(具体方法可以使用版本号机制或 CAS 算法)。

高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试,这样同样会非常影响性能,导致 CPU 飙升。

悲观锁和乐观锁适用场景

  • 悲观锁通常多用于写比较多的情况(多写场景,竞争激烈),这样可以避免频繁失败和重试影响性能,悲观锁的开销是固定的。不过,如果乐观锁解决了频繁失败和重试这个问题的话(比如LongAdder),也是可以考虑使用乐观锁的,要视实际情况而定。

  • 乐观锁通常多用于写比较少的情况(多读场景,竞争较少),这样可以避免频繁加锁影响性能。不过,乐观锁主要针对的对象是单个共享变量(参考java.util.concurrent.atomic包下面的原子变量类)。

悲观锁实现

数据库层实现

1
select * from account where id = 1 for update;

FOR UPDATE 会在事务中锁定该行记录,直到事物提交或回滚。
其他事物在这期间无法修改或查询到这行。

1
2
3
4
5
-- 事务1
BEGIN;
SELECT balance FROM account WHERE id = 1 FOR UPDATE;
UPDATE account SET balance = balance - 100 WHERE id = 1;
COMMIT;

此时事务2再操作同条数据则会被阻塞

1
2
3
-- 事务2
BEGIN;
SELECT balance FROM account WHERE id = 1 FOR UPDATE; -- 会被阻塞

java层实现

在代码层可使用同步机制

1
2
3
4
synchronized (this) {
// 线程安全的操作
updateDatabase();
}

或使用 ReentrantLock:

1
2
3
4
5
6
lock.lock();
try {
updateDatabase();
} finally {
lock.unlock();
}

乐观锁实现

乐观锁一般会使用版本号机制或 CAS 算法实现,CAS 算法相对来说更多一些,这里需要格外注意。

版本号机制

一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会加一。当线程 A 要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值为当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。

1
2
3
UPDATE account 
SET balance = 900, version = version + 1
WHERE id = 1 AND version = 1;

CAS 算法

CAS 的全称是 Compare And Swap(比较与交换) ,用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。

CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。

原子操作 即最小不可拆分的操作,也就是说操作一旦开始,就不能被打断,直到操作完成。

CAS 涉及到三个操作数:

  • V:要更新的变量值(Var)
  • E:预期值(Expected)
  • N:拟写入的新值(New)

当且仅当 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。

当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

Java 中的 CAS 实现

Java 中,实现 CAS 操作的一个关键类是 Unsafe

Unsafe 类位于 sun.misc 包下,是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性,它通常用于 JVM 内部或一些需要极高性能和底层访问的库中,而不推荐普通开发者在应用程序中使用。

sun.misc 包下的 Unsafe 类提供了 compareAndSwapObject、compareAndSwapInt、compareAndSwapLong 方法来实现的对 Object、int、long 类型的 CAS 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 以原子方式更新对象字段的值。
*
* @param o 要操作的对象
* @param offset 对象字段的内存偏移量
* @param expected 期望的旧值
* @param x 要设置的新值
* @return 如果值被成功更新,则返回 true;否则返回 false
*/
boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
* 以原子方式更新 int 类型的对象字段的值。
*/
boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
* 以原子方式更新 long 类型的对象字段的值。
*/
boolean compareAndSwapLong(Object o, long offset, long expected, long x);

Unsafe 类中的 CAS 方法是 native 方法。native 关键字表明这些方法是用本地代码(通常是 C 或 C++)实现的,而不是用 Java 实现的。这些方法直接调用底层的硬件指令来实现原子操作。也就是说,Java 语言并没有直接用 Java 实现 CAS,而是通过 C++ 内联汇编的形式实现的(通过 JNI 调用)。因此,CAS 的具体实现与操作系统以及 CPU 密切相关。

AtomicInteger 是 Java 的原子类之一,主要用于对 int 类型的变量进行原子操作,它利用Unsafe 类提供的低级别原子操作方法实现无锁的线程安全性。

AtomicInteger 的核心源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 获取 Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
// 获取“value”字段在 AtomicInteger 类中的内存偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 确保“value”字段的可见性
private volatile int value;

// 如果当前值等于预期值,则原子地将值设置为 newValue
// 使用 Unsafe#compareAndSwapInt 方法进行CAS操作
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 原子地将当前值加 delta 并返回旧值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}

// 原子地将当前值加 1 并返回加之前的值(旧值)
// 使用 Unsafe#getAndAddInt 方法进行CAS操作。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

// 原子地将当前值减 1 并返回减之前的值(旧值)
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}

Unsafe#getAndAddInt 源码:

1
2
3
4
5
6
7
8
9
10
// 原子地获取并增加整数值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 以 volatile 方式获取对象 o 在内存偏移量 offset 处的整数值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
// 返回旧值
return v;
}

getAndAddInt 使用了 do-while 循环:在 compareAndSwapInt 操作失败时,会不断重试直到成功。也就是说,getAndAddInt 方法会通过 compareAndSwapInt 方法来尝试更新 value 的值,如果更新失败(当前值在此期间被其他线程修改),它会重新获取当前值并再次尝试更新,直到操作成功。

CAS 算法的问题

ABA 问题

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有修改过的,这个问题就被称为 CAS 操作的 ABA 问题。

ABA 问题的解决思路就是在变量前追加版本号或时间戳。AtomicStampedReference 类就是用来解决 ABA 问题的,其中的 compareAndSet() 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

1
2
3
4
5
6
7
8
9
10
11
12
public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

循环时间长开销大

CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。

如果 JVM 能够支持处理器提供的 pause 指令,那么自旋操作的效率将有所提升。pause指令有两个重要作用:

  • 延迟流水线执行指令:pause 指令可以延迟指令的执行,从而减少 CPU 的资源消耗。具体的延迟时间取决于处理器的实现版本,在某些处理器上,延迟时间可能为零。
  • 避免内存顺序冲突:在退出循环时,pause 指令可以避免由于内存顺序冲突而导致的 CPU 流水线被清空,从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作

CAS 操作仅能对单个共享变量有效。当需要操作多个共享变量时,CAS 就显得无能为力。

从 JDK 1.5 开始,Java 提供了 AtomicReference 类,这使得我们能够保证引用对象之间的原子性。通过将多个变量封装在一个对象中,我们可以使用 AtomicReference 来执行 CAS 操作。

除了 AtomicReference 这种方式之外,还可以利用加锁来保证。