JAVA - 并发 - 无锁(乐观锁)

无锁(乐观锁)

介绍,无锁,即乐观锁,是非阻塞的

image

CAS

先看一段代码,采用加锁的方式保证并发的安全

package org.example.xiancheng.study11;

import java.util.ArrayList;
import java.util.List;

public class AccountTest{

    public static void main(String[] args) {
        Account account =new AccountUnsafe(10000);
        Account.demo(account);
    }

}

class AccountUnsafe implements Account{

    private Integer balance;

    public AccountUnsafe(Integer balance){
        this.balance=balance;
    }

    @Override
    public Integer getBalance() {
        synchronized (this){
            return this.balance;
        }
    }

    @Override
    public void withdraw(Integer amount) {
        synchronized (this){
            this.balance -= amount;
        }
    }
}

interface Account {
    //获取余额
    Integer getBalance();

    //取款
    void withdraw(Integer amout);

    /*起1000个线程,分别取10元*/
    static void demo(Account account) {
        List<Thread> ts  =new ArrayList<>();
        for(int i =0;i<1000;i++){
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.currentTimeMillis();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println(account.getBalance()+" cost:"+(end-start));
    }

}

通过CAS方式来实现并发的安全

package org.example.xiancheng.study11;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AccountTest{

    public static void main(String[] args) {
        Account account = new AccountCAS(10000);
        Account.demo(account);
    }

}

class AccountCAS implements Account{

    private AtomicInteger balance;

    public AccountCAS(int balance){
        this.balance=new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true){
            int prev = balance.get();
            int next =  prev - amount;
            if (balance.compareAndSet(prev,next)){
                break;
            }
        }
    }
}

interface Account {
    //获取余额
    Integer getBalance();

    //取款
    void withdraw(Integer amout);

    /*起1000个线程,分别取10元*/
    static void demo(Account account) {
        List<Thread> ts  =new ArrayList<>();
        for(int i =0;i<1000;i++){
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.currentTimeMillis();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println(account.getBalance()+" cost:"+(end-start));
    }

}

分析原理

image
线程1获取到余额100(第一个参数值),改为90,线程2已经最新的余额改为90,线程1调用cas方法,比较自己获取到的余额100与最新的余额90,不相等,方法返回false,线程1不修改,进入下一次循环,直到比较相等时返回true,则修改为第二个参数值。
image
image

慢动作分析,见视频,此处不赘述。

CAS借助了

image
image

CAS的特点
image

原子操作类

AtomicInteger

示例一:

public class Test34 {

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
        //先自增1再获得
        System.out.println(i.incrementAndGet());
        //先获得在自增1
        System.out.println(i.getAndIncrement());
        System.out.println(i.get());
    }
}

输出:

1
1
2

示例二:

public class Test34 {

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
        //先自增5再获得
        System.out.println(i.addAndGet(5));
        //先获得在自增12
        System.out.println(i.getAndAdd(12));
        System.out.println(i.get());
    }
}

输出:

5
5
17

示例三:

public class Test34 {

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(6);
        //value是AtomicInteger里的值,即原子变量,读取到变量,然后相乘
        i.updateAndGet(value -> value*7);
        System.out.println(i.get());
    }
}

输出:

42

手写模仿updateAndGet方法

第一步
public class Test34 {

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(6);
        while (true){
            int prev = i.get();
            int next = prev * 7;
            if (i.compareAndSet(prev,next)){
                //成功就设置原子变量
                break;
            }
        }
        System.out.println(i.get());
    }
}

输出:

42
接着优化

image

public class Test34 {

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(6);
        updateAndGet(i,p -> p*9);
        System.out.println(i.get());
    }

    public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){
        while (true){
            int prev = i.get();
            int next = operator.applyAsInt(prev);
            if (i.compareAndSet(prev,next)){
                //成功就设置原子变量
                break;
            }
        }
        return i.get();
    }
}

原子引用

AtomicReference

@Slf4j(topic = "c.test")
class DecimalAccountCas implements DecimalAccount{

    //直接操作BigDecimal,可能线程不安全
    //private BigDecimal balance;

    private AtomicReference<BigDecimal> balance;

    public DecimalAccountCas(BigDecimal balance){
        this.balance=new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {

        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true){
            BigDecimal prev =balance.get();
            //此处相减后会生成了新的变量next
            BigDecimal next = prev.subtract(amount);
            log.debug("prev id:{} - value:{},next id:{} - value:{}",prev.hashCode(),balance.get(),next.hashCode(),next);
            //线程取得的变量prev的值(以前获得的)对比再次取得的变量prev的值(现在取得的),有没有改变
            if (balance.compareAndSet(prev,next)){
                break;
            }
        }
    }
}

interface DecimalAccount{

    BigDecimal getBalance();

    void withdraw(BigDecimal amount);

    static void demo (DecimalAccount account){
        List<Thread> ts = new ArrayList<>();
        for(int i=0;i<3;i++){
            ts.add(new Thread(()->{
                account.withdraw(BigDecimal.TEN);
            },"t"+i));
        }
        ts.forEach(Thread::start);
        ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }

}

输出如下:并发执行,第一轮,时间上,t2先修改成功,结束循环,t1、t0修改失败,接着循环,第二轮,t1修改成功,结束,t0修改失败,第三轮t0修改成功。

21:31:55.330 [t2] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t1] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t0] DEBUG c.test - prev id:65100 - value:2100,next id:64790 - value:2090
21:31:55.330 [t1] DEBUG c.test - prev id:64790 - value:2090,next id:64480 - value:2080
21:31:55.330 [t0] DEBUG c.test - prev id:64790 - value:2090,next id:64480 - value:2080
21:31:55.330 [t0] DEBUG c.test - prev id:64480 - value:2080,next id:64170 - value:2070
2070

ABA问题

t1将A改为B,t2将B改为A,主线程无法感知被修改过,因为字符串常量池的关系。需要使用AtomicStampedReference,增加版本号。
image
image
image

AtomicStampedReference

通过版本号,每次修改比对获取到的值和获取到的值时的版本号,成功则改值,版本加一

@Slf4j(topic = "c.test")
public class Test36 {

    //第一个是引用,第二个是版本号
    static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);

    public static void main(String[] args) throws InterruptedException {
        String prev = ref.getReference();
        int stamp =ref.getStamp();
        log.debug("stamp 1:{}",stamp);
        other();
        Thread.sleep(2000);
        int stamp2 =ref.getStamp();
        log.debug("stamp 2:{}",stamp2);
        //
        log.debug("A -> C:{}",ref.compareAndSet(prev,"A",stamp,stamp+1));
    }

    public static void other() throws InterruptedException {
        new Thread(()->{
            String prev1 = ref.getReference();
            int stamp1 = ref.getStamp();
            log.debug("A -> B:{}",ref.compareAndSet(prev1,"B",stamp1,stamp1+1));
        },"t1").start();
        Thread.sleep(500);
        new Thread(()->{
            String prev2 = ref.getReference();
            int stamp2 = ref.getStamp();
            log.debug("B -> A:{}",ref.compareAndSet(prev2,"A",stamp2,stamp2+1));
        },"t2").start();
    }

}

输出如下:

22:51:29.856 [main] DEBUG c.test - stamp 1:0
22:51:29.891 [t1] DEBUG c.test - A -> B:true
22:51:30.393 [t2] DEBUG c.test - B -> A:true
22:51:32.403 [main] DEBUG c.test - stamp 2:2
22:51:32.403 [main] DEBUG c.test - A -> C:false

AtomicMarkableReference

示例代码:

@Slf4j(topic = "c.test")
public class Test35 {

    public static void main(String[] args) throws InterruptedException {
        Car car = new Car("666");
        //第二个参数是初始化的标记位,true/false
        AtomicMarkableReference<Car> ref = new AtomicMarkableReference<>(car,true);
        Car prev = ref.getReference();
        new Thread(() -> {
            prev.setCarMark("888");
            //1.对比的引用,2.新的引用,3.期望的标记位,4.新的标记位
            //每次改前,会对比引用,和看下期望的标记位,比如上面设置为true,如果有别的线程改为false了,那么修改不成功。成功修改后会设置新的标记位。
            //引用也会被设置为新的(即第二个参数)
            ref.compareAndSet(prev,prev,true,false);
        },"t1").start();
        //主线程暂停1秒,然后让t1修改,那么主线程就会修改失败。
        Thread.sleep(1000);
        boolean b = ref.compareAndSet(car, new Car("888"), true, false);
        log.debug("888 b:{}",b);
    }

}

class Car{

    public Car(String carMark) {
        this.carMark = carMark;
    }

    private String carMark;

    public String getCarMark() {
        return carMark;
    }

    public void setCarMark(String carMark) {
        this.carMark = carMark;
    }
}

输出如下:

11:57:15.820 [main] c.test - 888 b:false

AtomicIntegerArray

数组的并发问题

@Slf4j(topic = "c.test")
public class Test06 {

    public static void main(String[] args) throws InterruptedException {
        int [] array = new int[10];
        Thread [] threads = new Thread[50];
        for (int i =0;i<threads.length;i++){
            threads[i]= new Thread(() -> {
                for (int j =0;j<array.length;j++){
                    array[j]++;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
        for (Thread thread:threads){
            thread.start();
        }
        for(Thread thread:threads){
            thread.join();
        }
        for(int i=0;i<array.length;i++){
            log.debug("array[{}]:{}",i,array[i]);
        }
    }
}

输出如下:理论上应该都是50

18:09:14.142 [main] c.test - array[0]:50
18:09:14.144 [main] c.test - array[1]:46
18:09:14.144 [main] c.test - array[2]:45
18:09:14.144 [main] c.test - array[3]:43
18:09:14.144 [main] c.test - array[4]:47
18:09:14.144 [main] c.test - array[5]:50
18:09:14.144 [main] c.test - array[6]:41
18:09:14.144 [main] c.test - array[7]:48
18:09:14.144 [main] c.test - array[8]:46
18:09:14.144 [main] c.test - array[9]:49

解决方案

错误示范:原因,AtomicIntegerArray会把array复制一份到自己里面的数组去,然后操作里面的数组,原来的数组array是不会被改变的。牢记。

@Slf4j(topic = "c.test")
public class Test06 {

    public static void main(String[] args) throws InterruptedException {
        int [] array = new int[10];
        AtomicIntegerArray integerArray =new AtomicIntegerArray(array);
        Thread [] threads = new Thread[50];
        for (int i =0;i<threads.length;i++){
            threads[i]= new Thread(() -> {
                for (int j =0;j<array.length;j++){
                    while (true){
                        //boolean b = integerArray.compareAndSet(j, integerArray.get(j), integerArray.get(j) + 1);
                        boolean b = integerArray.compareAndSet(j, array[j], array[j] + 1);
                        if (b){
                            break;
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
        for (Thread thread:threads){
            thread.start();
        }
        for(Thread thread:threads){
            thread.join();
        }
        for(int i=0;i<array.length;i++){
            //log.debug("integerArray[{}]:{}",i,integerArray.get(i));
            log.debug("array[{}]:{}",i,array[i]);
        }
    }
}

正确:

@Slf4j(topic = "c.test")
public class Test06 {

    public static void main(String[] args) throws InterruptedException {
        int [] array = new int[10];
        AtomicIntegerArray integerArray =new AtomicIntegerArray(array);
        Thread [] threads = new Thread[50];
        for (int i =0;i<threads.length;i++){
            threads[i]= new Thread(() -> {
                for (int j =0;j<array.length;j++){
                    while (true){
                        //第一个是数组下标,第二个是期待的值,第三个是要修改成什么值
                        boolean b = integerArray.compareAndSet(j, integerArray.get(j), integerArray.get(j) + 1);
                        //boolean b = integerArray.compareAndSet(j, array[j], array[j] + 1);
                        if (b){
                            break;
                        }
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
        for (Thread thread:threads){
            thread.start();
        }
        for(Thread thread:threads){
            thread.join();
        }
        for(int i=0;i<array.length;i++){
            log.debug("integerArray[{}]:{}",i,integerArray.get(i));
            //log.debug("array[{}]:{}",i,array[i]);
        }
    }
}

字段更新器

image
image

原子累加器

LongAdder

Unsafe

在Java中,实际实现乐观锁时需要使用Unsafe类,主要是因为‌CAS(Compare-And-Swap)操作需要底层CPU指令的支持‌,而Java语言本身并没有直接提供这样的原生操作。
通过Unsafe类,可以实现真正的无锁数据结构,如AtomicInteger、AtomicReference等。 这些类在Java并发包中被广泛使用,它们的底层实现都依赖于Unsafe类提供的CAS操作。

posted @ 2026-02-06 13:47  cdc321  阅读(3)  评论(0)    收藏  举报