Java多线程之CAS中的ABA问题与JUC的常见类

EIP

文章目录

throws声明异常

一. CAS指令与ABA问题

1. 解析CAS

CAS即compare and awap, 字面意思是比较并交换, 具体说就是将寄存器或者某个内存上的值A与另一个内存上的值V进行比较, 如果相同就将B与需要修改的值V进行交换, 并返回交换是否成功的结果.

vue3.0

我们假设内存中的原数据V, 旧的预期值A, 需要修改的新值B, 具体涉及下面三个操作.

Java前后端分离

img

图论

  1. 比较AV是否相等(比较).
  2. 如果比较相等, 将B写入V(交换), 不相等则不执行任何操作.
  3. 返回操作是否成功.

在上述交换过程中, 大多数情况下并不关心B后续的情况, 更关心的是V这个变量的情况, 这里的交换, 也可以近似理解成 “赋值”.

数字可视化

伪代码如下:

mukes

boolean CAS(A, B, V) {
	if (A == V) {
		V = B;
		return true;
	}
	return false;
}

CAS最特殊的地方在于, 上述过程, 并非是通过一段代码实现的, 而是通过一条CPU指令完成的, 该指令是具有原子性的, 是线程安全的.

maven

2. 基于CAS实现的原子类

Java标准库中提供了基于CAS所实现的 “原子类”, 这些类的类名以Atomic开头,针对常用的 int, long 等类型进行了封装, 它们可以基于CAS的方式进行修改, 并且保证线程安全性.

ofdma

img

数据分析

方法 解释
addAndGet(int delta) i += delta
decrementAndGet() –i
getAndDecrement() i–
incrementAndGet() ++i
getAndIncrement() i++

这里举个例子, 典型的就是 AtomicInteger 类, 要实现多线程自增同一个变量, 其中的 getAndIncrement 相当于 i++ 操作.

plugin

import java.util.concurrent.atomic.AtomicInteger;

public class TestDemo25 {
    // 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger(0);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                // 这个方法就相当于 count++
                count.getAndIncrement();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count.get());
    }
}

执行结果:

虚幻引擎5

img

代码

上面的代码就是基于CAS实现的++操作, 不会存在线程安全问题, 这里既能够保证线程安全, 又要比 synchronized高效, synchronized会涉及到锁的竞争, 两个线程要相互等待, CAS不涉及到线程阻塞等待.

Harbor

上面所使用的AtomicInteger类方法getAndIncrement实现的伪代码如下:

WEB自动化测试实战

class AtomicInteger {
    private int value;//保存的值
    //自增操作
    public int getAndIncrement() {
        int oldValue = value;
        while ( CAS(value, oldValue, oldValue+1) != true) {
            oldValue = value;
        }
        return oldValue;
    }
}

对于CAS指令, 它的执行逻辑就是先判断value的值与oldValue的值是否相同, 如果相同就把oldValue+1的值进写入到value中(相当于将value的值加1), oldValue可以理解成寄存器里的值, 相当于是先把value变量内存中的值读至到寄存器当中, 在单线程环境中oldValue的值与value的值一定是相同, 但多线程环境下就不一定了, 因为value的值随时都有可能被其他线程修改, 比如执行完读取value到寄存器, 线程就发生切换了, 另外一个线程, 也进行修改了value的值, 然后当这个线程再被调度回来后, 再进行CAS判定 ,就认为value和oldValue不相等了.

微软

接着往下看while循环, 该循环使用CAS指令是否成功为判断条件, 如果CAS成功了则退出循环, 此时value的值已经加1, 最终返回oldValue; 如果CAS指令失败了; 这就说明有新线程提前对当前的value进行了++, value的值发生了改变, 这这时候就需要将新的value的值赋值给oldValue, 然后尝试重新进行CAS操作, 这样就能保证有几个线程操作, 那就自增几次, 从而也就保证了线程安全.

交叉验证

结合下图, 当两个线程现指令交错的情况, 理解基于CAS指令实现的多线程自增操作是如何保证线程安全的.

媒体

img

K8s入门教学

3. 基于CAS实现自旋锁

CAS的应用场景处了实现原子类, 还能够实现自旋锁, 伪代码如下:

代码自动生成

//自旋锁对象
public class SpinLock {
    //记录当前锁对象被哪个线程占用,为null,表示锁对象未被占用
    private Thread ownerv = null;
    public void lock(){
        // 通过 CAS 看当前锁是否被某个线程持有. 
        // 如果这个锁已经被别的线程持有,那么就自旋等待. 
        // 如果这个锁没有被别的线程持有,那么就把 owner 设为当前尝试加锁的线程. 
        while(!CAS(this.owner, null, Thread.currentThread())){
        }
   }
    public void unlock (){
        this.owner = null;
   }
}

上面CAS与自旋锁的逻辑为了监测当前锁对象是否被线程占用, CAS监测当前的owner是否是null, 如果是null, 就进行交换, 也就是把当前线程的引用赋值给owner, 此时循环结束, 退出lock方法, 加锁就完成了.

IC

如果当前锁, 已经被别的线程占用了, CAS就会发现, this.owner不是null, CAS就不会产生赋值, 也同时返回false, 循环继续执行, 并进行下次判定.

Layer0扩容

解锁的逻辑简单了, 将占用锁对象的线程(ownerv)置为null即可.

监控

4. ABA问题

CAS指令操作的核心的检查valueoldValue是否相同, 如果相同, 就视为value中途没有被修改过, 所以进行下一步交换操作是没问题的, 在大部分情况下都能保证线程安全.

摸鱼

但这里有一种非常极端的情况, 这里说到的相同, value的值可能是没改过的, 还有可能是value的值被修改后又被改回到原来的值, 比如把value的值设为A的话, CAS判定value为A, 此时可能确实value始终是A, 也可能是value本来是A, 然后被改成了B, 最后又还原成了A.

swing

上数说到的极端情况就是CAS中的ABA问题, 在一些极端场景下就会有bug存在, 比如下面的场景.

数据埋点

有一天, 滑稽老铁要到ATM机去取款, 假设当前滑稽的账户余额1000, 滑稽准备取500, 当滑稽老铁按下取款的这一瞬间, 机器卡了一下, 滑稽下意识就多按了一下, 如果考虑使用CAS的方式来扣款, 系统扣款的情况可能是下图所示:

swiftui

img正常情况下, 即使按下两次取款按钮最终的结果也是正常的, 但考虑一种极端情况, 如果在第一次CAS成功后的一瞬间, 滑稽老铁的朋友又给给滑稽转账了500, 导致第一次CAS扣款500后的余额从500又变回到了1000, 然后紧接着第二次CAS操作也会执行成功, 又成功扣款500, 最终余额变成了500, 这种结果显然是不合理的, 而正确的程序应该是第二次CAS仍然失败, 最终余额为1000元.

python_error

img

人脸识别

上述描述场景是极端的情况, 发生的概率是非常非常低的, 一方面, 恰好滑稽这边多按了几次产生多个扣款操作, 另一方面, 又赶巧在这个非常极限的时间内, 有人转账了一样的金额…

区块链

不过上述ABA问题在极端可能下造成的bug也是有办法解决的, 可以针对当前问题引入一个版本号, 假设初始版本号是1, 版本号只能增加不能减少, 每次修改版本号都+1, 然后进行CAS的时候, 就不是以金额值为基准了, 而是以版本号为基准, 在进行CAS操作之前, 都要对版本号进行验证, 如果版本号与之前加载的版本号不同, 则放弃此次CAS指令操作, 看下图理解, 这样最终的结果就是正确的了.

img

二. JUC中的常见类

Java中的JUC就是来自java.util.concurrent包下的一些标准类或者接口, 放的都是并发编程(多线程)相关的组件.

1. Callable接口

常见的创建线程的方式有两种方式, 第一种方法是直接继承Thread类, 重写run方法, 第二种方法是实现Runnable接口, 然后还是要靠Thread类的构造器, 把Runnable传进去, 最终调用的就是Runnablerun方法。; 和Runnable类似, 我们还可以通过Callable接口描述一个任务配合FutureTask类来创建线程, 和Runnable不同的是, Callable接口配合FutureTask类所创建的线程其中的任务是可以带有返回值的, 而一开始提到的那两种方式任务是不支持带返回值的.

Callable接口中有一个call方法(返回值是泛型参数), 就相当于Runnable接口中的run方法(无返回值), FutureTask可用于异步获取执行结果或取消执行任务的场景, 通过传入Runnable或者Callable的任务给FutureTask, 直接调用其run方法或者放入线程池执行, 之后可以在外部通过FutureTaskget方法异步获取执行结果, 如果任务还没有执行完毕, get方法会阻塞直到任务返回结果.

理解FutureTask可以为想象去吃麻辣烫, 当餐点好后, 后厨就开始做了, 同时前台会给你一张 “小票”, 这个小票就是FutureTask, 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没.

使用Thread类的构造器创建线程的时候, 传入的引用不能是Callable类型的, 而应该是FutrueTask类型, 因为构造器中传入的任务类型需要是一个Runnable类,CallableRunnable是没有直接关系的, 但FutrueTask类实现了Runnable类, 所以要想使用Callable创建线程, 我们就需要先把实现Callable接口的对象引用传给FutrueTask类的实例对象, 再将FutrueTask实例传入线程构造器中.

img

总结一下就是, 我们可以用Callable用来描述任务, FutureTask类用来管理Callable任务的执行结果.

比如, 我们使用Callable来计算 1 + 2 + 3 + … + 1000 的值, 并通过返回值的方式获取执行结果.

代码示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class TestDemo26 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 使用 Callable 来计算 1 + 2 + 3 + ... + 1000
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i <= 1000; i++) {
                    sum += i;
                }
                return sum;
            }
        };
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread t = new Thread(futureTask);
        t.start();
        //获取执行结果
        Integer sum = futureTask.get();
        System.out.println(sum);
    }
}

执行结果:

img

2. ReentrantLock类(可重入锁)

ReentrantLock是除了synchronized外标准库给我们提供的另一种可重入锁, 与synchronized不同的是, ReentrantLock是通过lock方法加锁,unlock方法解锁, 相比于synchronized直接基于代码块的方式来加锁解锁更加传统.

正是由于加锁解锁两个操作是分开的, 所以代码的写法上需要格外注意, 一方面lock后如果之后的工作代码比较长久容易忘记去unlock从而造成死锁, 另一方面加锁后解锁前中间的代码万一出了问题(比如直接return或者出现异常), 都可能导致不能顺利执行unlock造成死锁.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
// working
reentrantLock.unlock();

所以使用ReentrantLock类时, 一般要搭配finally使用, 将unlock放入到finally保证unlock一定会执行.

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
try {
    // working
} finally {
    reentrantLock.unlock();
}
reentrantLock.unlock();

🎯ReentrantLock相较于synchronized的优势:

  1. ReentrantKLock提供了公平锁版本的实现, 而synchronized只实现了非公平锁, ReentrantKLock默认是非公平锁, 而公平锁版本只需要指定fair参数为true即可.img.
  2. 对于synchronized来说, 提供的加锁操作就是"死等", 也就是说如果锁已经被占用, 只要锁没释放就需要一直等下去,而ReentrantLock提供了更灵活的等待方式: tryLeock.
方法 解释
boolean trylock() 无参数, 能加锁就加, 加不上就放弃
boolean trylock(long timeout, TimeUnit unit) 有参数, 超过指定时间, 加不上锁就放弃
  1. ReentrantLock提供了一个更方便的等待通知机制, synchronized搭配的是wait, notify, 当我们notify的时候是随即唤醒一个wait状态的线程; 而ReentrantLock搭配一个Condition类, 进行唤醒的时候可以唤醒指定线程.

3. Semaphore类(信号量)

Java中信号量(Semaphore)是把操作系统原生的信号量封装了一下, 本质就是一个计数器, 描述了 “可用资源的个数”,主要涉及到两个操作
P操作: 申请一个可用资源, 计数器 -1.
V操作: 释放一个可用资源, 计数器 +1.

如果计数器为0了, 继续Р操作, 就会出现阻塞等待的情况.

🍂举个例子来理解信号量:

会开车的应该经常会碰到, 停车, 停车场门口有一个灯牌, 会显示停车位还剩余多少个, 每进去一辆车, 显示的停车位数量就-1, 就相当于进行了一次P操作, 每出去一辆车, 显示的停车位数量就+1, 就相当于进行了一次V操作; 而当停车场的剩余车位为0时, 显示的停车位数量就为0了, 此时如果还有车想停, 要么在这里等, 要么就去其他停车场.

🎯Semaphore类的常用方法:

  • 构造方法
public Semaphore(int permits) 构造可用资源为permits个的信号量对象
public Semaphore(int permits, boolean fair) 构造可用资源为permits个的信号量对象, 并指定信号量是否是公平性质的
  • PV方法

Semaphore的PV操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.

public void acquire() throws InterruptedException 申请可用资源
public void release() 释放可用资源

代码示例:

  • 创建Semaphore示例, 初始化为4, 表示有4个可用资源.
  • acquire方法表示申请资源(P操作), release方法表示释放资源(V操作).
  • 创建20个线程, 每个线程都尝试申请资源, sleep1秒之后, 释放资源. 观察程序的执行效果.
import java.util.concurrent.Semaphore;

public class Test {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(4);

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("申请资源");
                    semaphore.acquire();
                    System.out.println("我获取到资源了");
                    Thread.sleep(1000);
                    System.out.println("我释放资源了");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 20; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}

执行结果:

img

考虑这里一个场景, 假设有一个计数器初始值为1的信号量, 针对这个信号量的值, 就只有10两种取值, 因为信号量不能是负的,

  • 执行一次Р操作, 1->0
  • 执行一次V操作, 0->1

如果已经进行一次Р操作了, 继续进行Р操作, 就会阻塞等待, 这是不是和锁的功效有点类似呢?

锁就可以视为 “二元信号量”, 可用资源就1个, 计数器的取值非01, 可以说, 锁是信号量的一种特殊情况, 信号量就把锁推广到了一般情况, 描述了可用资源更多的时候是如何处理的.

所以说, 计数器初始值为1的信号量就可以当成锁来使用, 这里我们编写一个代码实现两个线程增加同一个变量, 使用Semphore来控制线程安全.

import java.util.concurrent.Semaphore;

public class TestDemo27 {
    // 编写代码实现两个线程增加同一个变量, 使用 Semphore 来控制线程安全.
    public static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                try {
                    semaphore.acquire();
                    count++;
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count);
    }
}

执行结果:

img

4. CountDownLatch同步工具类

CountDownLatch是一个同步工具类, 用来协调多个线程之间的同步, 或者说起到线程之间的通信(而不是用作互斥的作用).

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后, 再继续执行; 使用一个计数器进行实现, 计数器初始值为线程的数量; 当每一个线程完成自己任务后, 计数器的值就会-1, 当计数器的值为0时, 表示所有的线程都已经完成一些任务, 然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务.

想象一个场景进行理解, 假设有一场跑步比赛, 比赛开始时间是明确的, 裁判的发令枪响了比赛就开始了, 但结束时间是不确定的, 只有当所有的选手都冲过了终点线才算结束, 这里的运动员就相当于线程, 而CountDownLatch类用判断什么时候最后一个远动员冲过终点线, 即这些线程在什么时候可以全部执行结束.

🎯CountDownLatch类常用方法:

  • 构造方法
public CountDownLatch(int count) 构造实例对象, count表示CountDownLatch对象中计数器的值
  • 普通方法
public void await() throws InterruptedException 使所处的线程进入阻塞等待, 直到计数器的值清零
public void countDown() 将计数器的值减1
public long getCount() 获取计数器最初的值

代码示例:

  • 10个选手依次就位, 哨声响才同时出发, 所有选手都通过终点, 比赛结束.
  • 构造CountDownLatch实例, 初始化10表示有10个任务需要完成(10个选手参加比赛).
  • 每个任务执行完毕, 都调用latch.countDown(), 在CountDownLatch内部的计数器同时自减(有一个选手冲过了终点线).
  • 主线程中使用 latch.await(), 阻塞等待所有任务执行完毕, 相当于计数器为0了(所有选手都冲过了终点线比赛结束).
import java.util.concurrent.CountDownLatch;

public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        //构造方法的参数表示有几个选手
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + " 到达终点");
                    latch.countDown(); //调用countDown的次数和个数一致
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t.start();
        }

        //裁判要等所有线程到达
        //当这些线程没有执行完的时候,await就会阻塞,所有线程执行完了,await 才返回
        latch.await();
        System.out.println("比赛结束");
    }
}

执行结果:
img

实际开发中这样的场景也是存在的, 比如多线程下载(迅雷, steam等下载器), 当我们下载一个比较大的文件资源(电影), 通过多线程下载就可以提高下载速度, 把一个大的文件拆成多个部分安排多个线程下载, 每个线程负责下载其中的一个部分, 等到是所有的线程都完成自己的下载, 才算把整个文件下载完, 这里就可以用到CountDownLatch来判断文件整体是否下载完毕, 多线程下载是充分利用了带宽(下载是IO操作, 和CPU关系不大).

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注