JAVA - 并发 - 无锁(乐观锁)
无锁(乐观锁)
介绍,无锁,即乐观锁,是非阻塞的

CAS
先看一段代码,采用加锁的方式保证并发的安全
package org.example.xiancheng.study11;
import java.util.ArrayList;
import java.util.List;
public class AccountTest{
public static void main(String[] args) {
Account account =new AccountUnsafe(10000);
Account.demo(account);
}
}
class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance){
this.balance=balance;
}
@Override
public Integer getBalance() {
synchronized (this){
return this.balance;
}
}
@Override
public void withdraw(Integer amount) {
synchronized (this){
this.balance -= amount;
}
}
}
interface Account {
//获取余额
Integer getBalance();
//取款
void withdraw(Integer amout);
/*起1000个线程,分别取10元*/
static void demo(Account account) {
List<Thread> ts =new ArrayList<>();
for(int i =0;i<1000;i++){
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.currentTimeMillis();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(account.getBalance()+" cost:"+(end-start));
}
}
通过CAS方式来实现并发的安全
package org.example.xiancheng.study11;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class AccountTest{
public static void main(String[] args) {
Account account = new AccountCAS(10000);
Account.demo(account);
}
}
class AccountCAS implements Account{
private AtomicInteger balance;
public AccountCAS(int balance){
this.balance=new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true){
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev,next)){
break;
}
}
}
}
interface Account {
//获取余额
Integer getBalance();
//取款
void withdraw(Integer amout);
/*起1000个线程,分别取10元*/
static void demo(Account account) {
List<Thread> ts =new ArrayList<>();
for(int i =0;i<1000;i++){
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.currentTimeMillis();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(account.getBalance()+" cost:"+(end-start));
}
}
分析原理

线程1获取到余额100(第一个参数值),改为90,线程2已经最新的余额改为90,线程1调用cas方法,比较自己获取到的余额100与最新的余额90,不相等,方法返回false,线程1不修改,进入下一次循环,直到比较相等时返回true,则修改为第二个参数值。


慢动作分析,见视频,此处不赘述。
CAS借助了


CAS的特点

原子操作类
AtomicInteger
示例一:
public class Test34 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
//先自增1再获得
System.out.println(i.incrementAndGet());
//先获得在自增1
System.out.println(i.getAndIncrement());
System.out.println(i.get());
}
}
输出:
1
1
2
示例二:
public class Test34 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
//先自增5再获得
System.out.println(i.addAndGet(5));
//先获得在自增12
System.out.println(i.getAndAdd(12));
System.out.println(i.get());
}
}
输出:
5
5
17
示例三:
public class Test34 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(6);
//value是AtomicInteger里的值,即原子变量,读取到变量,然后相乘
i.updateAndGet(value -> value*7);
System.out.println(i.get());
}
}
输出:
42
手写模仿updateAndGet方法
第一步
public class Test34 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(6);
while (true){
int prev = i.get();
int next = prev * 7;
if (i.compareAndSet(prev,next)){
//成功就设置原子变量
break;
}
}
System.out.println(i.get());
}
}
输出:
42
接着优化

public class Test34 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(6);
updateAndGet(i,p -> p*9);
System.out.println(i.get());
}
public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){
while (true){
int prev = i.get();
int next = operator.applyAsInt(prev);
if (i.compareAndSet(prev,next)){
//成功就设置原子变量
break;
}
}
return i.get();
}
}
原子引用
AtomicReference
@Slf4j(topic = "c.test")
class DecimalAccountCas implements DecimalAccount{
//直接操作BigDecimal,可能线程不安全
//private BigDecimal balance;
private AtomicReference<BigDecimal> balance;
public DecimalAccountCas(BigDecimal balance){
this.balance=new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true){
BigDecimal prev =balance.get();
//此处相减后会生成了新的变量next
BigDecimal next = prev.subtract(amount);
log.debug("prev id:{} - value:{},next id:{} - value:{}",prev.hashCode(),balance.get(),next.hashCode(),next);
//线程取得的变量prev的值(以前获得的)对比再次取得的变量prev的值(现在取得的),有没有改变
if (balance.compareAndSet(prev,next)){
break;
}
}
}
}
interface DecimalAccount{
BigDecimal getBalance();
void withdraw(BigDecimal amount);
static void demo (DecimalAccount account){
List<Thread> ts = new ArrayList<>();
for(int i=0;i<3;i++){
ts.add(new Thread(()->{
account.withdraw(BigDecimal.TEN);
},"t"+i));
}
ts.forEach(Thread::start);
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
输出如下:并发执行,第一轮,时间上,t2先修改成功,结束循环,t1、t0修改失败,接着循环,第二轮,t1修改成功,结束,t0修改失败,第三轮t0修改成功。
21:31:55.330 [t2] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t1] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t0] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t1] DEBUG c.test - prev id:64790 - value:2090,next id:64480 - value:2080
21:31:55.330 [t0] DEBUG c.test - prev id:64790 - value:2090,next id:64480 - value:2080
21:31:55.330 [t0] DEBUG c.test - prev id:64480 - value:2080,next id:64170 - value:2070
2070
ABA问题
t1将A改为B,t2将B改为A,主线程无法感知被修改过,因为字符串常量池的关系。需要使用AtomicStampedReference,增加版本号。



AtomicStampedReference
通过版本号,每次修改比对获取到的值和获取到的值时的版本号,成功则改值,版本加一
@Slf4j(topic = "c.test")
public class Test36 {
//第一个是引用,第二个是版本号
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
int stamp =ref.getStamp();
log.debug("stamp 1:{}",stamp);
other();
Thread.sleep(2000);
int stamp2 =ref.getStamp();
log.debug("stamp 2:{}",stamp2);
//
log.debug("A -> C:{}",ref.compareAndSet(prev,"A",stamp,stamp+1));
}
public static void other() throws InterruptedException {
new Thread(()->{
String prev1 = ref.getReference();
int stamp1 = ref.getStamp();
log.debug("A -> B:{}",ref.compareAndSet(prev1,"B",stamp1,stamp1+1));
},"t1").start();
Thread.sleep(500);
new Thread(()->{
String prev2 = ref.getReference();
int stamp2 = ref.getStamp();
log.debug("B -> A:{}",ref.compareAndSet(prev2,"A",stamp2,stamp2+1));
},"t2").start();
}
}
输出如下:
22:51:29.856 [main] DEBUG c.test - stamp 1:0
22:51:29.891 [t1] DEBUG c.test - A -> B:true
22:51:30.393 [t2] DEBUG c.test - B -> A:true
22:51:32.403 [main] DEBUG c.test - stamp 2:2
22:51:32.403 [main] DEBUG c.test - A -> C:false
AtomicMarkableReference
示例代码:
@Slf4j(topic = "c.test")
public class Test35 {
public static void main(String[] args) throws InterruptedException {
Car car = new Car("666");
//第二个参数是初始化的标记位,true/false
AtomicMarkableReference<Car> ref = new AtomicMarkableReference<>(car,true);
Car prev = ref.getReference();
new Thread(() -> {
prev.setCarMark("888");
//1.对比的引用,2.新的引用,3.期望的标记位,4.新的标记位
//每次改前,会对比引用,和看下期望的标记位,比如上面设置为true,如果有别的线程改为false了,那么修改不成功。成功修改后会设置新的标记位。
//引用也会被设置为新的(即第二个参数)
ref.compareAndSet(prev,prev,true,false);
},"t1").start();
//主线程暂停1秒,然后让t1修改,那么主线程就会修改失败。
Thread.sleep(1000);
boolean b = ref.compareAndSet(car, new Car("888"), true, false);
log.debug("888 b:{}",b);
}
}
class Car{
public Car(String carMark) {
this.carMark = carMark;
}
private String carMark;
public String getCarMark() {
return carMark;
}
public void setCarMark(String carMark) {
this.carMark = carMark;
}
}
输出如下:
11:57:15.820 [main] c.test - 888 b:false
AtomicIntegerArray
数组的并发问题
@Slf4j(topic = "c.test")
public class Test06 {
public static void main(String[] args) throws InterruptedException {
int [] array = new int[10];
Thread [] threads = new Thread[50];
for (int i =0;i<threads.length;i++){
threads[i]= new Thread(() -> {
for (int j =0;j<array.length;j++){
array[j]++;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
for (Thread thread:threads){
thread.start();
}
for(Thread thread:threads){
thread.join();
}
for(int i=0;i<array.length;i++){
log.debug("array[{}]:{}",i,array[i]);
}
}
}
输出如下:理论上应该都是50
18:09:14.142 [main] c.test - array[0]:50
18:09:14.144 [main] c.test - array[1]:46
18:09:14.144 [main] c.test - array[2]:45
18:09:14.144 [main] c.test - array[3]:43
18:09:14.144 [main] c.test - array[4]:47
18:09:14.144 [main] c.test - array[5]:50
18:09:14.144 [main] c.test - array[6]:41
18:09:14.144 [main] c.test - array[7]:48
18:09:14.144 [main] c.test - array[8]:46
18:09:14.144 [main] c.test - array[9]:49
解决方案
错误示范:原因,AtomicIntegerArray会把array复制一份到自己里面的数组去,然后操作里面的数组,原来的数组array是不会被改变的。牢记。
@Slf4j(topic = "c.test")
public class Test06 {
public static void main(String[] args) throws InterruptedException {
int [] array = new int[10];
AtomicIntegerArray integerArray =new AtomicIntegerArray(array);
Thread [] threads = new Thread[50];
for (int i =0;i<threads.length;i++){
threads[i]= new Thread(() -> {
for (int j =0;j<array.length;j++){
while (true){
//boolean b = integerArray.compareAndSet(j, integerArray.get(j), integerArray.get(j) + 1);
boolean b = integerArray.compareAndSet(j, array[j], array[j] + 1);
if (b){
break;
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
for (Thread thread:threads){
thread.start();
}
for(Thread thread:threads){
thread.join();
}
for(int i=0;i<array.length;i++){
//log.debug("integerArray[{}]:{}",i,integerArray.get(i));
log.debug("array[{}]:{}",i,array[i]);
}
}
}
正确:
@Slf4j(topic = "c.test")
public class Test06 {
public static void main(String[] args) throws InterruptedException {
int [] array = new int[10];
AtomicIntegerArray integerArray =new AtomicIntegerArray(array);
Thread [] threads = new Thread[50];
for (int i =0;i<threads.length;i++){
threads[i]= new Thread(() -> {
for (int j =0;j<array.length;j++){
while (true){
//第一个是数组下标,第二个是期待的值,第三个是要修改成什么值
boolean b = integerArray.compareAndSet(j, integerArray.get(j), integerArray.get(j) + 1);
//boolean b = integerArray.compareAndSet(j, array[j], array[j] + 1);
if (b){
break;
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
for (Thread thread:threads){
thread.start();
}
for(Thread thread:threads){
thread.join();
}
for(int i=0;i<array.length;i++){
log.debug("integerArray[{}]:{}",i,integerArray.get(i));
//log.debug("array[{}]:{}",i,array[i]);
}
}
}
字段更新器


原子累加器
LongAdder
Unsafe
在Java中,实际实现乐观锁时需要使用Unsafe类,主要是因为CAS(Compare-And-Swap)操作需要底层CPU指令的支持,而Java语言本身并没有直接提供这样的原生操作。
通过Unsafe类,可以实现真正的无锁数据结构,如AtomicInteger、AtomicReference等。 这些类在Java并发包中被广泛使用,它们的底层实现都依赖于Unsafe类提供的CAS操作。
浙公网安备 33010602011771号